I have been looking at the Airflow documentation for creating a scheduled DAG run. My use case is that I want to create and schedule a DAG run at a specified time, only once. This time value I will be getting from a datetime column in a database table.
For this we do have the schedule/schedule_interval DAG parameter, but it accepts datetime objects and cron expressions, which are recurring in nature. And if I want to run only once, schedule/schedule_interval needs to be set to @once.
My ideal workflow would be:
- I'll have a master DAG 1 which is scheduled to run every 10 minutes.
- This master DAG reads the datetime column values.
- Using those datetime values, this DAG creates other DAGs, each scheduled to run at its own specified time (the datetime values), which can vary per DAG.
DAG 1 (let's assume this is running at 9:00 AM) --> creates 2 DAGs which run at 9:11 & 9:15.
Reference documentation:
https://docs.astronomer.io/learn/dynamically-generating-dags
https://docs.astronomer.io/learn/scheduling-in-airflow#parameters
Looking for some good suggestions, preferably in Python.
You can use Airflow's PythonOperator to dynamically create and schedule DAGs based on the datetime values from your database table. Here's an example implementation:
- Define your master DAG, which runs every 10 minutes and reads the datetime column values from your database table. You can use a PythonOperator to read the values and pass them to the next task:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def read_datetime_values():
    # code to read datetime values from the database table
    datetime_values = ['2023-04-08 09:11:00', '2023-04-08 09:15:00']
    return datetime_values


def create_subdags(datetime_values):
    for dt in datetime_values:
        # DAG ids may only contain alphanumerics, dashes, dots and
        # underscores, so sanitize the spaces and colons in the timestamp
        subdag_id = 'subdag_{}'.format(dt.replace(' ', '_').replace(':', '-'))
        subdag = DAG(
            subdag_id,
            description='Subdag for datetime {}'.format(dt),
            schedule_interval=None,  # set to None to run only once
            start_date=datetime.strptime(dt, '%Y-%m-%d %H:%M:%S'),
            catchup=False,  # don't backfill the DAG runs
        )
        # add tasks to the subdag here
        globals()[subdag_id] = subdag  # register the subdag in the global namespace


dag = DAG(
    'master_dag',
    description='Master DAG to create subdags dynamically',
    schedule_interval='*/10 * * * *',  # run every 10 minutes
    start_date=datetime(2023, 4, 8),
    catchup=False,  # don't backfill the DAG runs
)

read_datetime_values_task = PythonOperator(
    task_id='read_datetime_values',
    python_callable=read_datetime_values,
    dag=dag,
)

create_subdags_task = PythonOperator(
    task_id='create_subdags',
    python_callable=create_subdags,
    op_args=[read_datetime_values_task.output],
    dag=dag,
)

read_datetime_values_task >> create_subdags_task
```
In the create_subdags() function, loop through the datetime values and create a new subdag for each value. Set the subdag's schedule_interval to None to run only once. You can also set the start_date to the datetime value, and catchup to False to prevent backfilling of the DAG runs.
Inside each subdag, you can define the tasks you want to run at the specified datetime value.
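As a sketch of what such a task might do (the callable and task names here are illustrative, not part of any Airflow API), the loop body could attach a single task whose callable receives the timestamp the subdag was built for:

```python
from datetime import datetime


def run_at_scheduled_time(dt_string):
    """Illustrative task callable: parse the timestamp this subdag was
    created for, then do the one-off work."""
    scheduled = datetime.strptime(dt_string, '%Y-%m-%d %H:%M:%S')
    print(f'Running one-off work scheduled for {scheduled.isoformat()}')
    return scheduled


# Inside create_subdags(), this would typically be wrapped as (illustrative):
# PythonOperator(task_id='run_job', python_callable=run_at_scheduled_time,
#                op_args=[dt], dag=subdag)
```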
Note: In order for the dynamically generated subdags to be recognized by Airflow, you need to register them in the global namespace. This is done in the create_subdags() function using the line: globals()[subdag_id] = subdag.
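One caveat: globals set inside a running task live only in that worker process, so the Astronomer guide linked in the question instead generates DAGs at parse time, from metadata the master DAG has written somewhere the scheduler can read. A minimal sketch of the parse-time half, assuming the master DAG maintains a JSON file of timestamps (the file path and helper name are hypothetical):

```python
import json
from datetime import datetime
from pathlib import Path

# Hypothetical metadata file written by the master DAG, e.g.
# ["2023-04-08 09:11:00", "2023-04-08 09:15:00"]
SCHEDULE_FILE = Path('/opt/airflow/dags/one_off_schedules.json')


def load_one_off_schedules(path):
    """Return {dag_id: start_date} for every timestamp in the file."""
    if not path.exists():
        return {}
    schedules = {}
    for raw in json.loads(path.read_text()):
        start = datetime.strptime(raw, '%Y-%m-%d %H:%M:%S')
        # DAG ids may only contain alphanumerics, dashes, dots, underscores
        dag_id = 'subdag_' + start.strftime('%Y-%m-%dT%H-%M-%S')
        schedules[dag_id] = start
    return schedules


# In the actual DAG file, a loop like this (illustrative) would run at every
# parse, so the scheduler sees the generated DAGs:
# for dag_id, start in load_one_off_schedules(SCHEDULE_FILE).items():
#     globals()[dag_id] = DAG(dag_id, schedule_interval=None,
#                             start_date=start, catchup=False)
```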