How to dynamically create multiple scheduled DAGs that run only once, at a specific time, in Airflow

I have been looking at the Airflow documentation for creating a scheduled DAG run. My use case is that I want to create and schedule a DAG run at a specified time, only once. I will get this time value from a datetime column in a database table.

Airflow does have the schedule/schedule_interval DAG parameter for this, but it accepts a timedelta object or a cron expression, both of which are recurring in nature. And if I want to run only once, schedule/schedule_interval needs to be set to @once.

My ideal workflow would be:

  • I'll have a master DAG (DAG 1) scheduled to run every 10 minutes
  • This master DAG reads the datetime column values
  • Using those datetime values, the master DAG creates other DAGs, each scheduled to run at its own specified time (the datetime value, which can vary per DAG)

DAG 1 (let's assume it runs at 9:00 AM) --> creates 2 DAGs that run at 9:11 and 9:15

Reference documentation:

https://docs.astronomer.io/learn/dynamically-generating-dags
https://docs.astronomer.io/learn/scheduling-in-airflow#parameters

Looking for some good suggestions, preferably in Python.


You can use Airflow's PythonOperator to dynamically create and schedule DAGs based on the datetime values from your database table. Here's an example implementation:

  1. Define your master DAG, which runs every 10 minutes and reads the datetime column values from your database table. Use a PythonOperator to read the values and pass them to the next task:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def read_datetime_values():
    # code to read datetime values from database table
    datetime_values = ['2023-04-08 09:11:00', '2023-04-08 09:15:00']
    return datetime_values


def create_subdags(datetime_values):
    for dt in datetime_values:
        subdag_id = 'subdag_{}'.format(dt)
        subdag = DAG(
            subdag_id,
            description='Subdag for datetime {}'.format(dt),
            schedule_interval=None,  # None: no recurring schedule
            start_date=datetime.strptime(dt, '%Y-%m-%d %H:%M:%S'),
            catchup=False,  # don't backfill the DAG runs
        )
        # add tasks to the subdag here
        globals()[subdag_id] = subdag  # register the subdag in the global namespace


dag = DAG(
    'master_dag',
    description='Master DAG to create subdags dynamically',
    schedule_interval='*/10 * * * *',  # run every 10 minutes
    start_date=datetime(2023, 4, 8),
    catchup=False,  # don't backfill the DAG runs
)

read_datetime_values_task = PythonOperator(
    task_id='read_datetime_values',
    python_callable=read_datetime_values,
    dag=dag,
)

create_subdags_task = PythonOperator(
    task_id='create_subdags',
    python_callable=create_subdags,
    op_args=[read_datetime_values_task.output],
    dag=dag,
)

read_datetime_values_task >> create_subdags_task
```
  2. In the create_subdags() function, loop through the datetime values and create a new subdag for each value. Each subdag's schedule_interval is set to None so it has no recurring schedule, its start_date is set to the datetime value, and catchup is set to False to prevent backfilling of DAG runs.

  3. Inside each subdag, you can define the tasks you want to run at the specified datetime value.

Note: In order for dynamically generated DAGs to be recognized by Airflow, they need to be registered in the global namespace of a file in the dags folder; that is what the line globals()[subdag_id] = subdag does in create_subdags(). Keep in mind, though, that the scheduler only sees DAG objects that exist when it parses the file, so DAGs created inside a running task may not be picked up unless they are also persisted somewhere the scheduler parses.
