Skip to content

Latest commit

 

History

History
613 lines (504 loc) · 19 KB

mqtt_demo.adoc

File metadata and controls

613 lines (504 loc) · 19 KB

Building a Stream Processing pipeline with MQTT, Apache Kafka, and KSQL

Overview

mqtt kafka 02

Pre-Flight Setup

Pre-reqs

Local:

  • curl

  • jq

  • Docker

  • kafkacat (or use the docker images and modify the scripts accordingly)

Get the repo

git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/mqtt-tracker

Start the environment

docker-compose up -d

Run KSQL CLI and MySQL CLI

  • KSQL CLI:

    docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/info) ; echo -e $(date) " KSQL server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksql-server:8088'
  • MySQL CLI:

    docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'

Data

  • Option 1 : load sample data from local

    ./load_sample_data.sh

    or

    kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/dummy_data.kcat
    kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/tmp.kcat
  • Option 2 : live pull from CCloud

    • Replicator

      ./create_replicator_source.sh
    • kafkacat hacky way

      #!/bin/bash
      
      source .env
      
      kafkacat -b $CCLOUD_BROKER_HOST \
          -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
          -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
          -X ssl.ca.location=/usr/local/etc/openssl/cert.pem -X api.version.request=true \
          -X auto.offset.reset=earliest \
          -G copy_to_local_01 data_mqtt -K: | \
      kafkacat -b localhost:9092,localhost:19092,localhost:29092 \
          -t data_mqtt \
          -K: -P
  • Option 3 : stream from MQTT source

    ./create_mqtt_source_local.sh

Check there’s data on the topic:

kafkacat -b localhost:9092 -t data_mqtt -o beginning -C

Demo

mqtt kafka 06

Check out the topics on the cluster

SHOW TOPICS;

There’s a stream of MQTT data; let’s take a look at it!

PRINT 'data_mqtt';

Let’s take this stream of JSON messages and declare the schema so that we can use it in KSQL. The schema comes from the owner of the data: https://owntracks.org/booklet/tech/json/#_typelocation

CREATE STREAM MQTT_RAW
    (TID  VARCHAR, BATT INTEGER, LON       DOUBLE,  LAT  DOUBLE,
     TST  BIGINT,  ALT  INTEGER, COG       INTEGER, VEL  INTEGER,
     P    DOUBLE,  BS   INTEGER, CONN      VARCHAR, ACC  INTEGER,
     T    VARCHAR, VAC  INTEGER, INREGIONS VARCHAR, TYPE VARCHAR)
WITH (KAFKA_TOPIC = 'data_mqtt', VALUE_FORMAT='JSON');
mqtt kafka 04
SELECT TIMESTAMPTOSTRING(TST*1000, 'MMM-dd HH:mm:ss','Europe/London') as TS,
       TID,
       BATT,
       BS
  FROM MQTT_RAW;
SELECT TIMESTAMPTOSTRING(TST*1000, 'MMM-dd HH:mm:ss','Europe/London') as TS,
       TID,
       BATT,
       BS,
       GEO_DISTANCE(LAT,LON,51.4965005,-0.2106881,'KM') AS DISTANCE_FROM_VENUE
  FROM MQTT_RAW;

Now let’s do something more than just write this to the screen. Let’s write it to a new Kafka topic!

