NumPyMiddleExperience

Расскажите о случае, когда вы использовали NumPy для pipeline, evaluation, optimization, deployment или debugging.

Использовал NumPy для построения inference pipeline: векторизованный preprocessing, батчевая нормализация и ONNX Runtime-инференс с numpy I/O — вместо pandas в критическом пути удалось снизить latency с 45мс до 8мс.

Кейс: NumPy в production inference pipeline

Задача: снизить latency REST API для рекомендательной модели. Исходный код использовал pandas для preprocessing на каждом запросе.

Профилирование узкого места

import cProfile
import pstats
import io
import numpy as np
import pandas as pd
import time

# Исходный код с pandas
def preprocess_pandas(raw: dict) -> np.ndarray:
    df = pd.DataFrame([raw])  # создание DataFrame на каждый запрос
    df['age'] = df['age'].fillna(df['age'].median())
    df['score'] = (df['score'] - SCORE_MEAN) / SCORE_STD
    return df[FEATURE_COLS].values.astype(np.float32)

# Профилируем
pr = cProfile.Profile()
pr.enable()
for _ in range(1000):
    preprocess_pandas({'age': 25, 'score': 0.8, 'category': 3})
pr.disable()

s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10)
print(s.getvalue())
# Результат: 45мс/запрос, 70% времени — DataFrame.__init__

Переписываем на NumPy

import numpy as np

# Предвычисленные константы (из обучающего набора)
SCORE_MEAN = np.float32(0.512)
SCORE_STD  = np.float32(0.234)
AGE_MEDIAN = np.float32(32.0)

# Предаллоцированный буфер для single-request inference
_buffer = np.empty((1, 3), dtype=np.float32)

def preprocess_numpy(raw: dict) -> np.ndarray:
    age      = raw.get('age') or AGE_MEDIAN  # fast NaN handling
    score    = raw['score']
    category = raw['category']

    _buffer[0, 0] = np.float32(age)
    _buffer[0, 1] = (np.float32(score) - SCORE_MEAN) / SCORE_STD
    _buffer[0, 2] = np.float32(category)

    return _buffer

# Тест производительности
start = time.perf_counter()
for _ in range(1000):
    preprocess_numpy({'age': 25, 'score': 0.8, 'category': 3})
print(f"NumPy: {(time.perf_counter() - start) * 1000:.1f}мс / 1000 запросов")
# Результат: 2.1мс / 1000 запросов

Батчевая обработка для GPU inference

import onnxruntime as ort
import numpy as np

# Загрузка ONNX-модели
sess = ort.InferenceSession('model.onnx', providers=['CUDAExecutionProvider'])
input_name  = sess.get_inputs()[0].name
output_name = sess.get_outputs()[0].name

def batch_predict(requests: list[dict]) -> np.ndarray:
    batch_size = len(requests)

    # Векторизованный preprocessing всего батча
    ages       = np.array([r.get('age') or AGE_MEDIAN for r in requests], dtype=np.float32)
    scores     = np.array([r['score'] for r in requests], dtype=np.float32)
    categories = np.array([r['category'] for r in requests], dtype=np.float32)

    # Нормализация без копирования (in-place)
    ages      /= 100.0                       # нормализация возраста
    scores     = (scores - SCORE_MEAN) / SCORE_STD

    X = np.stack([ages, scores, categories], axis=1)  # (batch_size, 3)

    # ONNX Runtime inference
    predictions = sess.run([output_name], {input_name: X})[0]  # numpy out
    return predictions

Мониторинг качества (evaluation)

def evaluate_batch(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """Метрики без sklearn для минимальных зависимостей."""
    mae  = np.mean(np.abs(y_true - y_pred))
    rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
    # AUC approximation через sorting
    order = np.argsort(-y_pred)
    tpr_fpr = np.cumsum(y_true[order]) / y_true.sum()
    auc  = np.trapz(tpr_fpr) / len(tpr_fpr)
    return {'mae': float(mae), 'rmse': float(rmse), 'auc': float(auc)}

Итог

Замена pandas на NumPy в критическом пути снизила latency p99 с 45мс до 8мс. Ключевые техники: предаллокация буфера, in-place операции, float32 вместо float64, батчевый preprocessing.

Подводные камни

  • Предаллоцированный буфер в multi-threaded сервере (несколько воркеров) — race condition: два запроса пишут в один буфер одновременно. Используйте thread-local storage или создавайте новый массив.
  • ONNX Runtime по умолчанию использует CPU даже если указан CUDAExecutionProvider — он fallback'ает молча; проверяйте sess.get_providers().
  • in-place операции (/=, -=) на view массива изменяют оригинальные данные — при многократном вызове функции данные накапливают ошибки.
  • np.stack создаёт копию данных — для очень частых вызовов (>10K RPS) прямая запись в pre-allocated 2D буфер эффективнее.
  • Несоответствие dtype между preprocessing (float32) и ожидаемым моделью (float64) приводит к неявному копированию внутри ONNX Runtime.
  • np.trapz для AUC работает только при правильной сортировке — без np.argsort(-y_pred) результат неверен.

What hurts your answer

  • Выдумывать опыт или говорить слишком общими фразами
  • Не объяснять свою личную роль в работе с NumPy
  • Не показывать результат, метрики или извлечённые уроки

What they're listening for

  • Может подготовить честный пример использования NumPy
  • Показывает свою роль, решения и результат
  • Умеет рефлексировать над trade-offs и уроками

Related topics