Skip to content

Commit

Permalink
Merge pull request #11 from shrutimantri/add_new_lines
Browse files Browse the repository at this point in the history
feat: add new lines at appropriate places to blueprints - 1
  • Loading branch information
wrussell1999 authored Nov 8, 2024
2 parents e1f8546 + 0e38e03 commit c65c788
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 12 deletions.
4 changes: 4 additions & 0 deletions add-items-to-dynamodb.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
id: add-items-to-dynamodb
namespace: company.team

tasks:
- id: first_item_as_map
type: io.kestra.plugin.aws.dynamodb.PutItem
item:
id: 1
flow: "{{ flow.id }}"
task: "{{ task.id }}"

- id: second_item_as_json
type: io.kestra.plugin.aws.dynamodb.PutItem
item: |
Expand All @@ -15,13 +17,15 @@ tasks:
"flow": "{{ flow.id }}",
"task": "{{ task.id }}"
}
pluginDefaults:
- type: io.kestra.plugin.aws.dynamodb.PutItem
values:
tableName: demo
region: "{{ secret('AWS_DEFAULT_REGION') }}"
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"

extend:
title: Add a new item to a DynamoDB table
description: >-
Expand Down
8 changes: 7 additions & 1 deletion advanced-scheduling.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
id: advanced-scheduling
namespace: company.team

inputs:
- id: country
type: STRING
Expand All @@ -8,6 +9,7 @@ inputs:
type: DATETIME
required: false
defaults: 2023-12-24T14:00:00.000Z

tasks:
- id: check_if_business_date
type: io.kestra.plugin.scripts.python.Commands
Expand All @@ -18,14 +20,18 @@ tasks:
- python schedule.py "{{trigger.date ?? inputs.date}}" {{inputs.country}}
beforeCommands:
- pip install workalendar
runner: PROCESS
taskRunner:
type: io.kestra.plugin.core.runner.Process

- id: log
type: io.kestra.plugin.core.log.Log
message: business day - continuing the flow...

triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: 0 14 25 12 *

extend:
title: Run specific tasks only on business days for a specific country
description: >-
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cloud-dbt-cloud.yaml
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
id: airbyte-cloud-dbt-cloud
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: salesforce
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab

- id: google_analytics
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd

- id: facebook_ads
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef

- id: dbt_cloud_job
type: io.kestra.plugin.dbt.cloud.TriggerRun
jobId: "396284"
accountId: "{{ secret('DBT_CLOUD_ACCOUNT_ID') }}"
token: "{{ secret('DBT_CLOUD_API_TOKEN') }}"
wait: true

pluginDefaults:
- type: io.kestra.plugin.airbyte.cloud.jobs.Sync
values:
token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}"

extend:
title: Trigger multiple Airbyte Cloud syncs, then run a dbt Cloud job
description: >-
Expand Down
10 changes: 9 additions & 1 deletion airbyte-cloud-dbt.yaml
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
id: airbyte-cloud-dbt
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: salesforce
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab

- id: google_analytics
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd

- id: facebook_ads
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef

- id: dbt
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: clone_repository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-demo
branch: main

- id: dbt_build
type: io.kestra.plugin.dbt.cli.Build
runner: DOCKER
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
dbtPath: /usr/local/bin/dbt
dockerOptions:
image: ghcr.io/kestra-io/dbt-bigquery:latest
Expand All @@ -43,10 +49,12 @@ tasks:
timeout_seconds: 300
target: dev
sa.json: "{{ secret('GCP_CREDS') }}"

pluginDefaults:
- type: io.kestra.plugin.airbyte.cloud.jobs.Sync
values:
token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}"

extend:
title: Trigger multiple Airbyte Cloud syncs in parallel, then run a dbt job
description: >-
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cloud-sync.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
id: airbyte-cloud-sync
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.airbyte.cloud.jobs.Sync
connectionId: ac127cf2-9ae3-4f9b-9dd0-e3a0944d1447
token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}"

triggers:
- id: every_minute
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/1 * * * *"

extend:
title: Trigger a single Airbyte Cloud sync on schedule
description: >-
Expand Down
7 changes: 7 additions & 0 deletions airbyte-sync-parallel-with-dbt.yaml
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
id: airbyte-sync-parallel-with-dbt
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: salesforce
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab

- id: google_analytics
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd

- id: facebook_ads
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef

- id: dbt
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: clone_repository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-demo
branch: main

- id: dbt_build
type: io.kestra.plugin.dbt.cli.Build
taskRunner:
Expand All @@ -43,12 +48,14 @@ tasks:
timeout_seconds: 300
target: dev
sa.json: "{{ secret('GCP_CREDS') }}"

