Here’s a minimal Airflow DAG with one Phoenix-instrumented task. This assumes every Airflow worker and the scheduler have the required packages installed (e.g., arize-phoenix-otel and openinference-instrumentation-openai):
```
from datetime import datetime
import os

from airflow import DAG
from airflow.operators.python import PythonOperator

# Point Phoenix at your collector; this can also be set in the worker
# environment/config. Use the base URL of your Phoenix instance.
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "http://your-phoenix-host:6006"

# Import and register Phoenix tracing before any LLM/framework imports.
from phoenix.otel import register

tracer_provider = register(
    project_name="airflow-llm-pipeline",
    auto_instrument=True,  # auto-instrument supported libraries
    batch=True,            # batch span export; recommended for production
)


def traced_task(**kwargs):
    # Your LLM or other instrumented code here.
    import openai

    client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello from Airflow!"}],
    )
    print(response.choices[0].message.content)


with DAG(
    dag_id="phoenix_airflow_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_traced = PythonOperator(
        task_id="run_traced",
        python_callable=traced_task,
        # provide_context is deprecated in Airflow 2; context is passed automatically
    )
```
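One caveat worth knowing: with batch=True, spans are exported asynchronously, and an Airflow task process can exit before the queue drains, so runs may appear to be missing traces. A minimal sketch of a safeguard, using the standard OpenTelemetry tracer API on the provider returned by register() (the span name and attribute key below are arbitrary choices, not Phoenix conventions):
```
from phoenix.otel import register

# Same registration as in the DAG file above.
tracer_provider = register(project_name="airflow-llm-pipeline", batch=True)
tracer = tracer_provider.get_tracer(__name__)

def traced_task(**kwargs):
    # Optional parent span so each task run groups its LLM calls in Phoenix.
    with tracer.start_as_current_span("airflow.traced_task") as span:
        span.set_attribute("airflow.dag_id", kwargs["dag"].dag_id)
        ...  # your instrumented LLM calls, as in the example above
    # Block until queued spans are exported (or the export timeout elapses),
    # so the task process doesn't exit with spans still buffered.
    tracer_provider.force_flush()
```
force_flush() is part of the OpenTelemetry SDK's TracerProvider API, so it's safe to call at the end of any task.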
*Key points:*
- Install all dependencies in every worker and scheduler environment (see the requirements.txt sketch after this list).
- Set PHOENIX_COLLECTOR_ENDPOINT in the environment (not just in code).
- Import and call phoenix.otel.register before any LLM/framework imports.
- Use auto_instrument=True for automatic tracing of supported libraries.
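For reference, a minimal requirements.txt covering the example above; versions are left unpinned here, so pin them against your Airflow constraints file as appropriate:
```
apache-airflow
arize-phoenix-otel
openinference-instrumentation-openai
openai
```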
Let me know if you need a more advanced setup!
Sources:
- https://github.com/Arize-ai/phoenix/issues/8380
- https://arize.com/docs/phoenix/tracing/llm-traces-1/quickstart-tracing-python#overview