CREATE STREAM PHONE_DATA
    WITH (VALUE_FORMAT='AVRO') AS
    SELECT SPLIT(ROWKEY, '/')[2] AS WHO
            , TST * 1000 AS EVENT_TIME_EPOCH_MS_TS
            , ROWTIME AS SYSTEM_TIME_MS_TS
            , TIMESTAMPTOSTRING(TST*1000,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TIME
            , CASE WHEN LAT IS NULL OR LON IS NULL THEN CAST(NULL AS VARCHAR)
            	   ELSE CAST(LAT AS VARCHAR) +','+CAST(LON AS VARCHAR)
              END AS LOCATION
            , ACC AS LOCATION_ACCURACY_M
            , ALT AS ALTITUDE_M
            , BATT AS BATTERY_PCT
            , CASE WHEN BS=0 THEN 'Unknown'
                   WHEN BS=1 THEN 'Unplugged'
                   WHEN BS=2 THEN 'Charging'
                   WHEN BS=3 THEN 'Full'
                   ELSE '[unknown]'
              END AS BATTERY_STATUS
            , COG AS COURSE_OVER_GROUN
            , CASE WHEN T='p' THEN 'ping issued randomly by background task'
                   WHEN T='c' THEN 'circular region enter/leave event'
                   WHEN T='b' THEN 'beacon region enter/leave event'
                   WHEN T='r' THEN 'response to a reportLocation cmd message'
                   WHEN T='u' THEN 'manual publish requested by the user'
                   WHEN T='t' THEN 'timer based publish in move'
                   WHEN T='v' THEN 'updated by Settings/Privacy/Locations Services/System Services/Frequent Locations monitoring'
                   ELSE '[unknown]'
              END AS REPORT_TRIGGER
            , TID AS TRACKER_ID
            , VAC AS VERTICAL_ACCURACY_M
            , VEL AS VELOCITY_KMH
            , P AS PRESSURE_KPA
            , CASE WHEN CONN='w' THEN 'WiFi'
                   WHEN CONN='o' THEN 'Offline'
                   WHEN CONN='m' THEN 'Mobile'
                   ELSE '[unknown]'
              END AS CONNECTIVITY_STATUS
            , INREGIONS AS REGIONS
            , LAT, LON,
            GEO_DISTANCE(LAT,LON,51.4965005,-0.2106881,'KM') AS DISTANCE_FROM_VENUE
        FROM MQTT_RAW;

Show it worked:

SELECT WHO,
       EVENT_TIME,
       TRACKER_ID,
       BATTERY_PCT,
       BATTERY_STATUS,
       CONNECTIVITY_STATUS
  FROM PHONE_DATA;
mqtt kafka 03

Since this is just a Kafka topic we can use and consume it just like any other. In this example, streaming the data to Elasticsearch.

CREATE SINK CONNECTOR sink_elastic_phone_data_00 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = '${file:/data/credentials.properties:ELASTIC_URL}',
  'connection.username' = '${file:/data/credentials.properties:ELASTIC_USERNAME}',
  'connection.password' = '${file:/data/credentials.properties:ELASTIC_PASSWORD}',
  'type.name' = '',
  'behavior.on.malformed.documents' = 'warn',
  'errors.tolerance' = 'all',
  'errors.log.enable' = 'true',
  'errors.log.include.messages' = 'true',
  'topics.regex' = 'PHONE_.*',
  'key.ignore' = 'true',
  'schema.ignore' = 'true',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter'
);
  • Check it’s running

    curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
    sink    |  sink-elastic-phone_data-00      |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  • Set up Kibana

    echo -e "\n--\n+> Opt out of Kibana telemetry"
    curl 'http://localhost:5601/api/telemetry/v2/optIn' -H 'kbn-xsrf: nevergonnagiveyouup' -H 'content-type: application/json' -H 'accept: application/json' --data-binary '{"enabled":false}' --compressed
    
    echo -e "Import objects"
    curl 'http://localhost:5601/api/saved_objects/_import?overwrite=true' -H 'Connection: keep-alive' -H 'Origin: http://localhost:5601' -H 'kbn-version: 7.5.0' --form file=@data/kibana.ndjson
  • Show Kibana discovery view & map viz

mqtt kafka 07a

But who is rmoff, and does he mind us having access to all this information about him?

Check out the source data in MySQL:

SELECT USERID, EMAIL, SHARE_LOCATION_OPTIN FROM USERS;
+--------+------------------+----------------------+
| USERID | EMAIL            | SHARE_LOCATION_OPTIN |
+--------+------------------+----------------------+
| rmoff  | robin@rmoff.net  |                    1 |
| ivor   | ivor@example.com |                    0 |
| hugh   | hugh@example.com |                    0 |
+--------+------------------+----------------------+

Ingest the data into ksqlDB

