Skip to main content

How to stream results from your RAG application

This guide explains how to stream results from a RAG application. It covers streaming tokens from the final output as well as intermediate steps of a chain (e.g., from query re-writing).

We'll work off of the Q&A app with sources we built over the LLM Powered Autonomous Agents blog post by Lilian Weng in the RAG tutorial.

Setup​

Dependencies​

We'll use the following packages:

%pip install --upgrade --quiet  langchain langchain-community langchainhub langchain-openai langchain-chroma beautifulsoup4

LangSmith​

Many of the applications you build with LangChain will contain multiple steps with multiple invocations of LLM calls. As these applications get more and more complex, it becomes crucial to be able to inspect what exactly is going on inside your chain or agent. The best way to do this is with LangSmith.

Note that LangSmith is not needed, but it is helpful. If you do want to use LangSmith, after you sign up at the link above, make sure to set your environment variables to start logging traces:

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass()

Components​

We will need to select three components from LangChain's suite of integrations.

A chat model:

pip install -qU langchain-openai
import getpass
import os

os.environ["OPENAI_API_KEY"] = getpass.getpass()

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

An embedding model:

pip install -qU langchain-openai
import getpass

os.environ["OPENAI_API_KEY"] = getpass.getpass()

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

And a vector store:

pip install -qU langchain-core
from langchain_core.vector_stores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)

RAG application​

Let's reconstruct the Q&A app with sources we built over the LLM Powered Autonomous Agents blog post by Lilian Weng in the RAG tutorial.

First we index our documents:

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict

# Load and chunk contents of the blog
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
# Index chunks
_ = vector_store.add_documents(documents=all_splits)

Next we build the application:

from langchain_core.messages import SystemMessage
from langchain_core.tools import tool
from langgraph.graph import END, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition


# Define retrieval tool
@tool(response_format="content_and_artifact")
def retrieve(query: str):
"""Retrieve information related to a query."""
retrieved_docs = vector_store.similarity_search(query, k=2)
serialized = "\n\n".join(
(f"Source: {doc.metadata}\n" f"Content: {doc.page_content}")
for doc in retrieved_docs
)
return serialized, retrieved_docs


# Step 1: Generate an AIMessage that may include a tool-call to be sent.
def query_or_respond(state: MessagesState):
"""Generate tool call for retrieval or respond."""
llm_with_tools = llm.bind_tools([retrieve])
response = llm_with_tools.invoke(state["messages"])
# MessagesState appends messages to state instead of overwriting
return {"messages": [response]}


# Step 2: Execute the retrieval.
tools = ToolNode([retrieve])


# Step 3: Generate a response using the retrieved content.
def generate(state: MessagesState):
"""Generate answer."""
last_message = state["messages"][-1] # will be ToolMessage
docs_content = last_message.content # Retrieved context
system_message_content = (
"You are an assistant for question-answering tasks. "
"Use the following pieces of retrieved context to answer "
"the question. If you don't know the answer, say that you "
"don't know. Use three sentences maximum and keep the "
"answer concise."
"\n\n"
f"{docs_content}"
)
conversation_messages = [
message
for message in state["messages"]
if message.type in ("human", "system")
or (message.type == "ai" and not message.tool_calls)
]
prompt = [SystemMessage(system_message_content)] + conversation_messages

# Run
response = llm.invoke(prompt)
return {"messages": [response]}


# Build graph
graph_builder = StateGraph(MessagesState)

graph_builder.add_node(query_or_respond)
graph_builder.add_node(tools)
graph_builder.add_node(generate)

graph_builder.set_entry_point("query_or_respond")
graph_builder.add_conditional_edges(
"query_or_respond",
tools_condition,
{END: END, "tools": "tools"},
)
graph_builder.add_edge("tools", "generate")
graph_builder.add_edge("generate", END)
graph = graph_builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Streaming final outputs​

LangGraph supports several streaming modes, which can be controlled by specifying the stream_mode parameter. Setting stream_mode="messages" allows us to stream tokens from chat model invocations.

Note that there are multiple chat model invocations in this application-- the first step that generates a query, and the last step that generates a response. Below, we filter to only the last step using the name of the corresponding node:

input_message = "What is Task Decomposition?"

for message, metadata in graph.stream(
{"messages": [{"role": "user", "content": input_message}]},
stream_mode="messages",
):
if metadata["langgraph_node"] == "generate":
print(message.content, end="|")
|Task| De|composition| is| a| technique| used| to| break| down| complicated| tasks| into| smaller|,| more| manageable| steps|.| It| often| involves| using| methods| like| Chain| of| Thought| (|Co|T|)| prompting|,| which| encourages| models| to| think| step| by| step| and| clarify| their| reasoning| process|.| This| approach| helps| in| enhancing| model| performance| on| complex| tasks| by| simplifying| them| into| sub|go|als|.||

Streaming intermediate steps​

Other streaming modes will generally stream steps from our invocation-- i.e., state updates from individual nodes. In this case, each node is just appending a new message to the state:

for step in graph.stream(
{"messages": [{"role": "user", "content": input_message}]},
stream_mode="values",
):
step["messages"][-1].pretty_print()
================================ Human Message =================================

What is Task Decomposition?
================================== Ai Message ==================================
Tool Calls:
retrieve (call_G3SZx6MgdHBMUVrKoYei9Vph)
Call ID: call_G3SZx6MgdHBMUVrKoYei9Vph
Args:
query: Task Decomposition
================================= Tool Message =================================
Name: retrieve

Source: {'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}
Content: Fig. 1. Overview of a LLM-powered autonomous agent system.
Component One: Planning#
A complicated task usually involves many steps. An agent needs to know what they are and plan ahead.
Task Decomposition#
Chain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to β€œthink step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.

Source: {'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}
Content: Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.
Task decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.
================================== Ai Message ==================================

Task Decomposition is the process of breaking down a complicated task into smaller, more manageable steps. It often involves using techniques like Chain of Thought (CoT), which encourages a model to think step by step to clarify its reasoning and enhance performance on complex tasks. This approach helps in understanding and organizing the task more effectively.

For more on streaming with LangGraph, check out its streaming documentation. For more information on streaming individual LangChain Runnables, refer to this guide.


Was this page helpful?