
Airflow 2.0.0+ - Pass a Dynamically Generated Dictionary to a DAG Triggered by TriggerDagRunOperator

Previously, I was using the python_callable parameter of the TriggerDagRunOperator to dynamically alter the dag_run_obj payload that is passed to the newly triggered DAG. Since its removal in Airflow 2.0.0, how can I pass a dynamically generated dictionary to the DAG triggered by the TriggerDagRunOperator?

Solution 1:

The TriggerDagRunOperator now takes a conf parameter to which a dictionary can be provided as the conf object for the DagRun. The Airflow documentation on triggering DAGs has more information you may find helpful as well.
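
If the dictionary is already known when the DAG file is parsed, that parameter is all you need. A minimal sketch, assuming a hypothetical target DAG id "downstream_dag" and made-up payload values:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id="trigger_downstream",
    trigger_dag_id="downstream_dag",  # hypothetical target DAG
    conf={"filePath": "/landing/zone", "source": "s3", "fileName": "data.csv"},
)

Inside the triggered DAG, the values are available on the DagRun: {{ dag_run.conf['fileName'] }} in a templated field, or context["dag_run"].conf in a python_callable.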

EDIT

Since you need to execute a function to determine which DAG to trigger and do not want to create a custom TriggerDagRunOperator, you could execute intakeFile() in a PythonOperator (or use the @task decorator with the TaskFlow API) and use the return value as the conf argument in the TriggerDagRunOperator. As of Airflow 2.0, return values are automatically pushed to XCom in many operators, the PythonOperator included.

Here is the general idea:

import os

from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def intakeFile(*args, **kwargs):
    # read from S3, decide which DAG to trigger, and build the payload
    # for the triggered DAG
    bucket_name = os.environ.get("bucket_name")
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...

    # The Airflow 1.x dag_run_obj no longer exists; return the values directly.
    # In Airflow 2.0+ the return value is pushed to XCom automatically
    # under key="return_value".
    return {
        "dag_id": dag_to_trigger,  # id of the DAG to trigger, determined above
        "conf": {
            "filePath": workingPath,
            "source": source,
            "fileName": fileName,
        },
    }


get_dag_to_trigger = PythonOperator(
    task_id="get_dag_to_trigger",
    python_callable=intakeFile
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="{{ ti.xcom_pull(task_ids='get_dag_to_trigger', key='return_value')['dag_id'] }}",
    # conf is a templated field in recent Airflow releases; set
    # render_template_as_native_obj=True on the DAG (Airflow 2.1+) so it is passed as a dict
    conf="{{ ti.xcom_pull(task_ids='get_dag_to_trigger', key='return_value')['conf'] }}",
)

get_dag_to_trigger >> triggerNewDAG_tsk
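
The same wiring with the TaskFlow API mentioned above, as a rough sketch: the DAG and task names are illustrative, conf is assumed to be a templated field (true in recent TriggerDagRunOperator releases), and render_template_as_native_obj=True (Airflow 2.1+) keeps the pulled value a dict rather than its string representation.

import pendulum

from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,  # Airflow 2.1+: keep templated conf a dict
)
def intake_and_trigger():
    @task
    def intake_file():
        # stand-in for the S3 logic above; the returned dict is pushed to XCom
        return {"filePath": "/landing/zone", "source": "s3", "fileName": "data.csv"}

    payload = intake_file()

    trigger = TriggerDagRunOperator(
        task_id="triggerNewDAG_tsk",
        trigger_dag_id="downstream_dag",  # hypothetical target DAG id
        conf="{{ ti.xcom_pull(task_ids='intake_file') }}",
    )

    payload >> trigger


intake_and_trigger_dag = intake_and_trigger()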
