PandasMiddleExperience
Расскажите о случае, когда вы использовали Pandas для pipeline, evaluation, optimization, deployment или debugging.
Использовал Pandas для ETL pipeline обработки данных: загрузка из CSV/SQL, очистка через vectorized операции, группировка и аггрегация, оптимизация памяти через category dtype и chunked reading для больших файлов.
Контекст задачи
В проекте аналитики e-commerce нужно было построить ETL pipeline: загружать данные о заказах из PostgreSQL и CSV-выгрузок (партнёрские продажи), объединять, очищать и считать метрики для дашборда. Объём данных — 50M строк в день, что потребовало оптимизации работы с памятью.
Pipeline загрузки и очистки
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from typing import Iterator
def load_orders_chunked(
db_url: str,
chunk_size: int = 100_000,
) -> Iterator[pd.DataFrame]:
"""Load orders from PostgreSQL in chunks to avoid OOM."""
engine = create_engine(db_url)
query = """
SELECT order_id, user_id, amount, currency, status, created_at
FROM orders
WHERE created_at >= NOW() - INTERVAL '30 days'
"""
return pd.read_sql(query, engine, chunksize=chunk_size)
def clean_chunk(df: pd.DataFrame) -> pd.DataFrame:
"""Vectorized cleaning — no apply() loops."""
# Удаляем дубликаты
df = df.drop_duplicates(subset='order_id', keep='last')
# Конвертируем типы
df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Удаляем строки с нулевой суммой или unknown статусом
df = df[
(df['amount'] > 0) &
(df['status'].isin(['completed', 'refunded', 'pending']))
]
# Оптимизация памяти через category dtype
df['status'] = df['status'].astype('category')
df['currency'] = df['currency'].astype('category')
# Конвертация суммы в USD
exchange_rates = {'USD': 1.0, 'EUR': 1.08, 'GBP': 1.27, 'RUB': 0.011}
df['amount_usd'] = df['amount'] * df['currency'].map(exchange_rates)
return df
def compute_daily_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate metrics by day and status."""
df['date'] = df['created_at'].dt.date
metrics = (
df.groupby(['date', 'status'], observed=True)
.agg(
orders_count=('order_id', 'count'),
total_usd=('amount_usd', 'sum'),
avg_usd=('amount_usd', 'mean'),
unique_users=('user_id', 'nunique'),
)
.reset_index()
)
return metrics
Объединение с CSV-партнёрами
def merge_with_partners(db_df: pd.DataFrame, csv_path: str) -> pd.DataFrame:
partner_df = pd.read_csv(
csv_path,
usecols=['order_id', 'partner_name', 'commission_rate'],
dtype={'order_id': str, 'commission_rate': float},
)
merged = db_df.merge(partner_df, on='order_id', how='left')
merged['commission_usd'] = (
merged['amount_usd'] * merged['commission_rate'].fillna(0)
)
return merged
Запуск pipeline с накоплением
def run_pipeline(db_url: str, csv_path: str) -> pd.DataFrame:
all_metrics: list[pd.DataFrame] = []
for chunk in load_orders_chunked(db_url):
clean = clean_chunk(chunk)
merged = merge_with_partners(clean, csv_path)
metrics = compute_daily_metrics(merged)
all_metrics.append(metrics)
result = pd.concat(all_metrics, ignore_index=True)
# Финальная агрегация по всем чанкам
return (
result.groupby(['date', 'status'], observed=True)
.agg({'orders_count': 'sum', 'total_usd': 'sum', 'avg_usd': 'mean', 'unique_users': 'sum'})
.reset_index()
)
Подводные камни
- apply() с lambda — антипаттерн:
df.apply(lambda x: ...)работает в Python-цикле и в 10-100 раз медленнее vectorized операций. Используйтеnp.where,.map(),.str.*. - Chained indexing:
df[df.a > 0]['b'] = 1не изменяет исходный DataFrame (SettingWithCopyWarning). Используйтеdf.loc[df.a > 0, 'b'] = 1. - groupby с category без observed=True: по умолчанию groupby создаёт группы для всех категорий, включая отсутствующие. В Pandas 2.0+ это предупреждение, в 3.0 будет ошибкой.
- pd.concat в цикле: накопление через
df = pd.concat([df, new])на каждой итерации — O(n²) по памяти. Накапливайте в list, concat один раз. - read_csv без dtype: Pandas угадывает типы сканируя первые строки. Для больших файлов и числовых ID (64-значные числа) это приводит к float64 и потере точности. Задавайте
dtypeявно. - merge создаёт дубликаты: left merge на не уникальных ключах перемножает строки. Проверяйте уникальность перед merge:
assert partner_df['order_id'].is_unique. - Timezone-naive смешивание: смешивание timezone-aware и naive datetime вызывает TypeError. Приводите всё к UTC через
pd.to_datetime(df['col'], utc=True). - Chunked nunique неточен:
unique_usersпосчитанный в каждом чанке и суммированный — не равен реальному nunique по всему датасету. Для точного подсчёта нужен HyperLogLog или накопление уникальных ID.
What hurts your answer
- Выдумывать опыт или говорить слишком общими фразами
- Не объяснять свою личную роль в работе с Pandas
- Не показывать результат, метрики или извлечённые уроки
What they're listening for
- Может подготовить честный пример использования Pandas
- Показывает свою роль, решения и результат
- Умеет рефлексировать над trade-offs и уроками