FastAPIMiddleCoding

Как реализовать WebSocket-эндпоинты в FastAPI?

Декоратор @app.websocket("/ws") принимает WebSocket-объект. После await websocket.accept() можно читать (receive_text/receive_json) и отправлять (send_text/send_json) в цикле. Для broadcast используется менеджер подключений с dict или set активных WebSocket.

WebSocket в FastAPI

FastAPI поддерживает WebSocket нативно через ASGI. Маршрут объявляется декоратором @app.websocket(), параметр функции типизируется как WebSocket из fastapi.

Базовое эхо-соединение

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect as e:
        logger.info("Client disconnected, code=%s", e.code)

Менеджер подключений для broadcast

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Any

app = FastAPI()

class ConnectionManager:
    def __init__(self) -> None:
        self.active: dict[str, WebSocket] = {}

    async def connect(self, client_id: str, ws: WebSocket) -> None:
        await ws.accept()
        self.active[client_id] = ws

    def disconnect(self, client_id: str) -> None:
        self.active.pop(client_id, None)

    async def send_personal(self, message: str, client_id: str) -> None:
        ws = self.active.get(client_id)
        if ws:
            await ws.send_text(message)

    async def broadcast(self, message: str) -> None:
        dead: list[str] = []
        for cid, ws in self.active.items():
            try:
                await ws.send_text(message)
            except Exception:
                dead.append(cid)
        for cid in dead:
            self.disconnect(cid)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def chat_endpoint(websocket: WebSocket, client_id: str) -> None:
    await manager.connect(client_id, websocket)
    await manager.broadcast(f"[{client_id}] подключился")
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"[{client_id}]: {data}")
    except WebSocketDisconnect:
        manager.disconnect(client_id)
        await manager.broadcast(f"[{client_id}] отключился")

Аутентификация WebSocket через Query-параметр

from fastapi import Query, HTTPException

async def get_ws_user(token: str = Query(...)) -> str:
    # верифицируем JWT или session token
    if token != "valid-token":
        raise HTTPException(status_code=403, detail="Forbidden")
    return "user_id_from_token"

@app.websocket("/ws/secure")
async def secure_ws(
    websocket: WebSocket,
    user_id: str = Depends(get_ws_user),
) -> None:
    await websocket.accept()
    await websocket.send_json({"event": "connected", "user": user_id})
    ...

Получение и отправка JSON

@app.websocket("/ws/json")
async def json_ws(websocket: WebSocket) -> None:
    await websocket.accept()
    try:
        while True:
            payload: dict = await websocket.receive_json()
            event = payload.get("event")
            if event == "ping":
                await websocket.send_json({"event": "pong"})
            elif event == "subscribe":
                channel = payload.get("channel")
                await websocket.send_json({"event": "subscribed", "channel": channel})
    except WebSocketDisconnect:
        pass

Подводные камни

  • WebSocket-соединения долгоживущие — каждое держит файловый дескриптор и goroutine; без лимитов легко исчерпать ресурсы сервера.
  • ConnectionManager с обычным dict не работает в многопроцессорном окружении (несколько воркеров uvicorn) — используйте Redis Pub/Sub или Redis Streams для broadcast между процессами.
  • Если клиент резко закрывает соединение, receive_text() бросает WebSocketDisconnect, а не просто возвращает None — всегда оборачивайте цикл в try/except.
  • Аутентификация через cookies не работает в WebSocket так же как в HTTP — браузеры отправляют cookies при handshake, но стандартный Depends(get_current_user)` через заголовок Authorization не сработает в WSS без дополнительной логики.
  • nginx по умолчанию закрывает idle WebSocket-соединения через 60 секунд — настройте proxy_read_timeout и реализуйте ping/pong на клиенте.
  • TestClient не поддерживает WebSocket напрямую; для тестирования используйте with client.websocket_connect("/ws") as ws: — это синхронный API из starlette.
  • Broadcast в цикле блокирует event loop если один из клиентов медленный — используйте asyncio.gather или добавляйте таймаут на отправку.

Common mistakes

  • Описывать websockets только как термин и не показывать механизм на минимальном примере.
  • Игнорировать ошибки, пустые данные, конкурентный доступ или границы транзакции.
  • Не связывать поведение с официальным контрактом FastAPI и реальной эксплуатацией.

What the interviewer is testing

  • Объясняет websockets через последовательность действий, а не через набор ключевых слов.
  • Приводит короткий кодовый пример или production-сценарий с ожидаемым поведением.
  • Называет хотя бы один риск: производительность, безопасность, транзакции, память или сопровождение.

Sources

Related topics