MappedTasks don't short-circuit #43883

Open
1 of 2 tasks
Chais opened this issue Nov 11, 2024 · 8 comments · May be fixed by #44925
Assignees
Labels
affected_version:2.10 Issues Reported for 2.10 area:core area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

Comments

@Chais

Chais commented Nov 11, 2024

Apache Airflow version

2.10.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

It seems that tasks created via .expand() do not inherit the downstream tasks of their parent/original. This becomes an issue when we expand a task.short_circuit, because short-circuiting no longer works.

What you think should happen instead?

My understanding from the documentation is that step_two.expand(do=step_one.expand(i=start())) is supposed to be equivalent to:

flowchart LR
start --> A1[step_one_0] --> B1[step_two_0]
start --> A2[step_one_1] --> B2[step_two_1]
start --> A3[step_one_2] --> B3[step_two_2]
start --> A4[…] --> B4[…]
start --> A9[step_one_9] --> B9[step_two_9]
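The per-index pairing in the flowchart can be modelled in plain Python. This is only a sketch of the behaviour described as expected above, not Airflow code and not a statement of what Airflow actually does: `run_pipeline` and `predicate` are hypothetical names, and a fixed predicate stands in for `random.random() >= 0.5` so that the result is reproducible.

```python
from typing import Callable, Dict, List


def run_pipeline(items: List[int], predicate: Callable[[int], bool]) -> Dict[int, str]:
    """Model the expected behaviour: step_two_i runs only if step_one_i returned True."""
    states: Dict[int, str] = {}
    for index, item in enumerate(items):
        # step_one_i acts as the short-circuit condition for its own branch only.
        if predicate(item):
            states[index] = "success"  # step_two_i runs
        else:
            states[index] = "skipped"  # step_two_i is short-circuited
    return states


# Hypothetical deterministic stand-in for the random condition: even inputs pass.
states = run_pipeline(list(range(10)), lambda i: i % 2 == 0)
print(states)
```

In this model a False result from one step_one instance affects only the step_two instance with the same index, which is the pairing the flowchart draws.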

How to reproduce

import random
from typing import List

import pendulum
from airflow.decorators import dag, task


@dag(
    "test_foo",
    schedule=None,
    start_date=pendulum.now(),
    render_template_as_native_obj=True,
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(10)]

    @task.short_circuit
    def step_one(i: int) -> bool:
        print(f"Hello from step {i}")
        return random.random() >= 0.5

    @task.python
    def step_two(do: bool):
        if not do:
            print("Should've been skipped.")
        print("Doing stuff")

    step_two.expand(do=step_one.expand(i=start()))


test_foo()

if __name__ == "__main__":
    test_foo().test()

Since each step_one instance returns False with probability 0.5, about 5 of the 10 mapped instances are expected to return False and short-circuit. When one does, the log reads something like this:

Hello from step 5
[2024-11-11 13:31:09,695] {python.py:240} INFO - Done. Returned value was: False
[2024-11-11 13:31:09,697] {python.py:309} INFO - Condition result is False
[2024-11-11 13:31:09,698] {python.py:316} INFO - No downstream tasks; nothing to do.

But checking the task_dict variable in the module, we see that step_one does have step_two set as a downstream task and, vice versa, step_two has step_one set as an upstream task.

__pydevd_ret_val_dict['factory'].task_dict['step_one'].downstream_task_ids
{'step_two'}
__pydevd_ret_val_dict['factory'].task_dict['step_two'].upstream_task_ids
{'step_one'}

Operating System

24.04.1 LTS (Noble Numbat)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm testing this on Standalone

Anything else?

I can get closer to the desired behaviour by using a list comprehension in the dependency, but that can't be done dynamically (I can't iterate over PlainXComArgs), and it also doesn't produce exactly the desired behaviour.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Chais Chais added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Nov 11, 2024
@dosubot dosubot bot added area:dynamic-task-mapping AIP-42 kind:feature Feature Requests labels Nov 11, 2024
@Chais
Author

Chais commented Nov 12, 2024

I was able to get the desired behaviour by creating a task group, pulling the return_value XCom from start and manually creating all the tasks in a loop, which I feel shouldn't be necessary:

from random import random
from typing import List

from airflow.decorators import dag, task, task_group
from airflow.models import XCom
from pendulum import now


@dag(
    "test_foo",
    schedule=None,
    start_date=now(),
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(1, 11)]

    @task_group()
    def tg():
        @task.short_circuit
        def step_one(i: int) -> bool:
            print(f"Hello from step {i}")
            return random() >= 0.5

        @task.python(trigger_rule="one_done")
        def step_two(do: bool):
            if not do:
                print("Should've been skipped.")
            print("Doing stuff")

        for i in XCom.get_one(key="return_value", task_id="start", run_id=XCom.run_id):
            step_two(step_one(i))

    start() >> tg()


test_foo()

if __name__ == "__main__":
    test_foo().test()

@shahar1 shahar1 removed the needs-triage label for new issues that we didn't triage yet label Nov 16, 2024
@eladkal eladkal added the affected_version:2.10 Issues Reported for 2.10 label Nov 16, 2024
@Chais
Author

Chais commented Nov 20, 2024

I'm seeing this on 2.9.3, too.

@Paniraj2010

@eladkal we have upgraded from Airflow 2.9.3 to 2.10.3 and have a similar issue: we don't see logs on any task. We are getting the error "No task logs found. Try the Event Log tab for more context."

May I know whether this has been resolved?

@shahar1 shahar1 removed the kind:feature Feature Requests label Dec 7, 2024
@shahar1 shahar1 self-assigned this Dec 7, 2024
@shahar1
Contributor

shahar1 commented Dec 7, 2024

Hey @Chais, I've managed to reproduce the issue using the code snippets that you provided.
The root cause of this issue is that short circuit isn't currently suitable for working with task-generated mapping: when the short-circuit operator runs in that case, the downstream tasks have not been expanded yet, so it just logs No downstream tasks; nothing to do. and moves on as if nothing happened:

if not self.downstream_task_ids:

In the second example that you provided, the downstream tasks are known beforehand, so it works normally.
I'll try to work on a fix.

Edit: See my response below.
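
The early return described above can be illustrated with a simplified model of the decision. This is a sketch written for illustration, not the operator's source: `decide` is a hypothetical function, and only the `if not self.downstream_task_ids` check and the log message are taken from this thread.

```python
from typing import Set


def decide(condition: bool, downstream_task_ids: Set[str]) -> str:
    """Simplified model of a short-circuit decision (not Airflow's actual code)."""
    if condition:
        # Condition is truthy: downstream tasks run as usual.
        return "proceed"
    if not downstream_task_ids:
        # Mirrors the log line quoted in the issue: nothing is skipped.
        return "No downstream tasks; nothing to do."
    # Condition is falsy and downstream tasks are known: they get skipped.
    return "skip " + ", ".join(sorted(downstream_task_ids))


empty_case = decide(False, set())
known_case = decide(False, {"step_two"})
print(empty_case)
print(known_case)
```

With an empty set the falsy condition has no effect, which matches the symptom in the report; with a known downstream task the same condition results in a skip.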

@uranusjr
Member

uranusjr commented Dec 9, 2024

Shouldn’t downstream_task_ids still contain the unexpanded base task in this case? Something still seems off to me. And we should probably fix the operator in any case.

@shahar1
Contributor

shahar1 commented Dec 12, 2024

> Shouldn’t downstream_task_ids still contain the unexpanded base task in this case? Something still seems off to me. And we should probably fix the operator in any case.

After spending some more time on this, I think you're correct, and the problem actually seems to be here (see the TODO comment just above it):

if isinstance(task, MappedOperator):

Until this point, task contains the downstream_task_ids - but due to the early return of mapped operators, it's not recorded in the downstream tasks.

The hacky solution seems to be pushing the downstream_task_ids into a hidden XCom, so that the mapped short-circuit is able to use them - but I'm also open to suggestions :)

@uranusjr
Member

Why does not recording TaskMap affect this? The function only does one thing—pushing a row to the TaskMap table. This table is only used for task mapping.

@shahar1
Contributor

shahar1 commented Dec 13, 2024

> Why does not recording TaskMap affect this? The function only does one thing—pushing a row to the TaskMap table. This table is only used for task mapping.

You're correct, I think that now I understand better what's going on :)
I've managed to come up with a solution - feel free to review the PRs when you have the time.

5 participants