PandasMiddleExperience

Расскажите о случае, когда вы использовали Pandas для pipeline, evaluation, optimization, deployment или debugging.

Использовал Pandas для ETL pipeline обработки данных: загрузка из CSV/SQL, очистка через vectorized операции, группировка и аггрегация, оптимизация памяти через category dtype и chunked reading для больших файлов.

Контекст задачи

В проекте аналитики e-commerce нужно было построить ETL pipeline: загружать данные о заказах из PostgreSQL и CSV-выгрузок (партнёрские продажи), объединять, очищать и считать метрики для дашборда. Объём данных — 50M строк в день, что потребовало оптимизации работы с памятью.

Pipeline загрузки и очистки

import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from typing import Iterator

def load_orders_chunked(
    db_url: str,
    chunk_size: int = 100_000,
) -> Iterator[pd.DataFrame]:
    """Load orders from PostgreSQL in chunks to avoid OOM."""
    engine = create_engine(db_url)
    query = """
        SELECT order_id, user_id, amount, currency, status, created_at
        FROM orders
        WHERE created_at >= NOW() - INTERVAL '30 days'
    """
    return pd.read_sql(query, engine, chunksize=chunk_size)


def clean_chunk(df: pd.DataFrame) -> pd.DataFrame:
    """Vectorized cleaning — no apply() loops."""
    # Удаляем дубликаты
    df = df.drop_duplicates(subset='order_id', keep='last')

    # Конвертируем типы
    df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

    # Удаляем строки с нулевой суммой или unknown статусом
    df = df[
        (df['amount'] > 0) &
        (df['status'].isin(['completed', 'refunded', 'pending']))
    ]

    # Оптимизация памяти через category dtype
    df['status'] = df['status'].astype('category')
    df['currency'] = df['currency'].astype('category')

    # Конвертация суммы в USD
    exchange_rates = {'USD': 1.0, 'EUR': 1.08, 'GBP': 1.27, 'RUB': 0.011}
    df['amount_usd'] = df['amount'] * df['currency'].map(exchange_rates)

    return df


def compute_daily_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate metrics by day and status."""
    df['date'] = df['created_at'].dt.date

    metrics = (
        df.groupby(['date', 'status'], observed=True)
        .agg(
            orders_count=('order_id', 'count'),
            total_usd=('amount_usd', 'sum'),
            avg_usd=('amount_usd', 'mean'),
            unique_users=('user_id', 'nunique'),
        )
        .reset_index()
    )
    return metrics

Объединение с CSV-партнёрами

def merge_with_partners(db_df: pd.DataFrame, csv_path: str) -> pd.DataFrame:
    partner_df = pd.read_csv(
        csv_path,
        usecols=['order_id', 'partner_name', 'commission_rate'],
        dtype={'order_id': str, 'commission_rate': float},
    )

    merged = db_df.merge(partner_df, on='order_id', how='left')
    merged['commission_usd'] = (
        merged['amount_usd'] * merged['commission_rate'].fillna(0)
    )
    return merged

Запуск pipeline с накоплением

def run_pipeline(db_url: str, csv_path: str) -> pd.DataFrame:
    all_metrics: list[pd.DataFrame] = []

    for chunk in load_orders_chunked(db_url):
        clean = clean_chunk(chunk)
        merged = merge_with_partners(clean, csv_path)
        metrics = compute_daily_metrics(merged)
        all_metrics.append(metrics)

    result = pd.concat(all_metrics, ignore_index=True)
    # Финальная агрегация по всем чанкам
    return (
        result.groupby(['date', 'status'], observed=True)
        .agg({'orders_count': 'sum', 'total_usd': 'sum', 'avg_usd': 'mean', 'unique_users': 'sum'})
        .reset_index()
    )

Подводные камни

  • apply() с lambda — антипаттерн: df.apply(lambda x: ...) работает в Python-цикле и в 10-100 раз медленнее vectorized операций. Используйте np.where, .map(), .str.*.
  • Chained indexing: df[df.a > 0]['b'] = 1 не изменяет исходный DataFrame (SettingWithCopyWarning). Используйте df.loc[df.a > 0, 'b'] = 1.
  • groupby с category без observed=True: по умолчанию groupby создаёт группы для всех категорий, включая отсутствующие. В Pandas 2.0+ это предупреждение, в 3.0 будет ошибкой.
  • pd.concat в цикле: накопление через df = pd.concat([df, new]) на каждой итерации — O(n²) по памяти. Накапливайте в list, concat один раз.
  • read_csv без dtype: Pandas угадывает типы сканируя первые строки. Для больших файлов и числовых ID (64-значные числа) это приводит к float64 и потере точности. Задавайте dtype явно.
  • merge создаёт дубликаты: left merge на не уникальных ключах перемножает строки. Проверяйте уникальность перед merge: assert partner_df['order_id'].is_unique.
  • Timezone-naive смешивание: смешивание timezone-aware и naive datetime вызывает TypeError. Приводите всё к UTC через pd.to_datetime(df['col'], utc=True).
  • Chunked nunique неточен: unique_users посчитанный в каждом чанке и суммированный — не равен реальному nunique по всему датасету. Для точного подсчёта нужен HyperLogLog или накопление уникальных ID.

What hurts your answer

  • Выдумывать опыт или говорить слишком общими фразами
  • Не объяснять свою личную роль в работе с Pandas
  • Не показывать результат, метрики или извлечённые уроки

What they're listening for

  • Может подготовить честный пример использования Pandas
  • Показывает свою роль, решения и результат
  • Умеет рефлексировать над trade-offs и уроками

Related topics