CREATE SOURCE CONNECTOR source_debezium_mysql_users_00 WITH (
  'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
  'database.hostname' = 'mysql',
  'database.port' = '3307',
  'database.user' = 'debezium',
  'database.password' = 'dbz',
  'database.server.id' = '42',
  'database.server.name' = 'asgard',
  'table.whitelist' = 'demo.USERS',
  'database.history.kafka.bootstrap.servers' = 'kafka-1:39092',
  'database.history.kafka.topic' = 'dbhistory.demo' ,
  'decimal.handling.mode' = 'double',
  'include.schema.changes' = 'true',
  'transforms' = 'unwrap,addTopicPrefix',
  'transforms.unwrap.type' = 'io.debezium.transforms.UnwrapFromEnvelope',
  'transforms.addTopicPrefix.type' = 'org.apache.kafka.connect.transforms.RegexRouter',
  'transforms.addTopicPrefix.regex' = '(.*)',
  'transforms.addTopicPrefix.replacement' = 'mysql2-$1'
);

Declare the KSQL table on the topic populated from the database:

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM USERS_STREAM WITH (KAFKA_TOPIC='mysql-asgard.demo.USERS', VALUE_FORMAT='AVRO');
CREATE STREAM USERS_REKEY_P6 WITH (PARTITIONS=6) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
CREATE STREAM USERS_REKEY_P1 WITH (PARTITIONS=1) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
PRINT USERS_REKEY_P6 LIMIT 1;
CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P6', VALUE_FORMAT='AVRO');
-- CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P1', VALUE_FORMAT='AVRO');
-- DROP TABLE USERS;

Examine the data:

SET 'auto.offset.reset' = 'latest';

SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
       R.WHO,
       U.EMAIL,
       U.SHARE_LOCATION_OPTIN,
       R.LAT,
       R.LON
    FROM PHONE_DATA R
           LEFT JOIN USERS U
           ON R.WHO = U.ROWKEY ;

Set datagen running

./run_datagen.sh
+---------+------------------+----------------------+-----------+----------+---------------+
|WHO      |EMAIL             |SHARE_LOCATION_OPTIN  |LON        |LAT       |BATTERY_STATUS |
+---------+------------------+----------------------+-----------+----------+---------------+
|hugh     |hugh@example.com  |0                     |-78.74988  |35.66231  |Unplugged      |
|rick     |null              |null                  |-1.812582  |53.95524  |Charging       |
|rmoff    |robin@rmoff.net   |1                     |-1.812581  |53.92535  |Unplugged      |
|ivor     |ivor@example.com  |0                     |-1.812575  |53.955235 |Full           |
SET 'auto.offset.reset' = 'latest';

SELECT  TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
         WHO
        ,U.EMAIL AS EMAIL
        ,CASE WHEN U.SHARE_LOCATION_OPTIN = 1 THEN LOCATION
            ELSE CAST(NULL AS VARCHAR)
          END AS LOCATION
  FROM PHONE_DATA R
          LEFT JOIN USERS U
          ON R.WHO = U.ROWKEY
  WHERE WHO='ivor';

In a new terminal, show MySQL with KSQL still visible.

docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'

In MySQL make an update to a user’s profile to switch their data optin; note how the KSQL query above changes in response to it.

UPDATE USERS SET SHARE_LOCATION_OPTIN=TRUE WHERE USERID='ivor';


UPDATE USERS SET SHARE_LOCATION_OPTIN=FALSE WHERE USERID='ivor';

Looking at the data in MySQL in more detail, we can see each user can optionally specify a privacy zone within which their data won’t be shared, but outside of which it can.

SELECT * FROM USERS WHERE USERID='rmoff' \G
*************************** 1. row ***************************
              USERID: rmoff
               EMAIL: robin@rmoff.net
SHARE_LOCATION_OPTIN: 1
PRIVACY_LOCATION_LAT: 53.924729
PRIVACY_LOCATION_LON: -1.804453
     PRIVACY_ZONE_KM: 1
           CREATE_TS: 2019-11-14 06:54:38
           UPDATE_TS: 2019-11-14 06:54:38
1 row in set (0.00 sec)

We can apply this logic in the SQL as part of the streaming application:

CREATE STREAM PHONE_LOCATION_OPTIN AS
  SELECT WHO,
         EVENT_TIME_EPOCH_MS_TS,
         CASE
            WHEN U.SHARE_LOCATION_OPTIN = 1 THEN
              CASE
                WHEN GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') > PRIVACY_ZONE_KM
                  THEN LOCATION
                ELSE '<Private>'
              END
            WHEN U.SHARE_LOCATION_OPTIN = 0 THEN '<Opted out>'
            ELSE '<No user record>'
         END AS LOCATION,
         GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE,
         PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM
         BATTERY_PCT,
         BATTERY_STATUS,
         U.EMAIL AS EMAIL
  FROM   PHONE_DATA R
         LEFT JOIN USERS U
         ON R.WHO = U.ROWKEY;
