-
Notifications
You must be signed in to change notification settings - Fork 1
/
advanced-scheduling.yaml
113 lines (94 loc) · 3.6 KB
/
advanced-scheduling.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
id: advanced-scheduling
namespace: company.team
inputs:
- id: country
type: STRING
defaults: US
- id: date
type: DATETIME
required: false
defaults: 2023-12-24T14:00:00.000Z
tasks:
- id: check_if_business_date
type: io.kestra.plugin.scripts.python.Commands
warningOnStdErr: false
namespaceFiles:
enabled: true
commands:
- python schedule.py "{{trigger.date ?? inputs.date}}" {{inputs.country}}
beforeCommands:
- pip install workalendar
taskRunner:
type: io.kestra.plugin.core.runner.Process
- id: log
type: io.kestra.plugin.core.log.Log
message: business day - continuing the flow...
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: 0 14 25 12 *
extend:
title: Run specific tasks only on business days for a specific country
description: >-
This flow will run one or more tasks only on business days for a specific
country. The Python script in this flow checks if the date is a business day
for the specified country. If it is, the flow continues to the next task. If
it is not, the task fails, blocking the execution of subsequent tasks.
The Pebble expression `"{{trigger.date ?? inputs.date}}"` will make sure
that the flow will use either the schedule date from the trigger or the date
provided on the `date` input at runtime. You can use the inputs to test the
logic.
Make sure adjust the Python script to match your desired country.
To add the Python script, go to the VS Code Editor in the Kestra UI and add
the script `schedule.py`:
```python
import sys
from datetime import datetime
from workalendar.europe import France # Import calendars for specific
countries
from workalendar.usa import UnitedStates # Example for another country
def is_business_day(date_str, country_calendar):
# Remove 'Z' from kestra's timezone-specific timestamp and parse the date string
date = datetime.fromisoformat(date_str.replace("Z", ""))
# Check if the date is a business day
return country_calendar.is_working_day(date)
def main():
if len(sys.argv) != 3:
print("Usage: script.py <date> <country_code>")
sys.exit(1)
date_str, country_code = sys.argv[1], sys.argv[2]
# Dictionary mapping country codes to calendar objects
calendars = {
"FR": France(),
"US": UnitedStates(),
}
country_calendar = calendars.get(country_code.upper())
if not country_calendar:
print(f"Calendar for '{country_code}' not supported or not found.")
sys.exit(1)
try:
if not is_business_day(date_str, country_calendar):
print(f"{date_str} is not a business day in {country_code}.")
sys.exit(1)
else:
print(f"{date_str} is a business day in {country_code}.")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
```
This way, you can store and manage your custom scripts using Namespace Files
rather than pasting them inline in YAML.
tags:
- Schedule
- Trigger
- Namespace Files
- Python
ee: false
demo: true
meta_description: "This flow will run one or more tasks only on business days
for a specific country. The Python script in this flow checks if the date is
a business day for the specified country. If it is, the flow continues to
the next task. If it is not, the task fails, blocking the execution of
subsequent tasks. "