-
Notifications
You must be signed in to change notification settings - Fork 1
/
api-json-to-postgres.yaml
100 lines (88 loc) · 3.16 KB
/
api-json-to-postgres.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
id: api-json-to-postgres
namespace: company.team
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://gorest.co.in/public/v2/users
- id: ion
type: io.kestra.plugin.serdes.json.JsonToIon
from: "{{ outputs.download.uri }}"
newLine: false
- id: json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.ion.uri }}"
- id: add_column
type: io.kestra.plugin.scripts.jython.FileTransform
from: "{{ outputs.json.uri }}"
script: |
from datetime import datetime
logger.info('row: {}', row)
row['inserted_at'] = datetime.utcnow()
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: postgres
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: final_csv
type: io.kestra.plugin.serdes.csv.IonToCsv
from: "{{ outputs.add_column.uri }}"
header: true
- id: create_table
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://host.docker.internal:5432/
username: postgres
password: qwerasdfyxcv1234
sql: |
CREATE TABLE IF NOT EXISTS public.raw_users
(
id int,
name VARCHAR,
email VARCHAR,
gender VARCHAR,
status VARCHAR,
inserted_at timestamp
);
- id: load_data
type: io.kestra.plugin.jdbc.postgresql.CopyIn
url: jdbc:postgresql://host.docker.internal:5432/
username: postgres
password: qwerasdfyxcv1234
format: CSV
from: "{{ outputs.final_csv.uri }}"
table: public.raw_users
header: true
- id: s3
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: final_json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.add_column.uri }}"
- id: json_to_s3
type: io.kestra.plugin.aws.s3.Upload
from: "{{ outputs.final_json.uri }}"
key: users.json
bucket: kestraio
region: "{{ secret('AWS_DEFAULT_REGION') }}"
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
extend:
title: Extract data, transform it, and load it in parallel to S3 and Postgres —
in less than 7 seconds!
description: >-
**EXTRACT**: This workflow downloads data from an external API via HTTP GET
request and stores the result in Kestra's internal storage in ION format.
**TRANSFORM**: The extracted file is then serialized to JSON and transformed
row-by-row to add a new column.
**LOAD**: The final result is ingested in parallel to S3 and Postgres.
tags:
- S3
- Postgres
- Ingest
- Outputs
- Parallel
ee: false
demo: false
meta_description: This workflow extracts data via HTTP GET, transforms it by
adding a column, and loads the result in parallel to S3 and Postgres in less
than 7 seconds!