SELECT TIMESTAMPTOSTRING(ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
       WHO,
       LOCATION,
       DISTANCE_KM_FROM_PRIVACY_ZONE,
       PRIVACY_ZONE_THRESHOLD_KM
  FROM PHONE_LOCATION_OPTIN
  WHERE WHO='rmoff';

MOAR derived streams

You can also use KSQL to create a subset of the data so that other teams could use the data

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM PHONE_BATTERY_DATA AS
  SELECT WHO, BATTERY_PCT, BATTERY_STATUS, CONNECTIVITY_STATUS
    FROM PHONE_DATA;

Aggregate the data to show connectivity type per day:

SELECT TIMESTAMPTOSTRING(windowstart(), 'yyyy-MM-dd HH:mm:ss') AS TS,
       CONNECTIVITY_STATUS, COUNT(*)
  FROM PHONE_BATTERY_DATA
        WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY CONNECTIVITY_STATUS;

Back to basics

mqtt kafka 09

With a schema in place we can pick out fields from the data:

SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') as TS, ROWKEY, BATT FROM MQTT_RAW;
+-------------------------+-------------------------+-------------------------+
|TS                       |ROWKEY                   |BATT                     |
+-------------------------+-------------------------+-------------------------+
|2019-09-30 20:47:30      |owntracks/race-write/rife|45                       |
|2019-09-30 20:47:30      |owntracks/race-write/rmof|100                      |
|                         |f-                       |                         |
|2019-09-30 20:47:30      |owntracks/race-write/EF81|100                      |
|                         |CA0A-BBD6-4116-BBC7-38EE8|                         |
|                         |FA3D5A4                  |                         |
[…]
mqtt kafka 08

We can use predicates to filter the data:

SELECT ROWKEY, TST, BATT
  FROM MQTT_RAW
 WHERE ROWKEY LIKE '%rmoff';
+-----------------------------+-------------+---------+
|ROWKEY                       |TST          |BATT     |
+-----------------------------+-------------+---------+
|owntracks/tiqmyral/rmoff     |1569316069   |97       |
|owntracks/tiqmyral/rmoff     |1569315063   |96       |
|owntracks/tiqmyral/rmoff     |1569312091   |95       |

Looking at the message key it’s the final part of it that identifies the user, so let’s extract that

SELECT ROWKEY, SPLIT(ROWKEY, '/')[2] AS WHO
  FROM MQTT_RAW
  LIMIT 5;

Per the documentation there are some fields which have special meanings, such as the state of the battery:

SELECT BS FROM MQTT_RAW;
+--------+
|BS      |
+--------+
|1       |
|1       |
|1       |
|1       |
|1       |

We can use KSQL to apply these values to the codes to make the data more useful. Check out the AS clause too for changing the schema field names.

SELECT BS,
        CASE WHEN BS=0 THEN 'Unknown'
             WHEN BS=1 THEN 'Unplugged'
             WHEN BS=2 THEN 'Charging'
             WHEN BS=3 THEN 'Full'
             ELSE '[unknown]'
        END AS BATTERY_STATUS,
        BATT AS BATTERY_PCT
   FROM MQTT_RAW;
+----------------------------+----------------------------+----------------------------+
|BS                          |BATTERY_STATUS              |BATT                        |
+----------------------------+----------------------------+----------------------------+
|1                           |Unplugged                   |45                          |
|1                           |Unplugged                   |45                          |
|3                           |Full                        |100                         |
|2                           |Charging                    |100                         |
|1                           |Unplugged                   |45                          |
|3                           |Full                        |100                         |

EOFEOFEOF

Appendix

Export topic to file

kafkacat -b localhost:9092 \
  -o beginning -K: -e -q \
  -t data_mqtt \
  > data/export_20190929.kcat

Import data from file

kafkacat -b localhost:9092 \
  -P -K: \
  -t data_mqtt-import \
  -l data/export_20190929.kcat