How to stream results from your RAG application
This guide explains how to stream results from a RAG application. It covers streaming tokens from the final output as well as intermediate steps of a chain (e.g., from query re-writing).
We'll work from the Q&A app with sources that we built over the LLM Powered Autonomous Agents blog post by Lilian Weng in the RAG tutorial.
Setup
Dependencies
We'll use the following packages:
%pip install --upgrade --quiet langchain langchain-community langchainhub langchain-openai langchain-chroma langgraph beautifulsoup4
LangSmith
Many of the applications you build with LangChain will contain multiple steps with multiple invocations of LLM calls. As these applications get more and more complex, it becomes crucial to be able to inspect what exactly is going on inside your chain or agent. The best way to do this is with LangSmith.
Note that LangSmith is not needed, but it is helpful. If you do want to use LangSmith, after you sign up at the link above, make sure to set your environment variables to start logging traces:
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass()
Components
We will need to select three components from LangChain's suite of integrations.
A chat model:
- OpenAI
- Anthropic
- Azure
- AWS
- Cohere
- NVIDIA
- FireworksAI
- Groq
- MistralAI
- TogetherAI
pip install -qU langchain-openai
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass()
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
pip install -qU langchain-anthropic
import getpass
import os
os.environ["ANTHROPIC_API_KEY"] = getpass.getpass()
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20240620")
pip install -qU langchain-openai
import getpass
import os
os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass()
from langchain_openai import AzureChatOpenAI
llm = AzureChatOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
openai_api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)
pip install -qU langchain-google-vertexai
# Ensure your VertexAI credentials are configured
from langchain_google_vertexai import ChatVertexAI
llm = ChatVertexAI(model="gemini-1.5-flash")
pip install -qU langchain-aws
# Ensure your AWS credentials are configured
from langchain_aws import ChatBedrock
llm = ChatBedrock(model="anthropic.claude-3-5-sonnet-20240620-v1:0",
beta_use_converse_api=True)
pip install -qU langchain-cohere
import getpass
import os
os.environ["COHERE_API_KEY"] = getpass.getpass()
from langchain_cohere import ChatCohere
llm = ChatCohere(model="command-r-plus")
pip install -qU langchain-nvidia-ai-endpoints
import getpass
import os
os.environ["NVIDIA_API_KEY"] = getpass.getpass()
from langchain_nvidia_ai_endpoints import ChatNVIDIA
llm = ChatNVIDIA(model="meta/llama3-70b-instruct")
pip install -qU langchain-fireworks
import getpass
import os
os.environ["FIREWORKS_API_KEY"] = getpass.getpass()
from langchain_fireworks import ChatFireworks
llm = ChatFireworks(model="accounts/fireworks/models/llama-v3p1-70b-instruct")
pip install -qU langchain-groq
import getpass
import os
os.environ["GROQ_API_KEY"] = getpass.getpass()
from langchain_groq import ChatGroq
llm = ChatGroq(model="llama3-8b-8192")
pip install -qU langchain-mistralai
import getpass
import os
os.environ["MISTRAL_API_KEY"] = getpass.getpass()
from langchain_mistralai import ChatMistralAI
llm = ChatMistralAI(model="mistral-large-latest")
pip install -qU langchain-openai
import getpass
import os
os.environ["TOGETHER_API_KEY"] = getpass.getpass()
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
base_url="https://api.together.xyz/v1",
api_key=os.environ["TOGETHER_API_KEY"],
model="mistralai/Mixtral-8x7B-Instruct-v0.1",
)
An embedding model:
- OpenAI
- Azure
- AWS
- HuggingFace
- Ollama
- Cohere
- MistralAI
- Nomic
- NVIDIA
- Fake
pip install -qU langchain-openai
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass()
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
pip install -qU langchain-openai
import getpass
import os
os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass()
from langchain_openai import AzureOpenAIEmbeddings
embeddings = AzureOpenAIEmbeddings(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
openai_api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)
pip install -qU langchain-google-vertexai
from langchain_google_vertexai import VertexAIEmbeddings
embeddings = VertexAIEmbeddings(model="text-embedding-004")
pip install -qU langchain-aws
from langchain_aws import BedrockEmbeddings
embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v2:0")
pip install -qU langchain-huggingface
from langchain_huggingface import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
pip install -qU langchain-ollama
from langchain_ollama import OllamaEmbeddings
embeddings = OllamaEmbeddings(model="llama3")
pip install -qU langchain-cohere
import getpass
import os
os.environ["COHERE_API_KEY"] = getpass.getpass()
from langchain_cohere import CohereEmbeddings
embeddings = CohereEmbeddings(model="embed-english-v3.0")
pip install -qU langchain-mistralai
import getpass
import os
os.environ["MISTRALAI_API_KEY"] = getpass.getpass()
from langchain_mistralai import MistralAIEmbeddings
embeddings = MistralAIEmbeddings(model="mistral-embed")
pip install -qU langchain-nomic
import getpass
import os
os.environ["NOMIC_API_KEY"] = getpass.getpass()
from langchain_nomic import NomicEmbeddings
embeddings = NomicEmbeddings(model="nomic-embed-text-v1.5")
pip install -qU langchain-nvidia-ai-endpoints
import getpass
import os
os.environ["NVIDIA_API_KEY"] = getpass.getpass()
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
embeddings = NVIDIAEmbeddings(model="NV-Embed-QA")
pip install -qU langchain-core
from langchain_core.embeddings import DeterministicFakeEmbedding
embeddings = DeterministicFakeEmbedding(size=4096)
And a vector store:
- In-memory
- AstraDB
- Chroma
- FAISS
- Milvus
- MongoDB
- PGVector
- Pinecone
- Qdrant
pip install -qU langchain-core
from langchain_core.vectorstores import InMemoryVectorStore
vector_store = InMemoryVectorStore(embeddings)
pip install -qU langchain-astradb
from langchain_astradb import AstraDBVectorStore
vector_store = AstraDBVectorStore(
embedding=embeddings,
api_endpoint=ASTRA_DB_API_ENDPOINT,
collection_name="astra_vector_langchain",
token=ASTRA_DB_APPLICATION_TOKEN,
namespace=ASTRA_DB_NAMESPACE,
)
pip install -qU langchain-chroma
from langchain_chroma import Chroma
vector_store = Chroma(embedding_function=embeddings)
pip install -qU langchain-community faiss-cpu
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
index = faiss.IndexFlatL2(len(embeddings.embed_query("hello world")))
vector_store = FAISS(embedding_function=embeddings, index=index, docstore=InMemoryDocstore(), index_to_docstore_id={})
pip install -qU langchain-milvus
from langchain_milvus import Milvus
vector_store = Milvus(embedding_function=embeddings)
pip install -qU langchain-mongodb
from langchain_mongodb import MongoDBAtlasVectorSearch
vector_store = MongoDBAtlasVectorSearch(
embedding=embeddings,
collection=MONGODB_COLLECTION,
index_name=ATLAS_VECTOR_SEARCH_INDEX_NAME,
relevance_score_fn="cosine",
)
pip install -qU langchain-postgres
from langchain_postgres import PGVector
vector_store = PGVector(
embedding=embeddings,
collection_name="my_docs",
connection="postgresql+psycopg://...",
)
pip install -qU langchain-pinecone
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
pc = Pinecone(api_key=...)
index = pc.Index(index_name)
vector_store = PineconeVectorStore(embedding=embeddings, index=index)
pip install -qU langchain-qdrant
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
client = QdrantClient(":memory:")
vector_store = QdrantVectorStore(
client=client,
collection_name="test",
embedding=embeddings,
)
RAG application
Let's reconstruct the Q&A app with sources we built over the LLM Powered Autonomous Agents blog post by Lilian Weng in the RAG tutorial.
First we index our documents:
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict
# Load and chunk contents of the blog
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
# Index chunks
_ = vector_store.add_documents(documents=all_splits)
Next we build the application:
from langchain_core.messages import SystemMessage
from langchain_core.tools import tool
from langgraph.graph import END, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
# Define retrieval tool
@tool(response_format="content_and_artifact")
def retrieve(query: str):
"""Retrieve information related to a query."""
retrieved_docs = vector_store.similarity_search(query, k=2)
serialized = "\n\n".join(
(f"Source: {doc.metadata}\n" f"Content: {doc.page_content}")
for doc in retrieved_docs
)
return serialized, retrieved_docs
# Step 1: Generate an AIMessage that may include a tool-call to be sent.
def query_or_respond(state: MessagesState):
"""Generate tool call for retrieval or respond."""
llm_with_tools = llm.bind_tools([retrieve])
response = llm_with_tools.invoke(state["messages"])
# MessagesState appends messages to state instead of overwriting
return {"messages": [response]}
# Step 2: Execute the retrieval.
tools = ToolNode([retrieve])
# Step 3: Generate a response using the retrieved content.
def generate(state: MessagesState):
"""Generate answer."""
last_message = state["messages"][-1] # will be ToolMessage
docs_content = last_message.content # Retrieved context
system_message_content = (
"You are an assistant for question-answering tasks. "
"Use the following pieces of retrieved context to answer "
"the question. If you don't know the answer, say that you "
"don't know. Use three sentences maximum and keep the "
"answer concise."
"\n\n"
f"{docs_content}"
)
conversation_messages = [
message
for message in state["messages"]
if message.type in ("human", "system")
or (message.type == "ai" and not message.tool_calls)
]
prompt = [SystemMessage(system_message_content)] + conversation_messages
# Run
response = llm.invoke(prompt)
return {"messages": [response]}
# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node(query_or_respond)
graph_builder.add_node(tools)
graph_builder.add_node(generate)
graph_builder.set_entry_point("query_or_respond")
graph_builder.add_conditional_edges(
"query_or_respond",
tools_condition,
{END: END, "tools": "tools"},
)
graph_builder.add_edge("tools", "generate")
graph_builder.add_edge("generate", END)
graph = graph_builder.compile()
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
Streaming final outputs
LangGraph supports several streaming modes, which can be controlled by specifying the stream_mode parameter. Setting stream_mode="messages" allows us to stream tokens from chat model invocations.
Note that there are multiple chat model invocations in this application: the first step generates the retrieval query, and the last step generates the response. Below, we filter to only the last step using the name of the corresponding node:
input_message = "What is Task Decomposition?"
for message, metadata in graph.stream(
{"messages": [{"role": "user", "content": input_message}]},
stream_mode="messages",
):
if metadata["langgraph_node"] == "generate":
print(message.content, end="|")
|Task| De|composition| is| a| technique| used| to| break| down| complicated| tasks| into| smaller|,| more| manageable| steps|.| It| often| involves| using| methods| like| Chain| of| Thought| (|Co|T|)| prompting|,| which| encourages| models| to| think| step| by| step| and| clarify| their| reasoning| process|.| This| approach| helps| in| enhancing| model| performance| on| complex| tasks| by| simplifying| them| into| sub|go|als|.||
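The same pattern works in async contexts via the graph's astream method, which yields the same (message, metadata) pairs. Below is a minimal sketch under that assumption; the stream_answer helper is hypothetical and reuses the graph compiled above:

async def stream_answer(question: str) -> None:
    # Hypothetical helper; filters tokens to the "generate" node, as above.
    async for message, metadata in graph.astream(
        {"messages": [{"role": "user", "content": question}]},
        stream_mode="messages",
    ):
        if metadata["langgraph_node"] == "generate":
            print(message.content, end="|")

# Top-level await works in a notebook; in a script, wrap this in asyncio.run(...).
await stream_answer("What is Task Decomposition?")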
Streaming intermediate steps
Other streaming modes will generally stream steps from our invocation, i.e., state updates from individual nodes. In this case, each node just appends a new message to the state:
for step in graph.stream(
{"messages": [{"role": "user", "content": input_message}]},
stream_mode="values",
):
step["messages"][-1].pretty_print()
================================ Human Message =================================
What is Task Decomposition?
================================== Ai Message ==================================
Tool Calls:
retrieve (call_G3SZx6MgdHBMUVrKoYei9Vph)
Call ID: call_G3SZx6MgdHBMUVrKoYei9Vph
Args:
query: Task Decomposition
================================= Tool Message =================================
Name: retrieve
Source: {'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}
Content: Fig. 1. Overview of a LLM-powered autonomous agent system.
Component One: Planning#
A complicated task usually involves many steps. An agent needs to know what they are and plan ahead.
Task Decomposition#
Chain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to "think step by step" to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model's thinking process.
Source: {'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}
Content: Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.
Task decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.
================================== Ai Message ==================================
Task Decomposition is the process of breaking down a complicated task into smaller, more manageable steps. It often involves using techniques like Chain of Thought (CoT), which encourages a model to think step by step to clarify its reasoning and enhance performance on complex tasks. This approach helps in understanding and organizing the task more effectively.
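If you only want the incremental update returned by each node (rather than the full state after every step), stream_mode="updates" is another option. A minimal sketch, assuming the same graph and input_message as above:

for step in graph.stream(
    {"messages": [{"role": "user", "content": input_message}]},
    stream_mode="updates",
):
    # Each chunk maps a node name to the update that node returned,
    # e.g. {"query_or_respond": {"messages": [...]}}.
    for node, update in step.items():
        print(f"Update from node: {node}")
        update["messages"][-1].pretty_print()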
For more on streaming with LangGraph, check out its streaming documentation. For more information on streaming individual LangChain Runnables, refer to this guide.
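As a quick illustration of the latter, chat models and other Runnables expose stream (and astream) methods that yield output chunks directly; a minimal sketch using the llm configured above:

for chunk in llm.stream("Briefly explain Chain of Thought prompting."):
    # Each chunk is a message chunk containing a piece of the response text.
    print(chunk.content, end="|", flush=True)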