How can I edit a span name from chain to retriever?
I am using the Pathway retriever, which is not getting picked up by Phoenix: https://python.langchain.com/v0.1/docs/integrations/vectorstores/pathway/ Any solution? Here is my other thread: https://arize-ai.slack.com/archives/C04R3GXC8HK/p1716907969686599?thread_ts=1716888033.244609&cid=C04R3GXC8HK
If you have an example of how I can do this with custom retrievers, it would be a great help.
yup;
class OpenSearchRetriever(BaseRetriever):
    """Retriever that sends the query to ``client`` and wraps each hit as a Document.

    ``client`` is a module-level object defined outside this snippet
    (described in the thread as a Pathway client).
    """

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Return one ``Document`` per hit that ``client.query`` yields for ``query``.

        Each hit's ``"text"`` entry becomes the page content and its
        ``"metadata"`` entry becomes the document metadata.
        """
        # Forward the caller's query instead of a fixed string, so the
        # retriever answers what was actually asked.
        hits = client.query(query)
        return [
            Document(page_content=hit.get("text"), metadata=hit.get("metadata"))
            for hit in hits
        ]
retriever = OpenSearchRetriever()
(`client` here is the Pathway client.)
Tough to say what might be going on without access to the code. Here's a minimal working example of instrumenting a custom retriever with OpenInference and printing the span to the console. Hope this helps!
# Uses the following:
#
# langchain-core==0.2.1
# openinference-instrumentation-langchain==0.1.16
# opentelemetry-api==1.24.0
# opentelemetry-sdk==1.24.0
from typing import List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# Create an OpenTelemetry tracer provider and register it as the global one.
tracer_provider = trace_sdk.TracerProvider()
trace_api.set_tracer_provider(tracer_provider)
# Attach a processor that hands spans to the console exporter, so they are
# printed rather than sent to a collector.
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
# Enable the OpenInference instrumentation for LangChain.
LangChainInstrumentor().instrument()
class ToyRetriever(BaseRetriever):
    """A toy retriever that returns up to ``k`` documents containing the user query.

    This retriever only implements the sync method _get_relevant_documents.
    If the retriever were to involve file access or network access, it could benefit
    from a native async implementation of `_aget_relevant_documents`.
    As usual, with Runnables, there's a default async implementation that's provided
    that delegates to the sync implementation running on another thread.
    """

    documents: List[Document]
    """List of documents to retrieve from."""
    k: int
    """Maximum number of results to return."""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Return the first ``k`` documents whose content contains ``query``.

        Matching is a case-insensitive substring test against
        ``page_content``; documents are returned in their stored order.
        """
        needle = query.lower()  # lower-case once instead of per document
        matching_documents: List[Document] = []
        for document in self.documents:
            # Stop as soon as the cap is reached. Using ``>=`` (not ``>``)
            # keeps the result at no more than ``k`` documents.
            if len(matching_documents) >= self.k:
                break
            if needle in document.page_content.lower():
                matching_documents.append(document)
        return matching_documents
# Sample corpus: one Document per (page_content, type, trait) row below.
documents = [
    Document(page_content=text, metadata={"type": kind, "trait": trait})
    for text, kind, trait in (
        (
            "Dogs are great companions, known for their loyalty and friendliness.",
            "dog",
            "loyalty",
        ),
        (
            "Cats are independent pets that often enjoy their own space.",
            "cat",
            "independence",
        ),
        (
            "Goldfish are popular pets for beginners, requiring relatively simple care.",
            "fish",
            "low maintenance",
        ),
        (
            "Parrots are intelligent birds capable of mimicking human speech.",
            "bird",
            "intelligence",
        ),
        (
            "Rabbits are social animals that need plenty of space to hop around.",
            "rabbit",
            "social",
        ),
    )
]
# Build the retriever over the corpus, then rebind `documents` to the
# retriever's result for the query "that".
retriever = ToyRetriever(documents=documents, k=3)
documents = retriever.invoke("that")
print(documents)
Something weird: this works
# imports
# (the import lines themselves are not shown in this snippet)
# NOTE(review): presumably python-dotenv's load_dotenv — confirm against the elided imports.
load_dotenv()
# Enable the OpenInference instrumentation for LangChain.
LangChainInstrumentor().instrument()
def dummy_retriever(query: str):
    """Run ``query`` against the module-level ``client`` and wrap each hit as a Document.

    Each hit's ``"text"`` entry becomes the page content and its
    ``"metadata"`` entry becomes the document metadata.
    """
    return [
        Document(page_content=hit.get("text"), metadata=hit.get("metadata"))
        for hit in client.query(query)
    ]
class ToyRetriever(BaseRetriever):
    """Debugging retriever that hands back every document it was built with.

    Only the sync hook ``_get_relevant_documents`` is implemented. The
    query and ``k`` are accepted but not consulted by this implementation.
    """

    documents: List[Document]
    """Documents returned on every call."""
    k: int
    """Declared result count; unused by this implementation."""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Print the stored documents and return a new list containing all of them."""
        print(self.documents)
        return list(self.documents)
# Fetch documents through the external client and bind them to `documents`.
documents = dummy_retriever("What is bitcoin?")
# NOTE: the retriever is built with an empty list here, not with the
# `documents` fetched on the previous line.
retriever = ToyRetriever(documents=[], k=3)
documents = retriever.invoke("what is bitcoin")
This gets logged into Phoenix,
but this doesn't:
# imports
# (the import lines themselves are not shown in this snippet)
# Enable the OpenInference instrumentation for LangChain.
LangChainInstrumentor().instrument()
def dummy_retriever(query: str):
    """Query the module-level ``client`` and convert every hit into a Document.

    The hit's ``"text"`` entry is used as page content and its
    ``"metadata"`` entry as the document metadata.
    """
    hits = client.query(query)
    return [
        Document(page_content=hit.get("text"), metadata=hit.get("metadata"))
        for hit in hits
    ]
class ToyRetriever(BaseRetriever):
    """Debugging retriever that hands back every document it was built with.

    Only the sync hook ``_get_relevant_documents`` is implemented. The
    query and ``k`` are accepted but not consulted by this implementation.
    """

    documents: List[Document]
    """Documents returned on every call."""
    k: int
    """Declared result count; unused by this implementation."""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Print the stored documents and return a new list containing all of them."""
        print(self.documents)
        return list(self.documents)
# Fetch documents through the external client.
documents = dummy_retriever("What is bitcoin?")
# In this variant the fetched documents are handed to the retriever.
retriever = ToyRetriever(documents=documents, k=3)
# Rebinds `documents` to whatever the retriever returns for this query.
documents = retriever.invoke("what is bitcoin")
# print(documents)
😕
The documents are correctly printed.
There is input and output, but inside there is no output. I'm this close to solving this.
