LangChainMiddleCoding

Как работают RunnablePassthrough и RunnableParallel в LCEL?

RunnablePassthrough передаёт вход без изменений (или добавляет поля через .assign()), RunnableParallel выполняет несколько Runnable параллельно и собирает результаты в словарь.

RunnablePassthrough и RunnableParallel в LCEL

LCEL (LangChain Expression Language) строит цепочки через оператор |. Два специальных объекта — RunnablePassthrough и RunnableParallel — решают задачи маршрутизации и трансформации данных внутри цепочки.

RunnablePassthrough — прозрачная передача данных

Передаёт входные данные следующему шагу без изменений. Используется когда нужно сохранить оригинальный вход для дальнейшего использования в цепочке.

from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Базовое использование — просто передаёт строку дальше
passthrough = RunnablePassthrough()
result = passthrough.invoke("hello")  # -> "hello"

llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question using the context.\nContext: {context}"),
    ("human", "{question}"),
])

# RAG-паттерн: question передаётся через passthrough,
# одновременно идёт в retriever для поиска
rag_chain = (
    {
        "context": retriever,           # retriever получает вопрос
        "question": RunnablePassthrough()  # вопрос передаётся дальше как есть
    }
    | prompt
    | llm
    | StrOutputParser()
)

result = rag_chain.invoke("What is vector search?")

RunnablePassthrough.assign() — добавление полей

.assign() добавляет новые ключи в словарь, не удаляя существующие. Это основной способ обогатить контекст по ходу цепочки.

from langchain_core.runnables import RunnablePassthrough

# Исходный словарь: {"question": "What is Python?"}
# После assign добавляется поле "context"
chain = (
    RunnablePassthrough.assign(
        context=lambda x: retriever.invoke(x["question"]),
        timestamp=lambda x: "2026-05-20",
    )
    | prompt  # получает {"question": ..., "context": ..., "timestamp": ...}
    | llm
    | StrOutputParser()
)

result = chain.invoke({"question": "Explain LangChain"})
# prompt получил все три поля

RunnableParallel — параллельное выполнение

Запускает несколько Runnable одновременно (в потоках) и собирает результаты в словарь. Ключи словаря становятся именами результатов.

from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o")

# Два промпта выполняются параллельно
positive_prompt = ChatPromptTemplate.from_messages([
    ("human", "List 3 advantages of {technology}")
])
negative_prompt = ChatPromptTemplate.from_messages([
    ("human", "List 3 disadvantages of {technology}")
])

parallel_analysis = RunnableParallel(
    pros=(positive_prompt | llm | StrOutputParser()),
    cons=(negative_prompt | llm | StrOutputParser()),
)

result = parallel_analysis.invoke({"technology": "Python"})
print(result["pros"])   # Список преимуществ
print(result["cons"])   # Список недостатков
# Оба LLM-вызова выполнились параллельно!

Краткий синтаксис — словарный литерал

В LCEL словарь автоматически преобразуется в RunnableParallel:

# Эти записи эквивалентны:

# Явный RunnableParallel
chain1 = RunnableParallel({"a": step_a, "b": step_b}) | final_step

# Сокращённый синтаксис через словарь
chain2 = {"a": step_a, "b": step_b} | final_step

# Комбинация всех инструментов
full_rag = (
    {"context": retriever, "question": RunnablePassthrough()}
    | RunnablePassthrough.assign(char_count=lambda x: len(x["question"]))
    | prompt
    | llm
    | StrOutputParser()
)

Реальный пример: multi-query retrieval

from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Генерируем 3 варианта запроса параллельно
rephrase_prompt = ChatPromptTemplate.from_messages([
    ("human", "Generate {n} different versions of: {question}")
])

multi_query_chain = (
    RunnableParallel(
        original=RunnablePassthrough(),
        v1=({"question": RunnablePassthrough(), "n": lambda _: 1} | rephrase_prompt | llm | StrOutputParser()),
        v2=({"question": RunnablePassthrough(), "n": lambda _: 1} | rephrase_prompt | llm | StrOutputParser()),
    )
    | (lambda x: [x["original"], x["v1"], x["v2"]])  # Объединяем
)

Подводные камни

  • RunnableParallel выполняется в потоках, не в asyncio — при sync вызове используются ThreadPoolExecutor; не подходит для CPU-heavy задач.
  • assign() мутирует входной словарь по ссылке — если в lambda изменить x, это влияет на другие ветки; создавайте новые объекты в lambda.
  • Порядок ключей в RunnableParallel не гарантирован — обращайтесь к результатам по именам ключей, не по индексу.
  • RunnablePassthrough без аргументов копирует весь вход — если вход — большой словарь с эмбеддингами, это лишняя копия памяти; передавайте только нужные поля через lambda.
  • Исключение в одной ветке RunnableParallel отменяет все — нет встроенной обработки частичных отказов; оборачивайте ветки в RunnableLambda с try/except.
  • Nested RunnableParallel теряет stream() — стриминг работает только по линейной цепочке; при параллельном выполнении стриминг буферизуется до завершения всех веток.

Common mistakes

  • Объяснять runnable passthrough parallel только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать runnable passthrough parallel с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.

Sources

Related topics