Airflow 2.0.0+ - Pass a Dynamically Generated Dictionary to a DAG Triggered by TriggerDagRunOperator
Previously, I was using the python_callable parameter of the TriggerDagRunOperator to dynamically alter the dag_run_obj payload that is passed to the newly triggered DAG. Since its
Solution 1:
The TriggerDagRunOperator now takes a conf parameter, to which a dictionary can be provided as the conf object for the DagRun. Here is more information on triggering DAGs, which you may find helpful as well.
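On the receiving side, a task in the triggered DAG can read that dictionary from the conf attribute of the dag_run object in its context. The following is a minimal sketch, assuming the function is used as the python_callable of a PythonOperator in the triggered DAG. The name process_file is hypothetical, the key names are taken from the payload shown later in this answer, and SimpleNamespace only stands in for the real DagRun so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def process_file(**context):
    """Callable for a task in the triggered DAG (hypothetical name)."""
    # dag_run.conf holds the dictionary passed through the conf parameter.
    # Fall back to an empty dict in case the run was started without one.
    conf = context["dag_run"].conf or {}
    file_path = conf.get("filePath")
    source = conf.get("source")
    file_name = conf.get("fileName")
    return f"{source}: {file_path}/{file_name}"


# Stand-in for the DagRun object that Airflow places in the task context.
fake_dag_run = SimpleNamespace(
    conf={"filePath": "/tmp/work", "source": "s3", "fileName": "data.csv"}
)
print(process_file(dag_run=fake_dag_run))
```

Using .get() rather than indexing keeps the task from failing with a KeyError when the DAG is triggered manually without the expected keys.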
EDIT
Since you need to execute a function to determine which DAG to trigger and do not want to create a custom TriggerDagRunOperator, you could execute intakeFile() in a PythonOperator (or use the @task decorator with the TaskFlow API) and use the return value as the conf argument in the TriggerDagRunOperator. As of Airflow 2.0, return values are automatically pushed to XCom by many operators, the PythonOperator included.
Here is the general idea:
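import os

from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def intakeFile(*args, **kwargs):
    # read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get("bucket_name")
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...
    # The returned dict is pushed to XCom under the key "return_value".
    return {
        "filePath": workingPath,
        "source": source,
        "fileName": fileName,
    }


get_dag_to_trigger = PythonOperator(
    task_id="get_dag_to_trigger",
    python_callable=intakeFile,
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="<dag_id_to_trigger>",  # placeholder: ID of the DAG to trigger
    # conf is a templated field, so each value can be pulled from XCom at runtime.
    conf={
        "filePath": "{{ ti.xcom_pull(task_ids='get_dag_to_trigger')['filePath'] }}",
        "source": "{{ ti.xcom_pull(task_ids='get_dag_to_trigger')['source'] }}",
        "fileName": "{{ ti.xcom_pull(task_ids='get_dag_to_trigger')['fileName'] }}",
    },
)

get_dag_to_trigger >> triggerNewDAG_tsk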
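One practical point: TriggerDagRunOperator expects conf to be JSON serializable, and XCom values are JSON-serialized by default as well, so it helps to keep the payload to plain strings, numbers, lists, and dicts. Below is a minimal sketch of a helper that could be called at the end of intakeFile() to build and check the payload. The helper name build_payload and the sample values are assumptions for illustration, not part of the original code.

```python
import json
from pathlib import PurePosixPath


def build_payload(working_path, source, file_name):
    """Build the conf payload and verify it can be serialized to JSON."""
    payload = {
        # Path objects are not JSON serializable, so convert to str.
        "filePath": str(working_path),
        "source": source,
        "fileName": file_name,
    }
    # json.dumps raises TypeError if any value cannot be serialized,
    # which surfaces the problem here instead of in the trigger task.
    json.dumps(payload)
    return payload


payload = build_payload(PurePosixPath("/tmp/work"), "s3", "data.csv")
print(payload)
```

Failing early in the task that builds the payload tends to be easier to debug than an error raised later by the trigger task.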