One more question (for now lol)! - in my fastAPI app that serves all this, I have a:
with tracer.start_as_current_span("Chat interaction") as span:This groups all of my various sub-chains, retriever, and final generation spans together in phoenix as expected. But, for streaming, I do a:
//still under the with tracing block
return StreamingResponse(content=stream_generator(...))where stream_generator is a separate function that deals with asyncronously consuming the asyncgenerator langchain returns, and saving the results to my DB. But, now the grouping is "broken" in phoenix for the streaming case - as actually executing the other function that consumes the generator breaks the tracing span. I'm sure its something simple, but I somehow need to let the other function know which top level span it belongs to
Yeah streaming is definitely tricky. Maybe one solution I came up with might help? It's llama-index but might give you some ideas: https://github.com/Arize-ai/openinference/blob/main/python/examples/llama-index/backend/app/api/routers/chat.py#L95C1-L118C77
# streaming endpoint - delete if not needed
@r.post("")
async def chat(
request: Request,
data: _ChatData,
chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
span = tracer.start_span("chat", attributes={SpanAttributes.OPENINFERENCE_SPAN_KIND: "CHAIN"})
with trace.use_span(span, end_on_exit=False):
last_message_content, messages = await parse_chat_data(data)
span.set_attribute(SpanAttributes.INPUT_VALUE, last_message_content)
response = await chat_engine.astream_chat(last_message_content, messages)
async def event_generator():
full_response = ""
async for token in response.async_response_gen():
if await request.is_disconnected():
break
full_response = full_response + token
yield token
span.set_attribute(SpanAttributes.OUTPUT_VALUE, full_response)
span.end()
return StreamingResponse(event_generator(), media_type="text/plain")note we are going to have utility functions to set metadata in context soon without having to manually set it on spans so that might make things easier very soon.
hm - thats basically what I have - almost exactly - but using langchain. for some reason, my final output chain is still not nested underneath the top level span in phoenix - however, the:
# save final output to top-level span span.set_attribute(SpanAttributes.OUTPUT_VALUE, str(accumulated_message.content))
span.end()That I place inside the steam_generator that gets passed to StreamingResponse is correctly bubbling up the final output to the top level chain... so it looks like this:
Interesting. Let me dive in a bit deeper this afternoon. Have to test out how the span context propagation works there
Hey Trantor D. just wanted to give you a heads up that I probably don't have bandwidth to look into this today but definitely file us a github issue if you figure out any more details. I have some thoughts but would take some time to create a working example. Anything you might be able to give us would be super helpful.
Thanks and no worries! Working on a MWE myself to make sure it’s not something in my end
I got a MWE Mikyo :
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import json
from openinference.instrumentation.langchain import LangChainInstrumentor
from openinference.semconv.resource import ResourceAttributes
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.semconv.trace import SpanAttributes
PHEONIX_ENDPOINT = "phoenx-endpoint.k8s"
resource = Resource(
attributes={
ResourceAttributes.PROJECT_NAME: "mwe-test",
"service.name": "mwe-test",
}
)
tracer_provider = trace_sdk.TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(PHEONIX_ENDPOINT)))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer_provider().get_tracer(__name__)
LangChainInstrumentor().instrument()
class Test:
def __init__(self) -> None:
self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
self.prompt = ChatPromptTemplate.from_messages(
[
("system", "Pick a random number between {range} and respond to the user"),
]
)
self.prompt2 = ChatPromptTemplate.from_messages(
[
("system", "Pick a random number between {range} and say cheese that many times"),
]
)
self.prompt3 = ChatPromptTemplate.from_messages(
[
("system", "Try to guess what the user is doing based on {a} and {b}"),
]
)
self.chain1 = (self.prompt | self.llm | StrOutputParser()).with_config({"run_name": "chain1"})
self.chain2 = (self.prompt2 | self.llm | StrOutputParser()).with_config({"run_name": "chain2"})
self.chain3 = (self.prompt3 | self.llm | StrOutputParser()).with_config({"run_name": "chain3"})
async def form_inputs(self, upper_bound: int):
output1 = await self.chain1.ainvoke({"range": f"11 and {upper_bound}"})
output2 = await self.chain2.ainvoke({"range": "1 and 4"})
return {"chain1": output1, "chain2": output2}
app = FastAPI()
test = Test()
@app.post("/chat")
async def chat(upper_bound: int):
span = tracer.start_span(name="top level", attributes={SpanAttributes.OPENINFERENCE_SPAN_KIND: "CHAIN"})
span.set_attribute(SpanAttributes.INPUT_VALUE, upper_bound)
with trace.use_span(span=span, end_on_exit=False):
inputs = await test.form_inputs(upper_bound)
generator = test.chain3.astream({"a": inputs["chain1"], "b": inputs["chain2"]})
async def stream_generator():
full_response = ""
async for token in generator:
full_response += token
yield json.dumps(token) + "\n"
span.set_attribute(SpanAttributes.OUTPUT_VALUE, full_response)
span.end()
return StreamingResponse(stream_generator())Before doing the with... context block, they show up as you expect - three top-level traces. After doing the with context block, the first two chains are properly captured in the top level chain, but not the third:
Thanks Trantor D.! I'll take a look tomorrow when I have some cycles! https://github.com/Arize-ai/openinference/issues/423
Trantor D. I got the trace to stitch together this way:
# To run this example, you need to run the following command:
# fastapi dev fast_api.py
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from openinference.instrumentation.langchain import LangChainInstrumentor
from openinference.semconv.resource import ResourceAttributes
from openinference.semconv.trace import SpanAttributes
from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
PHOENIX_ENDPOINT = "http://localhost:6006/v1/traces"
resource = Resource(
attributes={
ResourceAttributes.PROJECT_NAME: "mwe-test",
"service.name": "mwe-test",
}
)
tracer_provider = trace_sdk.TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(PHOENIX_ENDPOINT)))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer_provider().get_tracer(__name__)
LangChainInstrumentor().instrument()
class Test:
def __init__(self) -> None:
print("Initializing Test")
self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
self.prompt = ChatPromptTemplate.from_messages(
[
("system", "Pick a random number between {range} and respond to the user"),
]
)
self.prompt2 = ChatPromptTemplate.from_messages(
[
("system", "Pick a random number between {range} and say cheese that many times"),
]
)
self.prompt3 = ChatPromptTemplate.from_messages(
[
("system", "Try to guess what the user is doing based on {a} and {b}"),
]
)
self.chain1 = (self.prompt | self.llm | StrOutputParser()).with_config(
{"run_name": "chain1"}
)
self.chain2 = (self.prompt2 | self.llm | StrOutputParser()).with_config(
{"run_name": "chain2"}
)
self.chain3 = (self.prompt3 | self.llm | StrOutputParser()).with_config(
{"run_name": "chain3"}
)
async def form_inputs(self, upper_bound: int):
output1 = await self.chain1.ainvoke({"range": f"11 and {upper_bound}"})
output2 = await self.chain2.ainvoke({"range": "1 and 4"})
return {"chain1": output1, "chain2": output2}
app = FastAPI()
test = Test()
@app.post("/chat")
async def chat(upper_bound: int):
root_span = tracer.start_span(
name="top level",
attributes={
SpanAttributes.OPENINFERENCE_SPAN_KIND: "CHAIN",
SpanAttributes.INPUT_VALUE: upper_bound,
},
)
context = trace.set_span_in_context(root_span)
context = context_api.get_current()
print(f"tl context {context}")
with trace.use_span(span=root_span, end_on_exit=False):
inputs = await test.form_inputs(upper_bound)
generator = test.chain3.astream({"a": inputs["chain1"], "b": inputs["chain2"]})
async def stream_generator():
full_response = ""
with trace.use_span(span=root_span) as span:
async for token in generator:
full_response += token
yield json.dumps(token) + "\n"
span.set_attribute(SpanAttributes.OUTPUT_VALUE, full_response)
span.end()
return StreamingResponse(stream_generator())
The general problem is OTEL context doesn't get propagated into that async call so you have to set it again I think.
Mikyo - thanks for the help! Can confirm this works for me as well - is this a bug (or feature?) of OTEL then?
I think this is probably just the behavior of OTEL contexts unfortunately - when things are async and you want to attach spans to a parent, you have to manually set the span context. I know it's confusing but since your first HTTP request exited but you want to stream the outputs and attach those tokens, I think this is the only way.
Thank you! All these with indentation blocks are cumbersome, but they work!
Mikyo From your solution above with intercepting StreamingReponse with stream_generator(), does it break the streaming response asysnc concept (returning the stream of 1 token at a time) by blocking it until all the tokens are returned and sent to Phoenix UI? It seems to unintentionally turn this into a sync chat message, right?
