Merge pull request #12 from shrutimantri/add-new-lines1
feat: add new lines at appropriate places to blueprints - 2
wrussell1999 authored Nov 8, 2024
2 parents 5f95f87 + b8044e8 commit 1d00a94
Showing 53 changed files with 157 additions and 202 deletions.
4 changes: 0 additions & 4 deletions add-items-to-dynamodb.yaml
@@ -31,17 +31,13 @@ extend:
description: >-
This flow adds an item to a DynamoDB table.
The `item` property can be either a map or a JSON string.
The `tableName` property must point to an already existing DynamoDB table.
The `region` property must be a valid AWS region.
It's recommended to set the `accessKeyId` and `secretKeyId` properties as
secrets.
This flow assumes AWS credentials stored as secrets `AWS_ACCESS_KEY_ID`,
`AWS_SECRET_ACCESS_KEY` and `AWS_DEFAULT_REGION`.
tags:
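For reference, adding the item could look roughly like the sketch below. This is not the blueprint's exact task definition: the table name and item values are placeholders, and the property names follow the AWS plugin conventions used elsewhere in these blueprints.

```yaml
tasks:
  - id: add_item_to_dynamodb
    type: io.kestra.plugin.aws.dynamodb.PutItem
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    tableName: demo            # placeholder: must be an existing table
    item:                      # map form; a JSON string works as well
      id: 1
      flow: "{{ flow.id }}"
```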
12 changes: 0 additions & 12 deletions advanced-scheduling.yaml
@@ -40,41 +40,31 @@ extend:
for the specified country. If it is, the flow continues to the next task. If
it is not, the task fails, blocking the execution of subsequent tasks.
The Pebble expression `"{{trigger.date ?? inputs.date}}"` will make sure
that the flow will use either the schedule date from the trigger or the date
provided on the `date` input at runtime. You can use the inputs to test the
logic.
Make sure to adjust the Python script to match your desired country.
To add the Python script, go to the VS Code Editor in the Kestra UI and add
the script `schedule.py`:
```python
import sys
from datetime import datetime
from workalendar.europe import France  # Import calendars for specific countries
from workalendar.usa import UnitedStates  # Example for another country

def is_business_day(date_str, country_calendar):
    # Remove 'Z' from kestra's timezone-specific timestamp and parse the date string
    date = datetime.fromisoformat(date_str.replace("Z", ""))
    # Check if the date is a business day
    return country_calendar.is_working_day(date)

def main():
    if len(sys.argv) != 3:
        print("Usage: script.py <date> <country_code>")
# (collapsed hunk: @@ -103,12 +93,10 @@)
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
```
This way, you can store and manage your custom scripts using Namespace Files
rather than pasting them inline in YAML.
tags:
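A rough sketch of how a task could call `schedule.py` once it is stored as a Namespace File; the task id, the `workalendar` install step, and the country code are illustrative rather than the blueprint's actual values:

```yaml
tasks:
  - id: check_business_day
    type: io.kestra.plugin.scripts.python.Commands
    namespaceFiles:
      enabled: true            # makes schedule.py available in the working directory
    beforeCommands:
      - pip install workalendar
    commands:
      # pass either the schedule date from the trigger or the manual input
      - python schedule.py "{{ trigger.date ?? inputs.date }}" FR
```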
1 change: 0 additions & 1 deletion airbyte-cloud-dbt-cloud.yaml
@@ -35,7 +35,6 @@ extend:
This flow will sync data from multiple sources in parallel using Airbyte
Cloud. Then, it will run a dbt Cloud job.
It's recommended to configure the dbt Cloud and [Airbyte
API](https://portal.airbyte.com/apiKeys) tokens as secrets.
tags:
1 change: 0 additions & 1 deletion airbyte-cloud-dbt.yaml
@@ -61,7 +61,6 @@ extend:
This flow runs Airbyte Cloud syncs in parallel and then runs dbt Core's CLI
commands.
It's recommended to configure the GCP service account and [Airbyte API
token](https://portal.airbyte.com/apiKeys), referenced in the
`pluginDefaults`, as a secret.
1 change: 0 additions & 1 deletion airbyte-cloud-sync.yaml
@@ -18,7 +18,6 @@ extend:
This flow will sync data using Airbyte Cloud. You can generate an API key in
the [Airbyte’s Cloud developer portal](https://portal.airbyte.com/apiKeys).
It's best to set the Airbyte Cloud API key as a secret, but you can also
paste it into the flow in plain text during testing.
tags:
1 change: 0 additions & 1 deletion airbyte-sync-parallel-with-dbt.yaml
@@ -62,7 +62,6 @@ extend:
This flow executes several Airbyte syncs in parallel and then runs dbt
Core's CLI commands.
It's recommended to set GCP and Airbyte server credentials as secrets.
tags:
- Parallel
1 change: 0 additions & 1 deletion airbyte-sync-parallel.yaml
@@ -29,7 +29,6 @@ extend:
description: >
This flow syncs data from multiple sources in parallel using Airbyte.
The Airbyte server credentials, referenced in the `pluginDefaults`, are
stored as secrets. The `pluginDefaults` flow property helps remove
boilerplate code, allowing you to define common values such as `url`, `username`
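As a rough illustration of that idea (the server URL and secret names are placeholders, not the blueprint's actual values), `pluginDefaults` can hold the connection settings once for every Airbyte `Sync` task in the flow:

```yaml
pluginDefaults:
  - type: io.kestra.plugin.airbyte.connections.Sync
    values:
      url: http://localhost:8000                      # placeholder Airbyte server URL
      username: "{{ secret('AIRBYTE_USERNAME') }}"
      password: "{{ secret('AIRBYTE_PASSWORD') }}"
```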
1 change: 0 additions & 1 deletion airbyte-sync.yaml
@@ -19,7 +19,6 @@ extend:
description: >
This flow will sync data using Airbyte on schedule.
It's best to provide Airbyte server credentials (the `username` and
`password`) as secrets, but you can also paste them in plain text during
testing.
2 changes: 0 additions & 2 deletions allow-failure-demo.yaml
@@ -42,8 +42,6 @@ extend:
The `AllowFailure` task is useful if you don't want to block downstream
tasks when a specific task fails.
In this example, the task `fail_silently` will fail, but the flow will
continue to run and the task `print_to_console` will run.
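A minimal sketch of that pattern; the child task types are chosen for illustration and may differ from the blueprint's actual tasks:

```yaml
tasks:
  - id: fail_silently
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: will_fail
        type: io.kestra.plugin.core.execution.Fail   # fails, but the failure is swallowed

  - id: print_to_console
    type: io.kestra.plugin.core.log.Log
    message: downstream task still runs
```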
2 changes: 0 additions & 2 deletions api-json-to-postgres.yaml
@@ -83,11 +83,9 @@ extend:
**EXTRACT**: This workflow downloads data from an external API via HTTP GET
request and stores the result in Kestra's internal storage in ION format.
**TRANSFORM**: The extracted file is then serialized to JSON and transformed
row-by-row to add a new column.
**LOAD**: The final result is ingested in parallel to S3 and Postgres.
tags:
- S3
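The row-by-row transform could be expressed with a `FileTransform` task along these lines; this is a sketch rather than the blueprint's actual task, and the upstream task id and new column name are assumptions:

```yaml
  - id: add_column
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.extract.uri }}"   # hypothetical id of the extract task
    script: |
      // add a new column to every row
      row['processed_at'] = new Date().toISOString();
```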
2 changes: 0 additions & 2 deletions api-python-sql.yaml
@@ -42,7 +42,6 @@ extend:
This flow will download a file from a REST API, process it with Python and
SQL, and store the result in the internal storage.
- the `api` HTTP Request task uses a public API; to interact with a private
API endpoint, check the task documentation for examples on how to
authenticate your request
@@ -53,7 +52,6 @@ extend:
- the DuckDB query task uses data from a previous task and outputs the query
result to internal storage.
The Outputs tab provides a well-formatted table with the query results.
tags:
- API
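A hedged sketch of such a DuckDB task, assuming a previous Python task produced a `data.csv` output file (task ids, file name, and column names are illustrative):

```yaml
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      data.csv: "{{ outputs.python.outputFiles['data.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) AS avg_price
      FROM read_csv_auto('{{ workingDir }}/data.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```

With `store: true`, the query result lands in internal storage and shows up in the Outputs tab as described above.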
1 change: 0 additions & 1 deletion auditlogs-to-bigquery.yaml
@@ -85,7 +85,6 @@ extend:
audit logs from the Kafka topic, transforms the data, and loads it into
BigQuery.
The flow is triggered every day at 10 AM UTC. You can customize the trigger
by changing the cron expression, timezone and more. For more information
about cron expressions, visit the [following
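For example, the daily 10 AM UTC schedule mentioned above would typically be a trigger along these lines (a sketch; the trigger id is illustrative):

```yaml
triggers:
  - id: daily_10am
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 * * *"
    timezone: UTC
```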
8 changes: 3 additions & 5 deletions aws-batch-runner.yaml
@@ -1,10 +1,12 @@
id: aws-batch-runner
namespace: company.team

variables:
  compute_environment_arn: arn:aws:batch:us-east-1:123456789:compute-environment/kestra
  job_queue_arn: arn:aws:batch:us-east-1:123456789:job-queue/kestra
  execution_role_arn: arn:aws:iam::123456789:role/ecsTaskExecutionRole
  task_role_arn: arn:aws:iam::123456789:role/ecsTaskRole

tasks:
  - id: send_data
    type: io.kestra.plugin.scripts.python.Script
@@ -39,9 +41,9 @@ tasks:
    except socket.error as e:
        print("Unable to obtain IP address.")

if __name__ == '__main__':
    print_environment_info()
extend:
title: Run a Python script on AWS ECS Fargate with AWS Batch
description: >-
@@ -53,16 +55,12 @@ extend:
In order to support `inputFiles`, `namespaceFiles`, and `outputFiles`, the
AWS Batch task runner currently relies on multi-container ECS jobs and
creates three containers for each job:
- A before-container that uploads input files to S3.
- The main container that fetches input files into the `{{ workingDir }}`
directory and runs the task.
- An after-container that fetches output files using outputFiles to make
them available from the Kestra UI for download and preview.
Since we don't know the working directory of the container in advance, we
always need to explicitly define the working directory and output directory
when using the AWS Batch runner, e.g. use `cat {{ workingDir }}/myFile.txt`
16 changes: 4 additions & 12 deletions aws-batch-terraform-git.yaml
@@ -17,6 +17,7 @@ tasks:
    type: io.kestra.plugin.git.Clone
    url: https://github.com/kestra-io/terraform-deployments
    branch: main

  - id: tf
    type: io.kestra.plugin.terraform.cli.TerraformCLI
    inputFiles:
@@ -41,6 +42,7 @@ tasks:
      AWS_DEFAULT_REGION: "{{ inputs.region }}"
    outputFiles:
      - "*.txt"

  - id: parse_tf_output
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/kestrapy:latest
@@ -77,6 +79,7 @@ tasks:
      bucket: "{{ inputs.bucket }}"
    commands:
      - pip show kestra

  - id: run_python_script
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
@@ -92,30 +95,25 @@ tasks:
      bucket: "{{ inputs.bucket }}"
    script: >
      import platform

      import socket

      import sys

      print("Hello from AWS Batch and kestra!")

      def print_environment_info():
          print(f"Host's network name: {platform.node()}")
          print(f"Python version: {platform.python_version()}")
          print(f"Platform information (instance type): {platform.platform()}")
          print(f"OS/Arch: {sys.platform}/{platform.machine()}")
          try:
              hostname = socket.gethostname()
              ip_address = socket.gethostbyname(hostname)
              print(f"Host IP Address: {ip_address}")
          except socket.error as e:
              print("Unable to obtain IP address.")

      print_environment_info()
extend:
title: Run multiple Python scripts in parallel on AWS ECS Fargate with AWS Batch
description: >
@@ -124,24 +122,18 @@ extend:
Terraform resources to run script tasks on AWS ECS Fargate including the AWS
Batch compute environment, job queue, and ECS task roles.
The only prerequisites are AWS credentials and an S3 Bucket in the same
region in which you want to run AWS Batch jobs.
We assume that you have a default VPC in the region you are deploying to. If
you do not have a default VPC, you can create it using the following
command:
```
aws ec2 create-default-vpc --region us-east-1  # replace with your chosen AWS region
```
Once the flow completes, you can download the Terraform output to see the
ARNs of the AWS Batch compute environment, job queue, and ECS task roles.
You can store these as `pluginDefaults` in your namespace to run all Python
6 changes: 5 additions & 1 deletion aws-event-bridge.yaml
@@ -1,5 +1,6 @@
id: aws-event-bridge
namespace: company.team

tasks:
  - id: send_events
    type: io.kestra.plugin.aws.eventbridge.PutEvents
@@ -20,23 +21,26 @@ tasks:
        detail:
          message: another event which could be a user sign-out event or a newsletter subscription

  - id: extract_json
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/json/app_events.json

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.extract_json.uri }}"
    newLine: false

  - id: send_events_json
    type: io.kestra.plugin.aws.eventbridge.PutEvents
    entries: "{{ outputs.json_to_ion.uri }}"

extend:
title: Send custom events from your application to AWS EventBridge
description: >-
This flow will send one or more events to the specified event bus on AWS
EventBridge.
The events can be sent as a JSON file or as a list of maps (dictionaries).
For simple use cases, the map syntax is the easiest way to get started. However,
if your application already emits events in the same format as required by
7 changes: 5 additions & 2 deletions aws-kinesis-json.yaml
@@ -1,5 +1,6 @@
id: aws-kinesis-json
namespace: company.team

tasks:
  - id: put_records_simple_map
    type: io.kestra.plugin.aws.kinesis.PutRecords
@@ -9,17 +10,21 @@ tasks:
        partitionKey: user1
      - data: sign-out
        partitionKey: user1

  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/resolve/main/json/user_events.json

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.extract.uri }}"
    newLine: false

  - id: put_records
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records: "{{ outputs.json_to_ion.uri }}"

extend:
title: Send multiple records to AWS Kinesis Data Streams in a simple list of
maps or using a JSON API payload
@@ -31,12 +36,10 @@ extend:
format using the `JsonToIon` task. Then, you can pass it to the `PutRecords`
task.
The Outputs tab will display all records sent to AWS Kinesis Data Streams,
including the shard ID and sequence number. You can use these values to
retrieve the records from the stream using the `GetRecords` AWS API call.
Note that the `data` payload must be a string value. If
you want to send a JSON object to your Kinesis Data Stream, wrap it into a
string, as shown in the [following
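For instance, wrapping a JSON object into a string for a single record might look like this (the record content is illustrative):

```yaml
  - id: put_json_record
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records:
      # the JSON object is quoted so that `data` stays a string
      - data: '{"event": "sign-in", "user_id": 42}'
        partitionKey: user42
```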