Integrating MongoDB Traces with Docker Phoenix Service
Hi, is it possible to provide traces either as a file or via a connection to a Docker Phoenix service? The traces have been exported to MongoDB beforehand, so they cannot be processed as usual by llama_index. I tried to provide the traces after they had been saved to MongoDB, but the Phoenix service accepts neither the TRACE_DATASET_NAME nor the UUID that is generated (I tried both dataset.save() and dataset.to_disc()):
def list_dict_to_dict_arr(data: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Transpose a list of MongoDB documents into a dict of equal-length columns.

    Each distinct document key becomes one column: a list with one slot per
    document, ``None`` where a document lacks that key. Mongo's internal
    ``_id`` field is dropped, and the nested ``context`` sub-document is
    flattened into the ``context.trace_id`` / ``context.span_id`` columns
    that the Phoenix trace schema expects.

    Args:
        data: Documents read from the Phoenix trace collection.

    Returns:
        Mapping of column name to a list of length ``len(data)``.

    Raises:
        ValueError: If any produced column does not match ``len(data)``
            (defensive invariant check; should not happen).
    """
    n = len(data)
    dict_of_arr: dict[str, list[Any]] = {}

    def _column(name: str) -> list[Any]:
        # Lazily create a column pre-filled with None for every document,
        # so documents missing a key still leave an aligned slot.
        if name not in dict_of_arr:
            dict_of_arr[name] = [None] * n
        return dict_of_arr[name]

    for i, item in enumerate(data):
        for key, value in item.items():
            if key == "_id":
                # Mongo's internal id is not part of the trace schema.
                continue
            if key == "context":
                # Flatten the nested context document; a missing/None context
                # simply leaves both slots as None.
                trace_col = _column("context.trace_id")
                span_col = _column("context.span_id")
                if value is not None:
                    trace_col[i] = value.get("trace_id")
                    span_col[i] = value.get("span_id")
                continue
            _column(key)[i] = value

    # All columns must be the same length as the input (invariant check).
    for key, column in dict_of_arr.items():
        if len(column) != n:
            log.error("Error on key: ", key=key, data=column, length=len(column), data_length=n)
            raise ValueError("Length of array is not equal to the length of data")
    return dict_of_arr
def format_mongo_span_data(mongodb: Database[Any]) -> px.TraceDataset:
    """Build a Phoenix TraceDataset from the spans stored in MongoDB.

    Reads every document from the trace collection, pivots it into a
    column-oriented DataFrame, normalizes the two timestamp columns, and
    wraps the result in a ``px.TraceDataset`` under ``TRACE_DATASET_NAME``.
    """
    collection = mongodb[TRACE_COLLECTION_NAME]
    documents: list[dict[str, Any]] = list(collection.find())
    frame = pd.DataFrame.from_dict(list_dict_to_dict_arr(documents))
    # Timestamps were exported as ISO strings with a trailing "Z"; strip it
    # before normalizing so they parse cleanly.
    for column in ("start_time", "end_time"):
        frame[column] = normalize_timestamps(frame[column].str.rstrip("Z"))
    return px.TraceDataset(dataframe=frame, name=TRACE_DATASET_NAME)
# Script entry: pull the exported spans out of MongoDB and persist them as a
# Phoenix TraceDataset.
if (mongodb := get_mongodb()) is None:
    raise ValueError("MongoDB is not available")
dataset = format_mongo_span_data(mongodb)
dataset.save()