
Add traceback output for task when SIGTERM is sent during task execution #44880

Open
wants to merge 1 commit into main

Conversation

VladaZakharova (Contributor)

This PR adds the ability to output a traceback when a task instance is killed externally and SIGTERM is sent to the task runner process.
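For context, the mechanism is roughly the following: the task runner's SIGTERM handler renders the stack of the interrupted (main) thread before the task is torn down. A minimal self-contained sketch of that idea (the handler and logger names here are illustrative, not the PR's exact diff):

import logging
import signal
import traceback

log = logging.getLogger(__name__)

def signal_handler(signum, frame):
    # "frame" is the frame that was executing when SIGTERM arrived;
    # format_stack(frame) renders it together with all of its callers,
    # i.e. the code line the task was on when it was killed.
    log.error("Received SIGTERM. Terminating subprocesses.")
    log.error("Stacktrace: \n%s", "".join(traceback.format_stack(frame)))
    raise SystemExit("Task received SIGTERM signal")

signal.signal(signal.SIGTERM, signal_handler)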



@shahar1 added the backport-to-v2-10-test label on Dec 12, 2024
@ashb (Member) commented Dec 12, 2024

Can you explain why this is useful? I imagine the stack trace is going to be large and mostly not relevant to when a task was SIGTERM'd.

@VladaZakharova (Contributor, Author)

> Can you explain why this is useful? I imagine the stack trace is going to be large and mostly not relevant to when a task was SIGTERM'd.

I agree the traceback can be large in some cases, but the idea is to capture the code line on which the task was killed. Ideally, it will help troubleshoot issues that we encounter.

@shahar1 self-requested a review on December 13, 2024
@shahar1 (Contributor) commented Dec 13, 2024

> Can you explain why this is useful? I imagine the stack trace is going to be large and mostly not relevant to when a task was SIGTERM'd.
>
> I agree the traceback can be large in some cases, but the idea is to capture the code line on which the task was killed. Ideally, it will help troubleshoot issues that we encounter.

Following Ash's comment - maybe it would be better to use a flag/configuration option to enable it?
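For illustration, one way such a guard could look, assuming a hypothetical [core] task_sigterm_stacktrace option (the section and key names are made up here; conf.getboolean with a fallback is Airflow's existing config API):

import logging
import traceback

from airflow.configuration import conf

log = logging.getLogger(__name__)

def signal_handler(signum, frame):
    log.error("Received SIGTERM. Terminating subprocesses.")
    # Hypothetical option name; off by default so logs stay short.
    if conf.getboolean("core", "task_sigterm_stacktrace", fallback=False):
        log.error("Stacktrace: \n%s", "".join(traceback.format_stack(frame)))
    raise SystemExit("Task received SIGTERM signal")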

@potiuk (Member) commented Dec 13, 2024

> Following Ash's comment - maybe it would be better to use a flag/configuration option to enable it?

Yeah. A configuration option to enable it would be better. I think the stack trace from where the task was killed is not too useful, really - I am not even sure it will actually show where the process is. Signals are always delivered to the main thread (which is one limitation), and I am not even sure the stack trace in this case will show where the thread was before.

Do you have some examples of such stack traces generated with it that look useful, @VladaZakharova?
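As a side note, the stdlib faulthandler module can dump the stacks of all threads when a signal arrives, which would sidestep the main-thread limitation mentioned above; a sketch of that alternative (not what this PR implements, Unix only):

import faulthandler
import signal
import sys

# On SIGTERM, write the current stack of every thread to stderr.
# chain=True also invokes any previously installed SIGTERM handler.
faulthandler.register(signal.SIGTERM, file=sys.stderr, all_threads=True, chain=True)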

@eladkal added this to the Airflow 2.10.5 milestone on Dec 13, 2024
@VladaZakharova (Contributor, Author)

> Following Ash's comment - maybe it would be better to use a flag/configuration option to enable it?
>
> Yeah. A configuration option to enable it would be better. I think the stack trace from where the task was killed is not too useful, really - I am not even sure it will actually show where the process is. Signals are always delivered to the main thread (which is one limitation), and I am not even sure the stack trace in this case will show where the thread was before.
>
> Do you have some examples of such stack traces generated with it that look useful, @VladaZakharova?

I think something like this can be useful:

from datetime import datetime
import time

import airflow
from airflow.providers.standard.operators.python import PythonOperator


with airflow.DAG(
    "trace_import_timeout",
    start_date=datetime(2022, 1, 1),
    schedule=None,
) as dag:

    def f():
        print("Sleeping")
        time.sleep(3660)

    for ind in range(2):
        PythonOperator(
            dag=dag,
            task_id=f"sleep_120_{ind}",
            python_callable=f,
        )

And the output in the logs will look like this:

[2024-12-12, 14:07:20 UTC] {taskinstance.py:2813} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-12-12, 14:07:20 UTC] {taskinstance.py:2814} ERROR - Stacktrace: 
  File "/usr/local/bin/***", line 8, in <module>
    sys.exit(main())
  File "/opt/***/***/__main__.py", line 58, in main
    args.func(args)
  File "/opt/***/***/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/***/***/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
...
    return func(*args, **kwargs)
  File "/files/dags/core/example_logs_trace.py", line 14, in f
    time.sleep(3660)
  File "/opt/***/***/models/taskinstance.py", line 2814, in signal_handler
    self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))

This shows what the task was executing when the SIGTERM happened.

Also, we already have this output when a task fails due to a timeout:

[2024-12-12, 13:55:41 UTC] {timeout.py:68} ERROR - Process timed out, PID: 1255
[2024-12-12, 13:55:41 UTC] {taskinstance.py:3041} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 743, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 714, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 269, in run
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/baseoperator.py", line 378, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/src/airflow/providers/standard/operators/python.py", line 195, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/src/airflow/providers/standard/operators/python.py", line 221, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 269, in run
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/files/dags/core/example_logs_trace.py", line 14, in f
    time.sleep(3660)
  File "/opt/airflow/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 1255

Maybe it makes sense to make the output as short as the one we get for a timeout error. We can still see in this example that it shows the place where the task failed.
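One possible way to shorten it would be to log only the innermost frames of the captured stack; a sketch (the helper name and the frame count are arbitrary):

import traceback

def format_short_stack(frame, limit=5):
    # format_stack() lists frames outermost-first, so the last entries
    # are the frames closest to where the task was actually killed.
    return "".join(traceback.format_stack(frame)[-limit:])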

@potiuk (Member) left a comment

Seems useful enough. @ashb, do you have any doubts there? I see that it can indeed add value.
