TokioMiddleTechnical
Что такое tokio::sync::Semaphore и для чего он используется?
tokio::sync::Semaphore ограничивает число одновременных операций через permits: задача вызывает acquire().await, получает permit, выполняет работу, permit дропается — место освобождается. Применяют для rate limiting, connection pool, ограничения параллельных запросов.
Что такое tokio::sync::Semaphore
tokio::sync::Semaphore — примитив ограничения параллелизма. Он хранит счётчик permits (разрешений); задача должна получить permit (acquire().await) перед входом в защищённую секцию и вернуть его по завершении. Если permits закончились, задача приостанавливается до освобождения.
Базовый пример: ограничение параллельных HTTP-запросов
use std::sync::Arc;
use tokio::sync::Semaphore;
const MAX_CONCURRENT: usize = 10;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT));
let urls = vec!["https://example.com"; 50];
let mut handles = vec![];
for url in urls {
let sem = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
// acquire_owned возвращает OwnedSemaphorePermit
// permit дропается при выходе из блока => автоматический release
let _permit = sem.acquire().await.unwrap();
fetch(url).await
});
handles.push(handle);
}
for h in handles {
h.await.unwrap();
}
}
async fn fetch(url: &str) -> String {
// reqwest::get(url).await...
format!("fetched {}", url)
}
acquire_owned для передачи permit через spawn
use std::sync::Arc;
use tokio::sync::Semaphore;
async fn run_limited(sem: Arc<Semaphore>, id: u32) {
// OwnedSemaphorePermit: Send + 'static — можно передавать в spawn
let permit = sem.acquire_owned().await.unwrap();
tokio::spawn(async move {
println!("task {} running", id);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
drop(permit); // явно освобождаем
println!("task {} done", id);
});
}
Паттерн: connection pool
use tokio::sync::{Semaphore, OwnedSemaphorePermit};
use std::sync::Arc;
struct Pool {
semaphore: Arc<Semaphore>,
// реальные соединения в Vec<Connection>...
}
impl Pool {
fn new(size: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(size)),
}
}
async fn acquire(&self) -> OwnedSemaphorePermit {
Arc::clone(&self.semaphore)
.acquire_owned()
.await
.unwrap()
}
}
Закрытие семафора
semaphore.close() переводит семафор в закрытое состояние: все ожидающие acquire получат AcquireError. Используется для graceful shutdown.
let sem = Arc::new(Semaphore::new(5));
// ... в другой задаче:
sem.close(); // все задачи, ждущие acquire, получат ошибку
Подводные камни
- Permit нужно явно держать (
let _permit = ...) — если сразу дропнуть, секция не защищена; компилятор не предупредит Semaphore::new(0)создаёт семафор без permits — все задачи будут ждать вечно, пока не будет вызванadd_permits()acquire().await.unwrap()паникует если семафор закрыт — в production обрабатывайтеAcquireError- При использовании
acquire(неacquire_owned) guard не реализуетSend— нельзя держать через некоторые.awaitвmulti_thread Semaphore::MAX_PERMITSограничен (2^29 - 1 ≈ 536 млн на 64-bit) — теоретический предел, но не практический- Семафор не fair по умолчанию в смысле FIFO между задачами с разным приоритетом — Tokio не поддерживает приоритеты задач
- Избыточно малый лимит permits при высоком RPS может стать узким местом — мониторьте длину очереди ожидающих через
sem.available_permits() - Не используйте семафор для mutual exclusion одного ресурса — для этого правильнее
Mutex; семафор для ограничения N параллельных операций
Common mistakes
- Отвечать определением без production-сценария.
- Не называть runtime boundary, security boundary или failure mode.
- Игнорировать версию API, observability и тестовую проверку.
What the interviewer is testing
- Объясняет механизм своими словами и без выдуманных API.
- Называет реальные риски, диагностику и критерий корректности.
- Связывает ответ с текущей документацией и миграционными ограничениями.