Как работают RunnablePassthrough и RunnableParallel в LCEL?
RunnablePassthrough передаёт вход без изменений (или добавляет поля через .assign()), RunnableParallel выполняет несколько Runnable параллельно и собирает результаты в словарь.
RunnablePassthrough и RunnableParallel в LCEL
LCEL (LangChain Expression Language) строит цепочки через оператор |. Два специальных объекта — RunnablePassthrough и RunnableParallel — решают задачи маршрутизации и трансформации данных внутри цепочки.
RunnablePassthrough — прозрачная передача данных
Передаёт входные данные следующему шагу без изменений. Используется когда нужно сохранить оригинальный вход для дальнейшего использования в цепочке.
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Базовое использование — просто передаёт строку дальше
passthrough = RunnablePassthrough()
result = passthrough.invoke("hello") # -> "hello"
llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([
("system", "Answer the question using the context.\nContext: {context}"),
("human", "{question}"),
])
# RAG-паттерн: question передаётся через passthrough,
# одновременно идёт в retriever для поиска
rag_chain = (
{
"context": retriever, # retriever получает вопрос
"question": RunnablePassthrough() # вопрос передаётся дальше как есть
}
| prompt
| llm
| StrOutputParser()
)
result = rag_chain.invoke("What is vector search?")
RunnablePassthrough.assign() — добавление полей
.assign() добавляет новые ключи в словарь, не удаляя существующие. Это основной способ обогатить контекст по ходу цепочки.
from langchain_core.runnables import RunnablePassthrough
# Исходный словарь: {"question": "What is Python?"}
# После assign добавляется поле "context"
chain = (
RunnablePassthrough.assign(
context=lambda x: retriever.invoke(x["question"]),
timestamp=lambda x: "2026-05-20",
)
| prompt # получает {"question": ..., "context": ..., "timestamp": ...}
| llm
| StrOutputParser()
)
result = chain.invoke({"question": "Explain LangChain"})
# prompt получил все три поля
RunnableParallel — параллельное выполнение
Запускает несколько Runnable одновременно (в потоках) и собирает результаты в словарь. Ключи словаря становятся именами результатов.
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o")
# Два промпта выполняются параллельно
positive_prompt = ChatPromptTemplate.from_messages([
("human", "List 3 advantages of {technology}")
])
negative_prompt = ChatPromptTemplate.from_messages([
("human", "List 3 disadvantages of {technology}")
])
parallel_analysis = RunnableParallel(
pros=(positive_prompt | llm | StrOutputParser()),
cons=(negative_prompt | llm | StrOutputParser()),
)
result = parallel_analysis.invoke({"technology": "Python"})
print(result["pros"]) # Список преимуществ
print(result["cons"]) # Список недостатков
# Оба LLM-вызова выполнились параллельно!
Краткий синтаксис — словарный литерал
В LCEL словарь автоматически преобразуется в RunnableParallel:
# Эти записи эквивалентны:
# Явный RunnableParallel
chain1 = RunnableParallel({"a": step_a, "b": step_b}) | final_step
# Сокращённый синтаксис через словарь
chain2 = {"a": step_a, "b": step_b} | final_step
# Комбинация всех инструментов
full_rag = (
{"context": retriever, "question": RunnablePassthrough()}
| RunnablePassthrough.assign(char_count=lambda x: len(x["question"]))
| prompt
| llm
| StrOutputParser()
)
Реальный пример: multi-query retrieval
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# Генерируем 3 варианта запроса параллельно
rephrase_prompt = ChatPromptTemplate.from_messages([
("human", "Generate {n} different versions of: {question}")
])
multi_query_chain = (
RunnableParallel(
original=RunnablePassthrough(),
v1=({"question": RunnablePassthrough(), "n": lambda _: 1} | rephrase_prompt | llm | StrOutputParser()),
v2=({"question": RunnablePassthrough(), "n": lambda _: 1} | rephrase_prompt | llm | StrOutputParser()),
)
| (lambda x: [x["original"], x["v1"], x["v2"]]) # Объединяем
)
Подводные камни
- RunnableParallel выполняется в потоках, не в asyncio — при sync вызове используются ThreadPoolExecutor; не подходит для CPU-heavy задач.
- assign() мутирует входной словарь по ссылке — если в lambda изменить x, это влияет на другие ветки; создавайте новые объекты в lambda.
- Порядок ключей в RunnableParallel не гарантирован — обращайтесь к результатам по именам ключей, не по индексу.
- RunnablePassthrough без аргументов копирует весь вход — если вход — большой словарь с эмбеддингами, это лишняя копия памяти; передавайте только нужные поля через lambda.
- Исключение в одной ветке RunnableParallel отменяет все — нет встроенной обработки частичных отказов; оборачивайте ветки в RunnableLambda с try/except.
- Nested RunnableParallel теряет stream() — стриминг работает только по линейной цепочке; при параллельном выполнении стриминг буферизуется до завершения всех веток.
Common mistakes
- Объяснять
runnable passthrough parallelтолько синтаксисом без shape, dtype, состояния или режима выполнения. - Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
- Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.
What the interviewer is testing
- Может ли связать
runnable passthrough parallelс реальным контрактом входов и выходов. - Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
- Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.