pluginDefaults:
- type: io.kestra.plugin.airbyte.connections.Sync
values:
url: http://host.docker.internal:8000/
username: "{{ secret('AIRBYTE_USERNAME') }}"
password: "{{ secret('AIRBYTE_PASSWORD') }}"

extend:
title: Trigger multiple Airbyte syncs, then run a dbt job
description: >-
Expand Down
5 changes: 5 additions & 0 deletions airbyte-sync-parallel.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
id: airbyte-sync-parallel
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: salesforce
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab

- id: google_analytics
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd

- id: facebook_ads
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef

pluginDefaults:
- type: io.kestra.plugin.airbyte.connections.Sync
values:
url: http://host.docker.internal:8000/
username: "{{ secret('AIRBYTE_USERNAME') }}"
password: "{{ secret('AIRBYTE_PASSWORD') }}"

extend:
title: Trigger multiple Airbyte syncs in parallel
description: >
Expand Down
3 changes: 3 additions & 0 deletions airbyte-sync.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
id: airbyte-sync
namespace: company.team

tasks:
- id: data_ingestion
type: io.kestra.plugin.airbyte.connections.Sync
connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab
url: http://host.docker.internal:8000/
username: "{{ secret('AIRBYTE_USERNAME') }}"
password: "{{ secret('AIRBYTE_PASSWORD') }}"

triggers:
- id: every_minute
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/1 * * * *"

extend:
title: Trigger a single Airbyte sync on schedule
description: >
Expand Down
5 changes: 5 additions & 0 deletions allow-failure-demo.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
id: allow-failure-demo
namespace: company.team

tasks:
- id: allow_failure
type: io.kestra.plugin.core.flow.AllowFailure
Expand All @@ -10,24 +11,28 @@ tasks:
type: io.kestra.plugin.core.runner.Process
commands:
- exit 1

- id: print_to_console
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- echo "this will run since previous failure was allowed ✅"

- id: fail
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- echo "failing and blocking downstream tasks ❌" && exit 1

- id: will_never_run
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- echo "this will never run ⛔️"

extend:
title: Allow failure of a group of tasks
description: >-
Expand Down
10 changes: 10 additions & 0 deletions api-json-to-postgres.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
id: api-json-to-postgres
namespace: company.team

tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://gorest.co.in/public/v2/users

- id: ion
type: io.kestra.plugin.serdes.json.JsonToIon
from: "{{ outputs.download.uri }}"
newLine: false

- id: json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.ion.uri }}"

- id: add_column
type: io.kestra.plugin.scripts.jython.FileTransform
from: "{{ outputs.json.uri }}"
script: |
from datetime import datetime
logger.info('row: {}', row)
row['inserted_at'] = datetime.utcnow()
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
Expand All @@ -28,6 +33,7 @@ tasks:
type: io.kestra.plugin.serdes.csv.IonToCsv
from: "{{ outputs.add_column.uri }}"
header: true

- id: create_table
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://host.docker.internal:5432/
Expand All @@ -43,6 +49,7 @@ tasks:
status VARCHAR,
inserted_at timestamp
);
- id: load_data
type: io.kestra.plugin.jdbc.postgresql.CopyIn
url: jdbc:postgresql://host.docker.internal:5432/
Expand All @@ -52,12 +59,14 @@ tasks:
from: "{{ outputs.final_csv.uri }}"
table: public.raw_users
header: true

- id: s3
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: final_json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.add_column.uri }}"

- id: json_to_s3
type: io.kestra.plugin.aws.s3.Upload
from: "{{ outputs.final_json.uri }}"
Expand All @@ -66,6 +75,7 @@ tasks:
region: "{{ secret('AWS_DEFAULT_REGION') }}"
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"

extend:
title: Extract data, transform it, and load it in parallel to S3 and Postgres —
in less than 7 seconds!
Expand Down
4 changes: 4 additions & 0 deletions api-python-sql.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
id: api-python-sql
namespace: company.team

tasks:
- id: api
type: io.kestra.plugin.core.http.Request
uri: https://dummyjson.com/products

- id: python
type: io.kestra.plugin.scripts.python.Script
taskRunner:
Expand All @@ -20,6 +22,7 @@ tasks:
df = pl.from_dicts(data)
df.glimpse()
df.select(["brand", "price"]).write_csv("products.csv")
- id: sql_query
type: io.kestra.plugin.jdbc.duckdb.Query
inputFiles:
Expand All @@ -30,6 +33,7 @@ tasks:
GROUP BY brand
ORDER BY avg_price DESC;
store: true

extend:
title: Extract data from a REST API, process it in Python with Polars in a
Docker container, then run DuckDB query and preview results as a table in
Expand Down
Loading

0 comments on commit c65c788

Please sign in to comment.