PyTorchSeniorSystem design

Чем DataParallel отличается от DistributedDataParallel (DDP)?

DataParallel работает в одном процессе и узкое место — rank 0 (gather/scatter); DistributedDataParallel запускает отдельный процесс на GPU и синхронизирует градиенты через NCCL AllReduce, что даёт линейный скейлинг на сотни GPU.

DataParallel vs DistributedDataParallel

DataParallel (DP) и DistributedDataParallel (DDP) — два способа распределить обучение по нескольким GPU. Они существенно различаются архитектурой, производительностью и ограничениями.

DataParallel — как работает

DP живёт в одном процессе: главный GPU (rank 0) получает весь батч, делит его на sub-батчи, рассылает на остальные GPU через scatter, собирает градиенты обратно через gather и обновляет веса. Это удобно, но создаёт узкое место на rank 0.

import torch
import torch.nn as nn

model = nn.Linear(1024, 512)
# Оборачиваем — только 1 строка изменений
model = nn.DataParallel(model, device_ids=[0, 1, 2, 3])
model = model.cuda()

x = torch.randn(128, 1024).cuda()  # DataParallel сам нарежет по 32 на GPU
out = model(x)  # out собирается на cuda:0
out.mean().backward()

DistributedDataParallel — как работает

DDP запускает отдельный процесс на каждый GPU. Каждый процесс держит полную копию модели и обрабатывает свой sub-батч независимо. После backward синхронизирует градиенты через AllReduce (NCCL backend). Нет главного процесса-узкого места.

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DistributedSampler
import os

def train(rank: int, world_size: int):
    # Инициализация группы процессов
    dist.init_process_group(
        backend='nccl',          # NCCL для GPU-GPU, 'gloo' для CPU
        init_method='env://',    # MASTER_ADDR, MASTER_PORT из env
        world_size=world_size,
        rank=rank,
    )
    torch.cuda.set_device(rank)

    model = nn.Linear(1024, 512).cuda(rank)
    # find_unused_parameters=True только если часть параметров не участвует в forward
    ddp_model = DDP(model, device_ids=[rank], find_unused_parameters=False)

    # DistributedSampler гарантирует непересекающиеся sub-батчи
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler, num_workers=4)

    optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-3)

    for epoch in range(10):
        sampler.set_epoch(epoch)  # ОБЯЗАТЕЛЬНО для корректного перемешивания
        for x, y in loader:
            x, y = x.cuda(rank), y.cuda(rank)
            loss = criterion(ddp_model(x), y)
            optimizer.zero_grad()
            loss.backward()       # AllReduce градиентов происходит здесь
            optimizer.step()

    dist.destroy_process_group()

# Запуск: torchrun --nproc_per_node=4 train.py
# или: torch.multiprocessing.spawn(train, args=(4,), nprocs=4)

Сравнение ключевых характеристик

  • Процессы: DP — 1 процесс, DDP — N процессов (по одному на GPU).
  • GIL: DP страдает от GIL Python при сборке градиентов; DDP обходит GIL, каждый процесс независим.
  • Масштабирование: DP эффективен до ~2-4 GPU на одной машине; DDP масштабируется на сотни GPU и несколько машин.
  • Memory overhead: оба держат полную копию модели на каждом GPU; для очень больших моделей нужен FSDP (Fully Sharded DDP).
  • Mixed precision: DDP нативно работает с torch.cuda.amp.GradScaler; DP тоже, но синхронизация scaler сложнее.
  • Скорость: DDP обычно на 20-40% быстрее DP при 4 GPU из-за отсутствия gather/scatter через rank 0.

Запуск DDP через torchrun

# 4 GPU на одной машине
torchrun --standalone --nproc_per_node=4 train.py

# Мульти-нодовый запуск (2 машины по 4 GPU)
# На node 0:
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 \
    --master_addr=192.168.1.1 --master_port=29500 train.py
# На node 1:
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 \
    --master_addr=192.168.1.1 --master_port=29500 train.py

Подводные камни

  • Забыть sampler.set_epoch(epoch): без этого DistributedSampler использует одинаковый порядок во всех эпохах, нарушая перемешивание.
  • Сохранение checkpoint в каждом rank: записывать state_dict нужно только на rank 0 (if dist.get_rank() == 0), иначе все процессы одновременно пишут в файл.
  • find_unused_parameters=True без нужды: этот флаг добавляет overhead на каждую итерацию; включать только если модель имеет условные ветви, где часть параметров не используется.
  • DP с batch_size не кратным числу GPU: если batch_size=65 и 4 GPU, sub-батчи будут [17, 17, 17, 14] — BatchNorm получит разные размеры и потенциально нестабильную статистику.
  • Доступ к параметрам DDP-модели: параметры находятся в ddp_model.module, не в ddp_model; ddp_model.weight выбросит AttributeError.
  • NCCL deadlock при исключении: если один процесс падает с исключением, остальные ждут AllReduce бесконечно; оборачивайте train loop в try/finally с dist.destroy_process_group().
  • Gradient accumulation с DDP: при накоплении градиентов нужно отключать синхронизацию на промежуточных шагах: with ddp_model.no_sync(): loss.backward(), и только на финальном шаге — обычный backward.

Common mistakes

  • Объяснять dataparallel vs ddp только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать dataparallel vs ddp с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.
  • Предлагает ли observability, rollback, ограничения стоимости и стратегию incident replay.

Sources

Related topics