Local:

- curl
- jq
- Docker
- kafkacat (or use the docker images and modify the scripts accordingly; see the sketch after this list)
- KSQL CLI:

    docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/info) ; echo -e $(date) " KSQL server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksql-server:8088'

- MySQL CLI:

    docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'
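If you don't have kafkacat installed locally, a minimal sketch of running it from a container instead (the image tag, the Docker network name demo_default, and the broker address kafka-1:39092 are assumptions about this environment; adjust them to match your docker-compose setup):

    docker run --rm --network demo_default edenhill/kafkacat:1.6.0 \
      -b kafka-1:39092 -t data_mqtt -o beginning -C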
- Option 1: load sample data from local

    ./load_sample_data.sh

  or

    kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/dummy_data.kcat
    kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/tmp.kcat
- Option 2: live pull from CCloud
  - Replicator

      ./create_replicator_source.sh

  - kafkacat hacky way

      #!/bin/bash
      source .env
      kafkacat -b $CCLOUD_BROKER_HOST \
        -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
        -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
        -X ssl.ca.location=/usr/local/etc/openssl/cert.pem -X api.version.request=true \
        -X auto.offset.reset=earliest \
        -G copy_to_local_01 data_mqtt -K: | \
      kafkacat -b localhost:9092,localhost:19092,localhost:29092 \
        -t data_mqtt \
        -K: -P
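The script sources its Confluent Cloud connection details from a local .env file. A sketch of what that file might contain (the variable names come from the script above; the values are placeholders only):

    # .env -- substitute your own cluster details
    CCLOUD_BROKER_HOST=pkc-xxxxx.europe-west1.gcp.confluent.cloud:9092
    CCLOUD_API_KEY=XXXXXXXXXXXXXXXX
    CCLOUD_API_SECRET=XXXXXXXXXXXXXXXXXXXXXXXX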
- Option 3: stream from MQTT source

    ./create_mqtt_source_local.sh
Check there’s data on the topic:
kafkacat -b localhost:9092 -t data_mqtt -o beginning -C
Check out the topics on the cluster:
SHOW TOPICS;
There’s a stream of MQTT data; let’s take a look at it!
PRINT 'data_mqtt';
Let’s take this stream of JSON messages and declare the schema so that we can use it in KSQL. The schema comes from the owner of the data: https://owntracks.org/booklet/tech/json/#_typelocation
CREATE STREAM MQTT_RAW
(TID VARCHAR, BATT INTEGER, LON DOUBLE, LAT DOUBLE,
TST BIGINT, ALT INTEGER, COG INTEGER, VEL INTEGER,
P DOUBLE, BS INTEGER, CONN VARCHAR, ACC INTEGER,
T VARCHAR, VAC INTEGER, INREGIONS VARCHAR, TYPE VARCHAR)
WITH (KAFKA_TOPIC = 'data_mqtt', VALUE_FORMAT='JSON');
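Registering the stream doesn't move any data; it just overlays a schema on the existing topic. To inspect what KSQL now knows about the stream:

DESCRIBE MQTT_RAW;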
SELECT TIMESTAMPTOSTRING(TST*1000, 'MMM-dd HH:mm:ss','Europe/London') as TS,
TID,
BATT,
BS
FROM MQTT_RAW;
SELECT TIMESTAMPTOSTRING(TST*1000, 'MMM-dd HH:mm:ss','Europe/London') as TS,
TID,
BATT,
BS,
GEO_DISTANCE(LAT,LON,51.4965005,-0.2106881,'KM') AS DISTANCE_FROM_VENUE
FROM MQTT_RAW;
Now let’s do something more than just write this to the screen. Let’s write it to a new Kafka topic!
CREATE STREAM PHONE_DATA
WITH (VALUE_FORMAT='AVRO') AS
SELECT SPLIT(ROWKEY, '/')[2] AS WHO
, TST * 1000 AS EVENT_TIME_EPOCH_MS_TS
, ROWTIME AS SYSTEM_TIME_MS_TS
, TIMESTAMPTOSTRING(TST*1000,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TIME
, CASE WHEN LAT IS NULL OR LON IS NULL THEN CAST(NULL AS VARCHAR)
ELSE CAST(LAT AS VARCHAR) +','+CAST(LON AS VARCHAR)
END AS LOCATION
, ACC AS LOCATION_ACCURACY_M
, ALT AS ALTITUDE_M
, BATT AS BATTERY_PCT
, CASE WHEN BS=0 THEN 'Unknown'
WHEN BS=1 THEN 'Unplugged'
WHEN BS=2 THEN 'Charging'
WHEN BS=3 THEN 'Full'
ELSE '[unknown]'
END AS BATTERY_STATUS
, COG AS COURSE_OVER_GROUND
, CASE WHEN T='p' THEN 'ping issued randomly by background task'
WHEN T='c' THEN 'circular region enter/leave event'
WHEN T='b' THEN 'beacon region enter/leave event'
WHEN T='r' THEN 'response to a reportLocation cmd message'
WHEN T='u' THEN 'manual publish requested by the user'
WHEN T='t' THEN 'timer based publish in move'
WHEN T='v' THEN 'updated by Settings/Privacy/Locations Services/System Services/Frequent Locations monitoring'
ELSE '[unknown]'
END AS REPORT_TRIGGER
, TID AS TRACKER_ID
, VAC AS VERTICAL_ACCURACY_M
, VEL AS VELOCITY_KMH
, P AS PRESSURE_KPA
, CASE WHEN CONN='w' THEN 'WiFi'
WHEN CONN='o' THEN 'Offline'
WHEN CONN='m' THEN 'Mobile'
ELSE '[unknown]'
END AS CONNECTIVITY_STATUS
, INREGIONS AS REGIONS
, LAT, LON,
GEO_DISTANCE(LAT,LON,51.4965005,-0.2106881,'KM') AS DISTANCE_FROM_VENUE
FROM MQTT_RAW;
Show it worked:
SELECT WHO,
EVENT_TIME,
TRACKER_ID,
BATTERY_PCT,
BATTERY_STATUS,
CONNECTIVITY_STATUS
FROM PHONE_DATA;
Since this is just a Kafka topic, we can consume it like any other. In this example, we stream the data to Elasticsearch.
CREATE SINK CONNECTOR sink_elastic_phone_data_00 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = '${file:/data/credentials.properties:ELASTIC_URL}',
'connection.username' = '${file:/data/credentials.properties:ELASTIC_USERNAME}',
'connection.password' = '${file:/data/credentials.properties:ELASTIC_PASSWORD}',
'type.name' = '',
'behavior.on.malformed.documents' = 'warn',
'errors.tolerance' = 'all',
'errors.log.enable' = 'true',
'errors.log.include.messages' = 'true',
'topics.regex' = 'PHONE_.*',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter'
);
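The ${file:...} placeholders are resolved by Kafka Connect's FileConfigProvider, which has to be enabled on the Connect worker (config.providers=file, config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider). A sketch of what /data/credentials.properties might contain, with placeholder values:

    ELASTIC_URL=http://elasticsearch:9200
    ELASTIC_USERNAME=elastic
    ELASTIC_PASSWORD=changeme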
Check it's running:

    curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
      jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
      column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-phone_data-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
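To sanity-check that documents are actually arriving in Elasticsearch, a quick count query (this assumes Elasticsearch is reachable on localhost:9200 and that the connector writes each topic to an index named after it, lowercased, i.e. phone_data here):

    curl -s "http://localhost:9200/phone_data/_count" | jq '.count'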
Set up Kibana:

    echo -e "\n--\n+> Opt out of Kibana telemetry"
    curl 'http://localhost:5601/api/telemetry/v2/optIn' \
      -H 'kbn-xsrf: nevergonnagiveyouup' \
      -H 'content-type: application/json' \
      -H 'accept: application/json' \
      --data-binary '{"enabled":false}' --compressed

    echo -e "Import objects"
    curl 'http://localhost:5601/api/saved_objects/_import?overwrite=true' \
      -H 'Connection: keep-alive' \
      -H 'Origin: http://localhost:5601' \
      -H 'kbn-version: 7.5.0' \
      --form file=@data/kibana.ndjson
Show Kibana discovery view & map viz.

But who is rmoff, and does he mind us having access to all this information about him?
Check out the source data in MySQL:
SELECT USERID, EMAIL, SHARE_LOCATION_OPTIN FROM USERS;
+--------+------------------+----------------------+
| USERID | EMAIL | SHARE_LOCATION_OPTIN |
+--------+------------------+----------------------+
| rmoff | robin@rmoff.net | 1 |
| ivor | ivor@example.com | 0 |
| hugh | hugh@example.com | 0 |
+--------+------------------+----------------------+
Ingest the data into ksqlDB:
CREATE SOURCE CONNECTOR source_debezium_mysql_users_00 WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3307',
'database.user' = 'debezium',
'database.password' = 'dbz',
'database.server.id' = '42',
'database.server.name' = 'asgard',
'table.whitelist' = 'demo.USERS',
'database.history.kafka.bootstrap.servers' = 'kafka-1:39092',
'database.history.kafka.topic' = 'dbhistory.demo' ,
'decimal.handling.mode' = 'double',
'include.schema.changes' = 'true',
'transforms' = 'unwrap,addTopicPrefix',
'transforms.unwrap.type' = 'io.debezium.transforms.UnwrapFromEnvelope',
'transforms.addTopicPrefix.type' = 'org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex' = '(.*)',
'transforms.addTopicPrefix.replacement' = 'mysql2-$1'
);
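To peek at the change-data-capture records now flowing, you can consume the topic directly. A sketch using kafkacat's Avro support (this assumes kafkacat >= 1.6 built with Avro/Schema Registry support, and the Schema Registry listening on localhost:8081; both are assumptions about this environment):

    kafkacat -b localhost:9092 -t mysql2-asgard.demo.USERS \
      -s value=avro -r http://localhost:8081 \
      -C -o beginning -e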
Declare the KSQL table on the topic populated from the database. Because the join with PHONE_DATA below is on the WHO column, the USERS data is first rekeyed by USERID so that the table's underlying topic is keyed (and co-partitioned) on the join column:
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM USERS_STREAM WITH (KAFKA_TOPIC='mysql2-asgard.demo.USERS', VALUE_FORMAT='AVRO');
CREATE STREAM USERS_REKEY_P6 WITH (PARTITIONS=6) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
CREATE STREAM USERS_REKEY_P1 WITH (PARTITIONS=1) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
PRINT 'USERS_REKEY_P6' LIMIT 1;
CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P6', VALUE_FORMAT='AVRO');
-- CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P1', VALUE_FORMAT='AVRO');
-- DROP TABLE USERS;
Examine the data:
SET 'auto.offset.reset' = 'latest';
SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
R.WHO,
U.EMAIL,
U.SHARE_LOCATION_OPTIN,
R.LAT,
R.LON
FROM PHONE_DATA R
LEFT JOIN USERS U
ON R.WHO = U.ROWKEY ;
Set datagen running
./run_datagen.sh
+---------+------------------+----------------------+-----------+----------+---------------+
|WHO |EMAIL |SHARE_LOCATION_OPTIN |LON |LAT |BATTERY_STATUS |
+---------+------------------+----------------------+-----------+----------+---------------+
|hugh |hugh@example.com |0 |-78.74988 |35.66231 |Unplugged |
|rick |null |null |-1.812582 |53.95524 |Charging |
|rmoff |robin@rmoff.net |1 |-1.812581 |53.92535 |Unplugged |
|ivor |ivor@example.com |0 |-1.812575 |53.955235 |Full |
SET 'auto.offset.reset' = 'latest';
SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
WHO
,U.EMAIL AS EMAIL
,CASE WHEN U.SHARE_LOCATION_OPTIN = 1 THEN LOCATION
ELSE CAST(NULL AS VARCHAR)
END AS LOCATION
FROM PHONE_DATA R
LEFT JOIN USERS U
ON R.WHO = U.ROWKEY
WHERE WHO='ivor';
In a new terminal, show MySQL with KSQL still visible.
docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'
In MySQL, update a user's profile to toggle their location-sharing opt-in; note how the KSQL query above changes in response.
UPDATE USERS SET SHARE_LOCATION_OPTIN=TRUE WHERE USERID='ivor';
UPDATE USERS SET SHARE_LOCATION_OPTIN=FALSE WHERE USERID='ivor';
Looking at the data in MySQL in more detail, we can see each user can optionally specify a privacy zone within which their data won't be shared, but outside of which it can be.
SELECT * FROM USERS WHERE USERID='rmoff' \G
*************************** 1. row ***************************
USERID: rmoff
EMAIL: robin@rmoff.net
SHARE_LOCATION_OPTIN: 1
PRIVACY_LOCATION_LAT: 53.924729
PRIVACY_LOCATION_LON: -1.804453
PRIVACY_ZONE_KM: 1
CREATE_TS: 2019-11-14 06:54:38
UPDATE_TS: 2019-11-14 06:54:38
1 row in set (0.00 sec)
We can apply this logic in the SQL as part of the streaming application:
CREATE STREAM PHONE_LOCATION_OPTIN AS
SELECT WHO,
EVENT_TIME_EPOCH_MS_TS,
CASE
WHEN U.SHARE_LOCATION_OPTIN = 1 THEN
CASE
WHEN GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') > PRIVACY_ZONE_KM
THEN LOCATION
ELSE '<Private>'
END
WHEN U.SHARE_LOCATION_OPTIN = 0 THEN '<Opted out>'
ELSE '<No user record>'
END AS LOCATION,
GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE,
PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM,
BATTERY_PCT,
BATTERY_STATUS,
U.EMAIL AS EMAIL
FROM PHONE_DATA R
LEFT JOIN USERS U
ON R.WHO = U.ROWKEY;
SELECT TIMESTAMPTOSTRING(ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
WHO,
LOCATION,
DISTANCE_KM_FROM_PRIVACY_ZONE,
PRIVACY_ZONE_THRESHOLD_KM
FROM PHONE_LOCATION_OPTIN
WHERE WHO='rmoff';
You can also use KSQL to create a subset of the data for other teams to use:
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PHONE_BATTERY_DATA AS
SELECT WHO, BATTERY_PCT, BATTERY_STATUS, CONNECTIVITY_STATUS
FROM PHONE_DATA;
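Because this derived data lands on its own Kafka topic, another team can consume it entirely independently of KSQL. To eyeball it from the KSQL CLI (PRINT picks up the Avro serialization, which PHONE_BATTERY_DATA inherits from PHONE_DATA):

PRINT 'PHONE_BATTERY_DATA';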
Aggregate the data to show connectivity type per day:
SELECT TIMESTAMPTOSTRING(windowstart(), 'yyyy-MM-dd HH:mm:ss') AS TS,
CONNECTIVITY_STATUS, COUNT(*)
FROM PHONE_BATTERY_DATA
WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY CONNECTIVITY_STATUS;
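To keep this aggregate continuously maintained in its own Kafka topic, rather than running it as a one-off query, you can persist it as a table; a sketch (the table name CONNECTIVITY_PER_DAY is illustrative):

CREATE TABLE CONNECTIVITY_PER_DAY AS
SELECT CONNECTIVITY_STATUS,
       COUNT(*) AS EVENT_COUNT
FROM PHONE_BATTERY_DATA
     WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY CONNECTIVITY_STATUS;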
With a schema in place we can pick out fields from the data:
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') as TS, ROWKEY, BATT FROM MQTT_RAW;
+-------------------------+-------------------------+-------------------------+
|TS |ROWKEY |BATT |
+-------------------------+-------------------------+-------------------------+
|2019-09-30 20:47:30 |owntracks/race-write/rife|45 |
|2019-09-30 20:47:30 |owntracks/race-write/rmof|100 |
| |f- | |
|2019-09-30 20:47:30 |owntracks/race-write/EF81|100 |
| |CA0A-BBD6-4116-BBC7-38EE8| |
| |FA3D5A4 | |
[…]
We can use predicates to filter the data:
SELECT ROWKEY, TST, BATT
FROM MQTT_RAW
WHERE ROWKEY LIKE '%rmoff';
+-----------------------------+-------------+---------+
|ROWKEY |TST |BATT |
+-----------------------------+-------------+---------+
|owntracks/tiqmyral/rmoff |1569316069 |97 |
|owntracks/tiqmyral/rmoff |1569315063 |96 |
|owntracks/tiqmyral/rmoff |1569312091 |95 |
Looking at the message key, it's the final part that identifies the user, so let's extract it:
SELECT ROWKEY, SPLIT(ROWKEY, '/')[2] AS WHO
FROM MQTT_RAW
LIMIT 5;
Per the documentation, there are some fields with special meanings, such as the state of the battery:
SELECT BS FROM MQTT_RAW;
+--------+
|BS |
+--------+
|1 |
|1 |
|1 |
|1 |
|1 |
We can use KSQL to map these codes onto meaningful values, making the data more useful. Check out the AS clause too, for renaming schema fields.
SELECT BS,
CASE WHEN BS=0 THEN 'Unknown'
WHEN BS=1 THEN 'Unplugged'
WHEN BS=2 THEN 'Charging'
WHEN BS=3 THEN 'Full'
ELSE '[unknown]'
END AS BATTERY_STATUS,
BATT AS BATTERY_PCT
FROM MQTT_RAW;
+----------------------------+----------------------------+----------------------------+
|BS |BATTERY_STATUS |BATT |
+----------------------------+----------------------------+----------------------------+
|1 |Unplugged |45 |
|1 |Unplugged |45 |
|3 |Full |100 |
|2 |Charging |100 |
|1 |Unplugged |45 |
|3 |Full |100 |