How to ensure at least $x time has passed since data_interval_end? How to allow for late data?
#42636
-
For example, I might be doing some reporting with a basic SQL query, say, the number of orders fulfilled per store, per day.

from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

QUERY = """
INSERT INTO reporting.fulfilment
SELECT date_trunc('day', fulfilled_at), store_id, count(order_id)
FROM orders
WHERE date_trunc('day', fulfilled_at) = '{{ logical_date }}'
GROUP BY date_trunc('day', fulfilled_at), store_id
"""

with DAG(dag_id="my_dag", schedule="@daily", start_date=datetime(2024, 10, 2)):
    # conn_id is a placeholder; point it at the reporting database
    op = SQLExecuteQueryOperator(task_id="report_fulfillment", conn_id="my_db", sql=QUERY)

During "normal" runs, a new DagRun will be created shortly after each UTC midnight; say the task starts at 2024-10-24T00:00:30 for a particular DagRun.

BUT consider the case where an order was fulfilled at 2024-10-23T23:59:59 but the system did not actually update the row in the [...]

Our reporting is now incorrect! 😱 Now I have to wait for someone to notice and clear the task state to re-run the job, or add some monitoring to check for late records or whatever.

What would be great is if I could tell Airflow that there is a minimum duration that must elapse between the [...] This would allow for some degree of lateness for the records; check out Flink's documentation on the subject.

Of course, one could just add a TimeSensor as the first task in the DAG to delay the start. But IMO this is a bit of a hack; Airflow ought to just schedule it at the right time. Also, if you need to re-run or backfill, these DagRuns should ideally start ASAP and skip the TimeSensor. I guess you could use a BranchOperator to skip [...]

Does Airflow have any way of supporting this type of scheduling? I presume it could be achieved with a custom timetable, but this seems like something a lot of people would have to handle, so I thought I would post the discussion.
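The delay being asked for boils down to simple date arithmetic. Below is a minimal sketch in plain Python, assuming a hypothetical grace period and helper names (`earliest_start`, `wait_seconds`) that are not part of Airflow. The same calculation could feed a sensor's target time or a custom timetable.

```python
from datetime import datetime, timedelta, timezone


def earliest_start(data_interval_end: datetime, grace: timedelta) -> datetime:
    """Earliest moment a run may start: the interval end plus the grace period."""
    return data_interval_end + grace


def wait_seconds(data_interval_end: datetime, grace: timedelta, now: datetime) -> float:
    """Seconds still to wait before starting; 0 if the grace period has already elapsed."""
    remaining = earliest_start(data_interval_end, grace) - now
    return max(remaining.total_seconds(), 0.0)


# The daily run covering 2024-10-23 has its data_interval_end at midnight UTC on the 24th.
interval_end = datetime(2024, 10, 24, tzinfo=timezone.utc)
grace = timedelta(minutes=5)  # hypothetical allowance for late rows

# A regularly scheduled run at 00:00:30 would still have to wait 4.5 minutes...
scheduled_now = datetime(2024, 10, 24, 0, 0, 30, tzinfo=timezone.utc)
print(wait_seconds(interval_end, grace, scheduled_now))  # 270.0

# ...while a backfill a week later would not wait at all.
backfill_now = datetime(2024, 10, 31, 12, 0, tzinfo=timezone.utc)
print(wait_seconds(interval_end, grace, backfill_now))  # 0.0
```

Clamping the wait at zero is what lets re-runs and backfills start immediately, which is the behaviour a plain TimeSensor does not give for free.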
Replies: 2 comments 3 replies
-
Ended up subclassing DateTimeSensorAsync to skip the deferral if the moment is in the past, just to reduce scheduling load a little bit.
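A rough sketch of the check such a subclass might perform, written in plain Python so it runs without Airflow. `should_defer` is a hypothetical helper, not an Airflow API; the assumption is that an overridden `execute` would consult something like it before handing off to the trigger.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_defer(target_time: datetime, now: Optional[datetime] = None) -> bool:
    """Return True only if target_time is still in the future."""
    if now is None:
        now = datetime.now(timezone.utc)
    return target_time > now


now = datetime(2024, 10, 24, 0, 0, 30, tzinfo=timezone.utc)

# Target five minutes after midnight: still ahead of us, so defer.
print(should_defer(datetime(2024, 10, 24, 0, 5, tzinfo=timezone.utc), now))  # True

# Target from a week-old run being backfilled: already past, so skip the deferral.
print(should_defer(now - timedelta(days=7), now))  # False
```

In a `DateTimeSensorAsync` subclass, the idea would be to return straight away from `execute` when this is False and otherwise fall back to the parent's deferral; the exact override depends on the Airflow version in use.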
-
Airflow 3 is going to have a separation between "schedule" and "data partition": https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-76+Asset+Partitions
It's a much wider use case, but specifically in your case you will be able to specify that the data partition is offset from the schedule (say, the schedule is the same as the data partition but moved by 3 minutes).
I think that will fulfill your expectations. If you would like to take part in the discussions and implementation, follow the devlist discussions and calls; you can also bring your case to the regular dev call we have on Airflow 3:
https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+3+Dev+call%3A+Meeting+Notes