dwh-and-analytics.yaml
id: dwh-and-analytics
namespace: tutorial
description: Data Warehouse and Analytics
tasks:
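  # All steps run inside one WorkingDirectory task so they share a filesystem:
  # the cloned repo and the dbt.duckdb file produced by dbt build remain
  # available to the tasks that follow.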
  - id: dbt
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/dbt-demo
        branch: main

      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        commands:
          - dbt deps
          - dbt build
        profiles: |
          jaffle_shop:
            outputs:
              dev:
                type: duckdb
                path: dbt.duckdb
                extensions:
                  - parquet
                fixed_retries: 1
                threads: 16
                timeout_seconds: 300
            target: dev
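      # Export the tables built by dbt from the shared dbt.duckdb file to CSV,
      # skipping raw and staging models.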
      - id: python
        type: io.kestra.plugin.scripts.python.Script
        outputFiles:
          - "*.csv"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/duckdb:latest
        script: |
          import duckdb
          import pandas as pd

          conn = duckdb.connect(database='dbt.duckdb', read_only=False)
          tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';"
          tables = conn.execute(tables_query).fetchall()

          # Export each table to CSV, excluding tables that start with 'raw' or 'stg'
          for table_name in tables:
              table_name = table_name[0]
              # Skip tables with names starting with 'raw' or 'stg'
              if not table_name.startswith('raw') and not table_name.startswith('stg'):
                  query = f"SELECT * FROM {table_name}"
                  df = conn.execute(query).fetchdf()
                  df.to_csv(f"{table_name}.csv", index=False)
          conn.close()
extend:
  title: Getting started with Kestra — a Data Warehouse and Analytics workflow example
  description: >-
    This flow is a simple example of a data warehouse and analytics use case.
    It clones a dbt repository, builds the dbt project, and exports the data
    to CSV files.


    The flow has three tasks:

    1. The first task clones a dbt repository.

    2. The second task builds the dbt models and tests using DuckDB.

    3. The third task exports the transformed data to CSV files.
  tags:
    - Getting Started
    - Data
    - dbt
    - DuckDB
    - Git
    - Python
    - Docker
  ee: true
  demo: true
  meta_description: This flow is a simple example of a data warehouse and
    analytics use case. It clones a dbt repository, builds the dbt project,
    and exports the data to CSV files.
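
# Once this flow is saved to a Kestra instance, it can be triggered over the
# HTTP API. Below is a minimal Python sketch, assuming a local instance on
# http://localhost:8080 and the executions endpoint of recent Kestra versions;
# verify the path against your server's API reference before relying on it.
#
#   import requests
#
#   # POST to the executions endpoint for namespace "tutorial",
#   # flow "dwh-and-analytics" (no inputs are required by this flow)
#   resp = requests.post(
#       "http://localhost:8080/api/v1/executions/tutorial/dwh-and-analytics"
#   )
#   resp.raise_for_status()
#
#   execution = resp.json()
#   print(execution["id"], execution["state"]["current"])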