TokioMiddleTechnical

Что такое tokio::sync::Semaphore и для чего он используется?

tokio::sync::Semaphore ограничивает число одновременных операций через permits: задача вызывает acquire().await, получает permit, выполняет работу, permit дропается — место освобождается. Применяют для rate limiting, connection pool, ограничения параллельных запросов.

Что такое tokio::sync::Semaphore

tokio::sync::Semaphore — примитив ограничения параллелизма. Он хранит счётчик permits (разрешений); задача должна получить permit (acquire().await) перед входом в защищённую секцию и вернуть его по завершении. Если permits закончились, задача приостанавливается до освобождения.

Базовый пример: ограничение параллельных HTTP-запросов

use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_CONCURRENT: usize = 10;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT));
    let urls = vec!["https://example.com"; 50];

    let mut handles = vec![];
    for url in urls {
        let sem = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            // acquire_owned возвращает OwnedSemaphorePermit
            // permit дропается при выходе из блока => автоматический release
            let _permit = sem.acquire().await.unwrap();
            fetch(url).await
        });
        handles.push(handle);
    }

    for h in handles {
        h.await.unwrap();
    }
}

async fn fetch(url: &str) -> String {
    // reqwest::get(url).await...
    format!("fetched {}", url)
}

acquire_owned для передачи permit через spawn

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn run_limited(sem: Arc<Semaphore>, id: u32) {
    // OwnedSemaphorePermit: Send + 'static — можно передавать в spawn
    let permit = sem.acquire_owned().await.unwrap();

    tokio::spawn(async move {
        println!("task {} running", id);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        drop(permit);  // явно освобождаем
        println!("task {} done", id);
    });
}

Паттерн: connection pool

use tokio::sync::{Semaphore, OwnedSemaphorePermit};
use std::sync::Arc;

struct Pool {
    semaphore: Arc<Semaphore>,
    // реальные соединения в Vec<Connection>...
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    async fn acquire(&self) -> OwnedSemaphorePermit {
        Arc::clone(&self.semaphore)
            .acquire_owned()
            .await
            .unwrap()
    }
}

Закрытие семафора

semaphore.close() переводит семафор в закрытое состояние: все ожидающие acquire получат AcquireError. Используется для graceful shutdown.

let sem = Arc::new(Semaphore::new(5));
// ... в другой задаче:
sem.close();  // все задачи, ждущие acquire, получат ошибку

Подводные камни

  • Permit нужно явно держать (let _permit = ...) — если сразу дропнуть, секция не защищена; компилятор не предупредит
  • Semaphore::new(0) создаёт семафор без permits — все задачи будут ждать вечно, пока не будет вызван add_permits()
  • acquire().await.unwrap() паникует если семафор закрыт — в production обрабатывайте AcquireError
  • При использовании acquire (не acquire_owned) guard не реализует Send — нельзя держать через некоторые .await в multi_thread
  • Semaphore::MAX_PERMITS ограничен (2^29 - 1 ≈ 536 млн на 64-bit) — теоретический предел, но не практический
  • Семафор не fair по умолчанию в смысле FIFO между задачами с разным приоритетом — Tokio не поддерживает приоритеты задач
  • Избыточно малый лимит permits при высоком RPS может стать узким местом — мониторьте длину очереди ожидающих через sem.available_permits()
  • Не используйте семафор для mutual exclusion одного ресурса — для этого правильнее Mutex; семафор для ограничения N параллельных операций

Common mistakes

  • Отвечать определением без production-сценария.
  • Не называть runtime boundary, security boundary или failure mode.
  • Игнорировать версию API, observability и тестовую проверку.

What the interviewer is testing

  • Объясняет механизм своими словами и без выдуманных API.
  • Называет реальные риски, диагностику и критерий корректности.
  • Связывает ответ с текущей документацией и миграционными ограничениями.

Sources

Related topics