cassandra-to-bigquery.yaml
id: cassandra-to-bigquery
namespace: company.team

tasks:
  - id: query_cassandra
    type: io.kestra.plugin.cassandra.Query
    session:
      endpoints:
        - hostname: localhost
          port: 9042
      localDatacenter: datacenter1
    cql: |
      SELECT salary_id, work_year, experience_level, employment_type,
             job_title, salary, salary_currency, salary_in_usd, employee_residence,
             remote_ratio, company_location, company_size
      FROM test.salary
    store: true

  - id: write_to_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.query_cassandra.uri }}"

  - id: load_bigquery
    type: io.kestra.plugin.gcp.bigquery.Load
    from: "{{ outputs.write_to_csv.uri }}"
    destinationTable: my_project.my_dataset.my_table
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: my_project
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1
extend:
  title: Extract data from Cassandra into a CSV file, and load it to BigQuery
  description: |-
    This flow extracts data from a Cassandra table, writes it to a CSV file, and then loads the CSV data into BigQuery. It uses data from [the following public dataset](https://gist.githubusercontent.com/Ben8t/f182c57f4f71f350a54c65501d30687e/raw/940654a8ef6010560a44ad4ff1d7b24c708ebad4/salary-data.csv).

    The GCP credentials are read from the secret `GCP_SERVICE_ACCOUNT_JSON`. You need to create the destination table in BigQuery prior to executing this flow, or you can have the flow create it by adding the schema to the BigQuery Load task.

    You can set up Cassandra locally with Docker using `docker run -p 9042:9042 cassandra`:

    - Use `docker ps` to get the container ID where Cassandra is running.
    - Then, open a CQL shell with `docker exec -it <container_id> cqlsh`.
    - Finally, run the following commands to create the table in Cassandra:
    ```sql
    CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
    USE test;
    CREATE TABLE salary(
        salary_id int primary key,
        work_year int,
        experience_level text,
        employment_type text,
        job_title text,
        salary int,
        salary_currency text,
        salary_in_usd int,
        employee_residence text,
        remote_ratio int,
        company_location text,
        company_size text
    );
    ```

    Then, either use a COPY command (the CSV file must be accessible from inside the container, e.g. after copying it in with `docker cp`): `COPY salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size) FROM '/yourfile.csv' WITH HEADER = TRUE;`

    Or run a few INSERT statements:
    ```sql
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (1, 2023, 'EN', 'FT', 'Data Analyst', 75000, 'USD', 75000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (2, 2023, 'EN', 'FT', 'Data Analyst', 60000, 'USD', 60000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (3, 2023, 'MI', 'FT', 'Analytics Engineer', 185700, 'USD', 185700, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (4, 2023, 'MI', 'FT', 'Analytics Engineer', 165000, 'USD', 165000, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (5, 2023, 'SE', 'FT', 'Data Engineer', 160000, 'USD', 160000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (6, 2023, 'SE', 'FT', 'Data Engineer', 130000, 'USD', 130000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (7, 2023, 'SE', 'FT', 'Data Analyst', 169000, 'USD', 169000, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (8, 2023, 'SE', 'FT', 'Data Analyst', 110600, 'USD', 110600, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (9, 2023, 'SE', 'FT', 'Data Operations Engineer', 193000, 'USD', 193000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (10, 2023, 'SE', 'FT', 'Data Operations Engineer', 136850, 'USD', 136850, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (11, 2023, 'SE', 'FT', 'Machine Learning Engineer', 139500, 'USD', 139500, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (12, 2023, 'SE', 'FT', 'Machine Learning Engineer', 109400, 'USD', 109400, 'US', 0, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (13, 2023, 'SE', 'FT', 'Data Engineer', 276000, 'USD', 276000, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (14, 2023, 'SE', 'FT', 'Data Engineer', 178500, 'USD', 178500, 'US', 100, 'US', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (15, 2023, 'MI', 'FT', 'Data Scientist', 55000, 'EUR', 59188, 'ES', 0, 'ES', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (16, 2023, 'MI', 'FT', 'Data Scientist', 45000, 'EUR', 48426, 'ES', 0, 'ES', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (17, 2023, 'MI', 'FT', 'Data Engineer', 70000, 'EUR', 75330, 'SI', 100, 'SI', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (18, 2023, 'MI', 'FT', 'Data Engineer', 45000, 'EUR', 48426, 'SI', 100, 'SI', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (19, 2023, 'SE', 'FT', 'Machine Learning Engineer', 161000, 'GBP', 195940, 'GB', 0, 'GB', 'M');
    INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
    VALUES (20, 2023, 'SE', 'FT', 'Machine Learning Engineer', 83300, 'GBP', 101378, 'GB', 0, 'GB', 'M');
    ```
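
    To confirm the rows are in place before running the flow, you can run a quick sanity check in `cqlsh` (a minimal example, not part of the flow itself):

    ```sql
    SELECT COUNT(*) FROM test.salary;
    ```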
  tags:
    - Ingest
    - BigQuery
    - SQL
  ee: false
  demo: false
  meta_description: "This flow extracts data from a Cassandra table, writes it to
    a CSV file, and then loads the CSV data into BigQuery."