-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix short circuit in mapped tasks #44925
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,8 +121,11 @@ def skip( | |
raise ValueError("dag_run is required") | ||
|
||
task_ids_list = [d.task_id for d in task_list] | ||
SkipMixin._set_state_to_skipped(dag_run, task_ids_list, session) | ||
session.commit() | ||
|
||
# The following could be applied only for non-mapped tasks | ||
if map_index == -1: | ||
SkipMixin._set_state_to_skipped(dag_run, task_ids_list, session) | ||
session.commit() | ||
Comment on lines
+125
to
+128
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This block could theoretically be removed, and then setting the skipped states would be done exclusively by `NotPreviouslySkippedDep`. |
||
|
||
if task_id is not None: | ||
from airflow.models.xcom import XCom | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
from airflow.models.taskinstance import PAST_DEPENDS_MET | ||
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep | ||
from airflow.utils.db import LazySelectSequence | ||
|
||
|
||
class NotPreviouslySkippedDep(BaseTIDep): | ||
|
@@ -38,7 +39,6 @@ def _get_dep_statuses(self, ti, session, dep_context): | |
XCOM_SKIPMIXIN_FOLLOWED, | ||
XCOM_SKIPMIXIN_KEY, | ||
XCOM_SKIPMIXIN_SKIPPED, | ||
SkipMixin, | ||
) | ||
from airflow.utils.state import TaskInstanceState | ||
|
||
|
@@ -49,46 +49,47 @@ def _get_dep_statuses(self, ti, session, dep_context): | |
finished_task_ids = {t.task_id for t in finished_tis} | ||
|
||
for parent in upstream: | ||
if isinstance(parent, SkipMixin): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Initially I thought of making it: But then I saw that the tests pass without this. |
||
if parent.task_id not in finished_task_ids: | ||
# This can happen if the parent task has not yet run. | ||
continue | ||
if parent.task_id not in finished_task_ids: | ||
# This can happen if the parent task has not yet run. | ||
continue | ||
|
||
prev_result = ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session) | ||
prev_result = ti.xcom_pull( | ||
task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session, map_indexes=ti.map_index | ||
) | ||
|
||
if prev_result is None: | ||
# This can happen if the parent task has not yet run. | ||
continue | ||
if isinstance(prev_result, LazySelectSequence): | ||
prev_result = next(iter(prev_result)) | ||
|
||
should_skip = False | ||
if ( | ||
XCOM_SKIPMIXIN_FOLLOWED in prev_result | ||
and ti.task_id not in prev_result[XCOM_SKIPMIXIN_FOLLOWED] | ||
): | ||
# Skip any tasks that are not in "followed" | ||
should_skip = True | ||
elif ( | ||
XCOM_SKIPMIXIN_SKIPPED in prev_result | ||
and ti.task_id in prev_result[XCOM_SKIPMIXIN_SKIPPED] | ||
): | ||
# Skip any tasks that are in "skipped" | ||
should_skip = True | ||
if prev_result is None: | ||
# This can happen if the parent task has not yet run. | ||
continue | ||
|
||
if should_skip: | ||
# If the parent SkipMixin has run, and the XCom result stored indicates this | ||
# ti should be skipped, set ti.state to SKIPPED and fail the rule so that the | ||
# ti does not execute. | ||
if dep_context.wait_for_past_depends_before_skipping: | ||
past_depends_met = ti.xcom_pull( | ||
task_ids=ti.task_id, key=PAST_DEPENDS_MET, session=session, default=False | ||
) | ||
if not past_depends_met: | ||
yield self._failing_status( | ||
reason=("Task should be skipped but the past depends are not met") | ||
) | ||
return | ||
ti.set_state(TaskInstanceState.SKIPPED, session) | ||
yield self._failing_status( | ||
reason=f"Skipping because of previous XCom result from parent task {parent.task_id}" | ||
should_skip = False | ||
if ( | ||
XCOM_SKIPMIXIN_FOLLOWED in prev_result | ||
and ti.task_id not in prev_result[XCOM_SKIPMIXIN_FOLLOWED] | ||
): | ||
# Skip any tasks that are not in "followed" | ||
should_skip = True | ||
elif XCOM_SKIPMIXIN_SKIPPED in prev_result and ti.task_id in prev_result[XCOM_SKIPMIXIN_SKIPPED]: | ||
# Skip any tasks that are in "skipped" | ||
should_skip = True | ||
|
||
if should_skip: | ||
# If the parent SkipMixin has run, and the XCom result stored indicates this | ||
# ti should be skipped, set ti.state to SKIPPED and fail the rule so that the | ||
# ti does not execute. | ||
if dep_context.wait_for_past_depends_before_skipping: | ||
past_depends_met = ti.xcom_pull( | ||
task_ids=ti.task_id, key=PAST_DEPENDS_MET, session=session, default=False | ||
) | ||
return | ||
if not past_depends_met: | ||
yield self._failing_status( | ||
reason="Task should be skipped but the past depends are not met" | ||
) | ||
return | ||
ti.set_state(TaskInstanceState.SKIPPED, session) | ||
yield self._failing_status( | ||
reason=f"Skipping because of previous XCom result from parent task {parent.task_id}" | ||
) | ||
return |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix short circuit operator in mapped tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@uranusjr following up #43883 (comment) - it seems that these attributes were missing here, so that's why PythonOperator printed misleading logs