Hi, is there any callback available to get the trace/span id (ideally with details) that is being sent to the collector using the code below?
from phoenix.trace.langchain import LangChainInstrumentor
LangChainInstrumentor().instrument()
Here's an example using the InMemorySpanExporter that Mikyo mentioned. Let us know if it meets your needs.
pip install -U openinference-instrumentation-langchain opentelemetry-sdk
from langchain_openai import ChatOpenAI
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# endpoint = "http://127.0.0.1:6006/v1/traces"
tracer_provider = trace_sdk.TracerProvider()
im_exporter = InMemorySpanExporter()
tracer_provider.add_span_processor(SimpleSpanProcessor(im_exporter))
# tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
if __name__ == "__main__":
    ChatOpenAI().invoke("Write a haiku.")
    spans = im_exporter.get_finished_spans()
    im_exporter.clear()
    print(spans[0].to_json())
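For reference, the ids on those finished spans are plain integers; a minimal pure-Python sketch of the W3C hex formatting Phoenix displays (the integers live on span.get_span_context().trace_id / .span_id, and opentelemetry's trace module also ships equivalent format_trace_id/format_span_id helpers):

```python
def format_trace_id(trace_id: int) -> str:
    # W3C trace id: 128-bit integer rendered as 32 lowercase hex chars
    return format(trace_id, "032x")

def format_span_id(span_id: int) -> str:
    # W3C span id: 64-bit integer rendered as 16 lowercase hex chars
    return format(span_id, "016x")

# Usage against a finished span would look like (assumes `spans` from above):
#   ctx = spans[0].get_span_context()
#   print(format_trace_id(ctx.trace_id), format_span_id(ctx.span_id))
```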
Hi Roger Y., Mikyo, since get_qa_reference and get_retrieved_documents fetch all spans, even those that are already evaluated, I am thinking of getting span and trace details using a callback approach and running the eval for each retriever span. I was able to get span and trace details by registering an additional span processor in LangChainInstrumentor:
import logging
from importlib.metadata import PackageNotFoundError
from importlib.util import find_spec
from typing import Any
from openinference.instrumentation.langchain import LangChainInstrumentor as Instrumentor
from openinference.semconv.resource import ResourceAttributes
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from phoenix.config import get_env_project_name
from phoenix.trace.exporter import _OpenInferenceExporter
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanProcessor
from typing import Callable
logger = logging.getLogger(__name__)
__all__ = ("LangChainInstrumentor",)


class MySpanProcessor(SpanProcessor):
    def __init__(self, span_completed_callback: Callable[[ReadableSpan], None]):
        self._span_completed_callback = span_completed_callback

    def on_end(self, span: ReadableSpan) -> None:
        # on_end returns None per the SpanProcessor interface
        self._span_completed_callback(span)

    def shutdown(self) -> None:
        pass  # optionally implement shutdown logic


class LangChainInstrumentor(Instrumentor):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if find_spec("langchain_core") is None:
            raise PackageNotFoundError(
                "Missing `langchain-core`. Install with `pip install langchain-core`."
            )
        super().__init__()

    def instrument(self, span_completed_callback: Callable[[ReadableSpan], None]) -> None:
        tracer_provider = trace_sdk.TracerProvider(
            resource=Resource({ResourceAttributes.PROJECT_NAME: get_env_project_name()}),
            span_limits=trace_sdk.SpanLimits(max_attributes=10_000),
        )
        tracer_provider.add_span_processor(SimpleSpanProcessor(_OpenInferenceExporter()))
        tracer_provider.add_span_processor(MySpanProcessor(span_completed_callback))
        super().instrument(skip_dep_check=True, tracer_provider=tracer_provider)
I will check out InMemorySpanExporter as well.
Hey gourav k. - I probably wouldn't recommend using a span processor to run evals quite yet, as it will put some load on your application. We're thinking you'll want to run them in a background process (cron). We will have some documentation on how to run eval jobs very shortly and would love to get your feedback.
Hi Mikyo, I will use the span processor just to get the retriever span id and trace id, which will be pushed to a separate eval service. The eval service will fetch the required span and trace details and run the eval for only that span/trace, by adding the trace id to IS_ROOT and the span id to IS_RETRIEVER in Arize-ai/phoenix/src/phoenix/trace/dsl/helpers.py. This way I can avoid re-running evals for already-processed spans/traces. Please share your documentation; I hope it takes care of the current issue of running evals for already-processed spans/traces.
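A sketch of what that span-completed callback could look like, assuming retriever spans are marked by the OpenInference span-kind attribute openinference.span.kind == "RETRIEVER" (the queue is a stand-in for whatever transport pushes ids to the eval service; a real callback receives a ReadableSpan):

```python
import queue

# (trace_id, span_id) hex pairs waiting for the external eval service
eval_queue: "queue.Queue[tuple[str, str]]" = queue.Queue()

def enqueue_retriever_span(span) -> None:
    # Forward only retriever spans, so non-retriever (and hence
    # already-evaluated) spans are never handed to the eval service.
    if span.attributes.get("openinference.span.kind") != "RETRIEVER":
        return
    ctx = span.get_span_context()
    eval_queue.put((format(ctx.trace_id, "032x"), format(ctx.span_id, "016x")))
```

This would be passed as `LangChainInstrumentor().instrument(enqueue_retriever_span)` with the subclass above.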
Hey gourav k., I think time might be the best way to ensure you don't evaluate the same spans right now. Here's an example that pushes evals using cron jobs: https://github.com/Arize-ai/phoenix/tree/main/examples/cron-evals
Hi Mikyo, thanks for sharing it; I got the gist of it. I have seen an eval take beyond 1 minute, up to 90 seconds, depending on the traces/retrieved documents, so it would be better to record the start time on each iteration and use that last start time as input to the next iteration, instead of just a hardcoded duration of running every n minutes/seconds.
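That start-time handoff could be sketched like this (the state file and the run_evals signature are hypothetical; the point is that the watermark is recorded before the slow eval runs, so the next iteration's window starts exactly where this one began and no spans are skipped even when an eval takes 60-90 s):

```python
import json
import time
from pathlib import Path
from typing import Optional

def run_eval_window(run_evals, state_path: Path, now: Optional[float] = None):
    # Evaluate only spans that started in the window (last_start, this_start].
    this_start = time.time() if now is None else now
    if state_path.exists():
        last_start = json.loads(state_path.read_text())["start"]
    else:
        last_start = this_start - 300.0  # first run: look back 5 minutes
    # Persist the new watermark *before* the slow eval, so a long-running
    # eval cannot push the next window's start later than this iteration.
    state_path.write_text(json.dumps({"start": this_start}))
    run_evals(last_start, this_start)
    return last_start, this_start
```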
