Hey everyone! I'm using arize-phoenix with llama-index / FastAPI on the backend and Next.js on the frontend. The user's browser sets a cookie with email = "their@email.com" when they log in. I want to pass that email as a parameter along with every single interaction the user makes — so that when I am reading the logs, I can tie each trace to a specific user. How would you approach doing this? I've been reading the codebase but I'm struggling with how to get started.
Here's my Chat.py:
from pydantic import BaseModel
from typing import List, Any, Optional, Dict, Tuple
from fastapi import APIRouter, Depends, HTTPException, Request, status
from llama_index.core.chat_engine.types import (
BaseChatEngine,
StreamingAgentChatResponse,
)
from llama_index.core.schema import NodeWithScore
from llama_index.core.llms import ChatMessage, MessageRole
from app.engine import get_chat_engine
from app.api.routers.vercel_response import VercelStreamResponse
# Router for the chat endpoints; "r" is a short alias used by the
# route decorators below (the app mounts "chat_router").
chat_router = r = APIRouter()
class _Message(BaseModel):
    """A single chat message: who said it (role) and what was said."""

    role: MessageRole
    content: str
class _ChatData(BaseModel):
    """Request body for both chat endpoints: the full message history,
    ordered oldest-first, with the user's latest message last."""

    messages: List[_Message]

    class Config:
        # Example payload surfaced in the OpenAPI/Swagger docs.
        json_schema_extra = {
            "example": {
                "messages": [
                    {
                        "role": "user",
                        "content": "What standards for letters exist?",
                    }
                ]
            }
        }
class _SourceNodes(BaseModel):
    """Serializable view of a retrieved source node plus its relevance score."""

    id: str
    metadata: Dict[str, Any]
    score: Optional[float]
    text: str

    @classmethod
    def from_source_node(cls, source_node: NodeWithScore):
        """Build one _SourceNodes from a llama-index NodeWithScore."""
        node = source_node.node
        return cls(
            id=node.node_id,
            metadata=node.metadata,
            score=source_node.score,
            text=node.text,
        )

    @classmethod
    def from_source_nodes(cls, source_nodes: List[NodeWithScore]):
        """Convert every NodeWithScore in the list."""
        return list(map(cls.from_source_node, source_nodes))
class _Result(BaseModel):
    """Response body for the non-streaming endpoint: the assistant's
    reply plus the source nodes it was grounded on."""

    result: _Message
    nodes: List[_SourceNodes]
async def parse_chat_data(data: _ChatData) -> Tuple[str, List[ChatMessage]]:
    """Validate the chat payload and split it into (latest user message, history).

    Returns the content of the last message (which must be from the user)
    and the preceding messages converted to llama-index ChatMessage objects.

    Raises:
        HTTPException 400: if no messages were sent, or the last message
            is not from the user.
    """
    if not data.messages:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No messages provided",
        )
    last_message = data.messages[-1]
    if last_message.role != MessageRole.USER:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Last message must be from user",
        )
    # Build the history from all but the last message WITHOUT mutating the
    # request payload (the previous .pop() removed the last message from
    # data.messages in place, a side effect on the caller's object).
    messages = [
        ChatMessage(role=m.role, content=m.content)
        for m in data.messages[:-1]
    ]
    return last_message.content, messages
@r.post("")
async def chat(
request: Request,
data: _ChatData,
chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
last_message_content, messages = await parse_chat_data(data)
response = await chat_engine.astream_chat(last_message_content, messages)
async def event_generator(request: Request, response: StreamingAgentChatResponse):
async for token in response.async_response_gen():
if await request.is_disconnected():
break
yield VercelStreamResponse.convert_text(token)
yield VercelStreamResponse.convert_data(
{
"type": "sources",
"data": {
"nodes": [
_SourceNodes.from_source_node(node).dict()
for node in response.source_nodes
]
},
}
)
return VercelStreamResponse(content=event_generator(request, response))
# non-streaming endpoint
@r.post("/request")
async def chat_request(
data: _ChatData,
chat_engine: BaseChatEngine = Depends(get_chat_engine),
) -> _Result:
last_message_content, messages = await parse_chat_data(data)
response = await chat_engine.achat(last_message_content, messages)
return _Result(
result=_Message(role=MessageRole.ASSISTANT, content=response.response),
nodes=_SourceNodes.from_source_nodes(response.source_nodes),
)

and I use this to instantiate phoenix:
def init_observability():
    """Launch a local Phoenix app and register it as the llama-index
    global callback handler so all engine calls are traced."""
    px.launch_app()
    set_global_handler("arize_phoenix")