From ca6cc1de0a9ee238a529e47cac8d5d5517e04991 Mon Sep 17 00:00:00 2001 From: Shruti Mantri Date: Fri, 8 Nov 2024 17:38:33 +0530 Subject: [PATCH 1/3] feat: add new lines at appropriate places to blueprints - 1 --- add-items-to-dynamodb.yaml | 4 ++++ advanced-scheduling.yaml | 5 +++++ airbyte-cloud-dbt-cloud.yaml | 6 ++++++ airbyte-cloud-dbt.yaml | 7 +++++++ airbyte-cloud-sync.yaml | 3 +++ airbyte-sync-parallel-with-dbt.yaml | 7 +++++++ airbyte-sync-parallel.yaml | 5 +++++ airbyte-sync.yaml | 3 +++ allow-failure-demo.yaml | 5 +++++ api-json-to-postgres.yaml | 10 ++++++++++ api-python-sql.yaml | 4 ++++ api-to-s3.yaml | 4 ++++ auditlogs-to-bigquery.yaml | 6 ++++++ autoscraper.yaml | 15 +++++---------- 14 files changed, 74 insertions(+), 10 deletions(-) diff --git a/add-items-to-dynamodb.yaml b/add-items-to-dynamodb.yaml index a1b927e..63aacf1 100644 --- a/add-items-to-dynamodb.yaml +++ b/add-items-to-dynamodb.yaml @@ -1,5 +1,6 @@ id: add-items-to-dynamodb namespace: company.team + tasks: - id: first_item_as_map type: io.kestra.plugin.aws.dynamodb.PutItem @@ -7,6 +8,7 @@ tasks: id: 1 flow: "{{ flow.id }}" task: "{{ task.id }}" + - id: second_item_as_json type: io.kestra.plugin.aws.dynamodb.PutItem item: | @@ -15,6 +17,7 @@ tasks: "flow": "{{ flow.id }}", "task": "{{ task.id }}" } + pluginDefaults: - type: io.kestra.plugin.aws.dynamodb.PutItem values: @@ -22,6 +25,7 @@ pluginDefaults: region: "{{ secret('AWS_DEFAULT_REGION') }}" accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}" + extend: title: Add a new item to a DynamoDB table description: >- diff --git a/advanced-scheduling.yaml b/advanced-scheduling.yaml index 6a73cc7..bc365d5 100644 --- a/advanced-scheduling.yaml +++ b/advanced-scheduling.yaml @@ -1,5 +1,6 @@ id: advanced-scheduling namespace: company.team + inputs: - id: country type: STRING @@ -8,6 +9,7 @@ inputs: type: DATETIME required: false defaults: 2023-12-24T14:00:00.000Z + tasks: - id: check_if_business_date type: io.kestra.plugin.scripts.python.Commands @@ -19,13 +21,16 @@ tasks: beforeCommands: - pip install workalendar runner: PROCESS + - id: log type: io.kestra.plugin.core.log.Log message: business day - continuing the flow... + triggers: - id: schedule type: io.kestra.plugin.core.trigger.Schedule cron: 0 14 25 12 * + extend: title: Run specific tasks only on business days for a specific country description: >- diff --git a/airbyte-cloud-dbt-cloud.yaml b/airbyte-cloud-dbt-cloud.yaml index f135a91..7351b28 100644 --- a/airbyte-cloud-dbt-cloud.yaml +++ b/airbyte-cloud-dbt-cloud.yaml @@ -1,5 +1,6 @@ id: airbyte-cloud-dbt-cloud namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.core.flow.Parallel @@ -7,22 +8,27 @@ tasks: - id: salesforce type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab + - id: google_analytics type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd + - id: facebook_ads type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef + - id: dbt_cloud_job type: io.kestra.plugin.dbt.cloud.TriggerRun jobId: "396284" accountId: "{{ secret('DBT_CLOUD_ACCOUNT_ID') }}" token: "{{ secret('DBT_CLOUD_API_TOKEN') }}" wait: true + pluginDefaults: - type: io.kestra.plugin.airbyte.cloud.jobs.Sync values: token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}" + extend: title: Trigger multiple Airbyte Cloud syncs, then run a dbt Cloud job description: >- diff --git a/airbyte-cloud-dbt.yaml b/airbyte-cloud-dbt.yaml index 0a7627f..03e35cb 100644 --- a/airbyte-cloud-dbt.yaml +++ b/airbyte-cloud-dbt.yaml @@ -1,5 +1,6 @@ id: airbyte-cloud-dbt namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.core.flow.Parallel @@ -7,12 +8,15 @@ tasks: - id: salesforce type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab + - id: google_analytics type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd + - id: facebook_ads type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef + - id: dbt type: io.kestra.plugin.core.flow.WorkingDirectory tasks: @@ -20,6 +24,7 @@ tasks: type: io.kestra.plugin.git.Clone url: https://github.com/kestra-io/dbt-demo branch: main + - id: dbt_build type: io.kestra.plugin.dbt.cli.Build runner: DOCKER @@ -43,10 +48,12 @@ tasks: timeout_seconds: 300 target: dev sa.json: "{{ secret('GCP_CREDS') }}" + pluginDefaults: - type: io.kestra.plugin.airbyte.cloud.jobs.Sync values: token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}" + extend: title: Trigger multiple Airbyte Cloud syncs in parallel, then run a dbt job description: >- diff --git a/airbyte-cloud-sync.yaml b/airbyte-cloud-sync.yaml index 33c4382..f369845 100644 --- a/airbyte-cloud-sync.yaml +++ b/airbyte-cloud-sync.yaml @@ -1,14 +1,17 @@ id: airbyte-cloud-sync namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.airbyte.cloud.jobs.Sync connectionId: ac127cf2-9ae3-4f9b-9dd0-e3a0944d1447 token: "{{ secret('AIRBYTE_CLOUD_API_TOKEN') }}" + triggers: - id: every_minute type: io.kestra.plugin.core.trigger.Schedule cron: "*/1 * * * *" + extend: title: Trigger a single Airbyte Cloud sync on schedule description: >- diff --git a/airbyte-sync-parallel-with-dbt.yaml b/airbyte-sync-parallel-with-dbt.yaml index 693c45a..fe68ff8 100644 --- a/airbyte-sync-parallel-with-dbt.yaml +++ b/airbyte-sync-parallel-with-dbt.yaml @@ -1,5 +1,6 @@ id: airbyte-sync-parallel-with-dbt namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.core.flow.Parallel @@ -7,12 +8,15 @@ tasks: - id: salesforce type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab + - id: google_analytics type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd + - id: facebook_ads type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef + - id: dbt type: io.kestra.plugin.core.flow.WorkingDirectory tasks: @@ -20,6 +24,7 @@ tasks: type: io.kestra.plugin.git.Clone url: https://github.com/kestra-io/dbt-demo branch: main + - id: dbt_build type: io.kestra.plugin.dbt.cli.Build taskRunner: @@ -43,12 +48,14 @@ tasks: timeout_seconds: 300 target: dev sa.json: "{{ secret('GCP_CREDS') }}" + pluginDefaults: - type: io.kestra.plugin.airbyte.connections.Sync values: url: http://host.docker.internal:8000/ username: "{{ secret('AIRBYTE_USERNAME') }}" password: "{{ secret('AIRBYTE_PASSWORD') }}" + extend: title: Trigger multiple Airbyte syncs, then run a dbt job description: >- diff --git a/airbyte-sync-parallel.yaml b/airbyte-sync-parallel.yaml index 3614ab4..d2a9229 100644 --- a/airbyte-sync-parallel.yaml +++ b/airbyte-sync-parallel.yaml @@ -1,5 +1,6 @@ id: airbyte-sync-parallel namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.core.flow.Parallel @@ -7,18 +8,22 @@ tasks: - id: salesforce type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ab + - id: google_analytics type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd + - id: facebook_ads type: io.kestra.plugin.airbyte.connections.Sync connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12ef + pluginDefaults: - type: io.kestra.plugin.airbyte.connections.Sync values: url: http://host.docker.internal:8000/ username: "{{ secret('AIRBYTE_USERNAME') }}" password: "{{ secret('AIRBYTE_PASSWORD') }}" + extend: title: Trigger multiple Airbyte syncs in parallel description: > diff --git a/airbyte-sync.yaml b/airbyte-sync.yaml index 2dc37ff..e8d1a5a 100644 --- a/airbyte-sync.yaml +++ b/airbyte-sync.yaml @@ -1,5 +1,6 @@ id: airbyte-sync namespace: company.team + tasks: - id: data_ingestion type: io.kestra.plugin.airbyte.connections.Sync @@ -7,10 +8,12 @@ tasks: url: http://host.docker.internal:8000/ username: "{{ secret('AIRBYTE_USERNAME') }}" password: "{{ secret('AIRBYTE_PASSWORD') }}" + triggers: - id: every_minute type: io.kestra.plugin.core.trigger.Schedule cron: "*/1 * * * *" + extend: title: Trigger a single Airbyte sync on schedule description: > diff --git a/allow-failure-demo.yaml b/allow-failure-demo.yaml index 383341d..60ae70b 100644 --- a/allow-failure-demo.yaml +++ b/allow-failure-demo.yaml @@ -1,5 +1,6 @@ id: allow-failure-demo namespace: company.team + tasks: - id: allow_failure type: io.kestra.plugin.core.flow.AllowFailure @@ -10,24 +11,28 @@ tasks: type: io.kestra.plugin.core.runner.Process commands: - exit 1 + - id: print_to_console type: io.kestra.plugin.scripts.shell.Commands taskRunner: type: io.kestra.plugin.core.runner.Process commands: - echo "this will run since previous failure was allowed ✅" + - id: fail type: io.kestra.plugin.scripts.shell.Commands taskRunner: type: io.kestra.plugin.core.runner.Process commands: - echo "failing and blocking downstream tasks ❌" && exit 1 + - id: will_never_run type: io.kestra.plugin.scripts.shell.Commands taskRunner: type: io.kestra.plugin.core.runner.Process commands: - echo "this will never run ⛔️" + extend: title: Allow failure of a group of tasks description: >- diff --git a/api-json-to-postgres.yaml b/api-json-to-postgres.yaml index 14f93fc..1a09a84 100644 --- a/api-json-to-postgres.yaml +++ b/api-json-to-postgres.yaml @@ -1,16 +1,20 @@ id: api-json-to-postgres namespace: company.team + tasks: - id: download type: io.kestra.plugin.core.http.Download uri: https://gorest.co.in/public/v2/users + - id: ion type: io.kestra.plugin.serdes.json.JsonToIon from: "{{ outputs.download.uri }}" newLine: false + - id: json type: io.kestra.plugin.serdes.json.IonToJson from: "{{ outputs.ion.uri }}" + - id: add_column type: io.kestra.plugin.scripts.jython.FileTransform from: "{{ outputs.json.uri }}" @@ -18,6 +22,7 @@ tasks: from datetime import datetime logger.info('row: {}', row) row['inserted_at'] = datetime.utcnow() + - id: parallel type: io.kestra.plugin.core.flow.Parallel tasks: @@ -28,6 +33,7 @@ tasks: type: io.kestra.plugin.serdes.csv.IonToCsv from: "{{ outputs.add_column.uri }}" header: true + - id: create_table type: io.kestra.plugin.jdbc.postgresql.Query url: jdbc:postgresql://host.docker.internal:5432/ @@ -43,6 +49,7 @@ tasks: status VARCHAR, inserted_at timestamp ); + - id: load_data type: io.kestra.plugin.jdbc.postgresql.CopyIn url: jdbc:postgresql://host.docker.internal:5432/ @@ -52,12 +59,14 @@ tasks: from: "{{ outputs.final_csv.uri }}" table: public.raw_users header: true + - id: s3 type: io.kestra.plugin.core.flow.Sequential tasks: - id: final_json type: io.kestra.plugin.serdes.json.IonToJson from: "{{ outputs.add_column.uri }}" + - id: json_to_s3 type: io.kestra.plugin.aws.s3.Upload from: "{{ outputs.final_json.uri }}" @@ -66,6 +75,7 @@ tasks: region: "{{ secret('AWS_DEFAULT_REGION') }}" accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}" + extend: title: Extract data, transform it, and load it in parallel to S3 and Postgres — in less than 7 seconds! diff --git a/api-python-sql.yaml b/api-python-sql.yaml index da11708..f337813 100644 --- a/api-python-sql.yaml +++ b/api-python-sql.yaml @@ -1,9 +1,11 @@ id: api-python-sql namespace: company.team + tasks: - id: api type: io.kestra.plugin.core.http.Request uri: https://dummyjson.com/products + - id: python type: io.kestra.plugin.scripts.python.Script taskRunner: @@ -20,6 +22,7 @@ tasks: df = pl.from_dicts(data) df.glimpse() df.select(["brand", "price"]).write_csv("products.csv") + - id: sql_query type: io.kestra.plugin.jdbc.duckdb.Query inputFiles: @@ -30,6 +33,7 @@ tasks: GROUP BY brand ORDER BY avg_price DESC; store: true + extend: title: Extract data from a REST API, process it in Python with Polars in a Docker container, then run DuckDB query and preview results as a table in diff --git a/api-to-s3.yaml b/api-to-s3.yaml index ec595f2..72517c2 100644 --- a/api-to-s3.yaml +++ b/api-to-s3.yaml @@ -1,10 +1,12 @@ id: api-to-s3 namespace: company.team + tasks: - id: get_pokemon type: io.kestra.plugin.core.http.Download method: GET uri: https://pokeapi.co/api/v2/pokemon/psyduck + - id: upload type: io.kestra.plugin.aws.s3.Upload bucket: kestraio @@ -13,11 +15,13 @@ tasks: accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}" secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}" region: "{{ secret('AWS_DEFAULT_REGION') }}" + triggers: - id: every_friday_afternoon type: io.kestra.plugin.core.trigger.Schedule timezone: Europe/Berlin cron: 0 17 * * FRI + extend: title: Extract data from an API and load it to S3 on schedule (every Friday afternoon) diff --git a/auditlogs-to-bigquery.yaml b/auditlogs-to-bigquery.yaml index 9de02d0..908a6e5 100644 --- a/auditlogs-to-bigquery.yaml +++ b/auditlogs-to-bigquery.yaml @@ -1,5 +1,6 @@ id: auditlogs-to-bigquery namespace: company.team + tasks: - id: consume type: io.kestra.plugin.kafka.Consume @@ -9,6 +10,7 @@ tasks: topic: kestra_auditlogs valueDeserializer: JSON maxRecords: 500 + - id: transform type: io.kestra.plugin.scripts.nashorn.FileTransform from: "{{ outputs.consume.uri }}" @@ -31,6 +33,7 @@ tasks: row['detail_namespace'] = value['detail']['namespace'] row['detail_flowId'] = value['detail']['flowId'] row['detail_executionId'] = value['detail']['executionId'] + - id: avro type: io.kestra.plugin.serdes.avro.IonToAvro from: "{{ outputs.transform.uri }}" @@ -56,6 +59,7 @@ tasks: { "name": "detail_executionId", "type": ["null", "string"] } ] } + - id: load type: io.kestra.plugin.gcp.bigquery.Load avroOptions: @@ -66,10 +70,12 @@ tasks: writeDisposition: WRITE_TRUNCATE serviceAccount: "{{ secret('GCP_CREDS') }}" projectId: your_gcp_project + triggers: - id: schedule type: io.kestra.plugin.core.trigger.Schedule cron: 0 10 * * * + extend: title: Stream kestra audit logs from a Kafka topic to BigQuery for analytics and troubleshooting diff --git a/autoscraper.yaml b/autoscraper.yaml index f4eb26a..c677838 100644 --- a/autoscraper.yaml +++ b/autoscraper.yaml @@ -1,5 +1,6 @@ id: autoscraper namespace: company.team + tasks: - id: scrape type: io.kestra.plugin.scripts.python.Script @@ -8,37 +9,31 @@ tasks: warningOnStdErr: false script: > from autoscraper import AutoScraper - from kestra import Kestra - - url = - "https://stackoverflow.com/questions/2081586/web-scraping-with-python" - + url = "https://stackoverflow.com/questions/2081586/web-scraping-with-python" # You can also put urls here to retrieve urls. - wanted_list = ["What are metaclasses in Python?"] - scraper = AutoScraper() - result = scraper.build(url, wanted_list) - # get related topics of any stackoverflow page: - related = scraper.get_result_similar( "https://stackoverflow.com/questions/606191/convert-bytes-to-a-string" ) Kestra.outputs({"data": result, "related": related}) + - id: use_output_data type: io.kestra.plugin.core.debug.Return format: "{{ outputs.scrape.vars.data }}" + - id: use_output_related type: io.kestra.plugin.core.debug.Return format: "{{ outputs.scrape.vars.related }}" + extend: title: Scrape StackOverflow using AutoScraper in Python description: This flow shows how to scrape a web page using AutoScraper in From 5bcf5e5cfe040b1890a8601c6b77896a0ba99f18 Mon Sep 17 00:00:00 2001 From: Will Russell Date: Fri, 8 Nov 2024 12:13:43 +0000 Subject: [PATCH 2/3] Update airbyte-cloud-dbt.yaml --- airbyte-cloud-dbt.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cloud-dbt.yaml b/airbyte-cloud-dbt.yaml index 03e35cb..0e54c5e 100644 --- a/airbyte-cloud-dbt.yaml +++ b/airbyte-cloud-dbt.yaml @@ -27,7 +27,8 @@ tasks: - id: dbt_build type: io.kestra.plugin.dbt.cli.Build - runner: DOCKER + taskRunner: + type: io.kestra.plugin.scripts.runner.docker.Docker dbtPath: /usr/local/bin/dbt dockerOptions: image: ghcr.io/kestra-io/dbt-bigquery:latest From 0e38e0302058c4a05cb8784adc95afb67d21f7cb Mon Sep 17 00:00:00 2001 From: Will Russell Date: Fri, 8 Nov 2024 12:13:47 +0000 Subject: [PATCH 3/3] Update advanced-scheduling.yaml --- advanced-scheduling.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/advanced-scheduling.yaml b/advanced-scheduling.yaml index bc365d5..2c670d8 100644 --- a/advanced-scheduling.yaml +++ b/advanced-scheduling.yaml @@ -20,7 +20,8 @@ tasks: - python schedule.py "{{trigger.date ?? inputs.date}}" {{inputs.country}} beforeCommands: - pip install workalendar - runner: PROCESS + taskRunner: + type: io.kestra.plugin.core.runner.Process - id: log type: io.kestra.plugin.core.log.Log