TokioMiddleTechnical

Что такое трейты AsyncRead и AsyncWrite в Tokio?

AsyncRead и AsyncWrite — низкоуровневые трейты с методами poll_read/poll_write, возвращающими Poll; расширения AsyncReadExt/AsyncWriteExt добавляют удобные async-методы (read, write_all, copy) поверх них.

Трейты AsyncRead и AsyncWrite

AsyncRead и AsyncWrite — это фундаментальные трейты Tokio (из крейта tokio::io), которые определяют контракт асинхронного чтения и записи. Они являются async-аналогами std::io::Read и std::io::Write, но вместо блокировки потока возвращают Poll и регистрируют Waker для пробуждения задачи.

Сигнатуры трейтов

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::ReadBuf;

// Упрощённое определение
pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>>;
}

pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>>;

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<()>>;

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::io::Result<()>>;
}

ReadBuf отличается от простого &mut [u8]: он отслеживает, сколько байт реально заполнено, позволяя избежать инициализации буфера нулями (unsafe оптимизация). poll_shutdown завершает запись — например, отправляет TLS close_notify или закрывает write-половину TCP-соединения.

Расширения: AsyncReadExt и AsyncWriteExt

Трейты AsyncReadExt и AsyncWriteExt (из tokio::io) добавляют удобные методы поверх низкоуровневых poll-методов. Они реализованы через автоматическое blanket-implementation для всех типов, реализующих AsyncRead/AsyncWrite.

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn echo(mut stream: TcpStream) -> anyhow::Result<()> {
    let mut buf = vec![0u8; 4096];
    loop {
        let n = stream.read(&mut buf).await?;  // AsyncReadExt::read
        if n == 0 { break; }                    // EOF
        stream.write_all(&buf[..n]).await?;     // AsyncWriteExt::write_all
    }
    stream.shutdown().await?;                   // AsyncWriteExt::shutdown
    Ok(())
}

Реализация собственного AsyncRead

Пример in-memory читалки, реализующей AsyncRead вручную:

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

struct Cursor {
    data: Vec<u8>,
    pos: usize,
}

impl AsyncRead for Cursor {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let remaining = &self.data[self.pos..];
        let to_copy = remaining.len().min(buf.remaining());
        buf.put_slice(&remaining[..to_copy]);
        self.pos += to_copy;
        Poll::Ready(Ok(()))
    }
}

Композиция: tokio::io::copy

use tokio::io;
use tokio::net::TcpStream;

async fn proxy(client: TcpStream, server: TcpStream) -> anyhow::Result<()> {
    let (mut cr, mut cw) = io::split(client);
    let (mut sr, mut sw) = io::split(server);

    tokio::try_join!(
        io::copy(&mut cr, &mut sw),
        io::copy(&mut sr, &mut cw),
    )?;
    Ok(())
}

Подводные камни

  • Требование Pin: методы принимают Pin<&mut Self>, поэтому типы с полями, требующими перемещения, нужно оборачивать в Box::pin или помечать #[pin_project] (крейт pin-project).
  • poll_read возвращает Poll::Ready(Ok(())) с нулём записанных байт для сигнала EOF — не Poll::Ready(Err(...)). Проверяйте buf.filled().len(), а не код ошибки.
  • Метод read из AsyncReadExt может прочитать меньше байт, чем буфер — для точного чтения N байт используйте read_exact.
  • Если не вызвать cx.waker().wake_by_ref() при возврате Poll::Pending, задача никогда не будет разбужена — дедлок.
  • poll_shutdown не закрывает read-половину — для полного закрытия TCP нужен явный drop или socket.shutdown(std::net::Shutdown::Both).
  • Смешивание tokio::io::AsyncRead и futures::io::AsyncRead вызовет ошибки компиляции — это разные трейты. Используйте адаптеры tokio_util::compat.
  • Реализация AsyncWrite без корректного poll_flush может привести к потере данных в буферизованных адаптерах.

Common mistakes

  • Отвечать определением без production-сценария.
  • Не называть runtime boundary, security boundary или failure mode.
  • Игнорировать версию API, observability и тестовую проверку.

What the interviewer is testing

  • Объясняет механизм своими словами и без выдуманных API.
  • Называет реальные риски, диагностику и критерий корректности.
  • Связывает ответ с текущей документацией и миграционными ограничениями.

Sources

Related topics