Apache Kafka and Time Series

Tackling Time-Series events using pure ksqlDB

Photo by Jan Huber on Unsplash
# Open an interactive ksqlDB CLI session inside the `ksqldb-cli` container,
# pointed at the ksqlDB server's REST endpoint on port 8088.
# Replace the <...> placeholder with the server's host address.
docker exec -it ksqldb-cli ksql http://<host address of the ksqldb-server>:8088
-- Source stream of time-series readings, backed by the Kafka topic 'topic1'.
-- Record time is taken from the `ts` column, parsed with TIMESTAMP_FORMAT,
-- so windowed queries on this stream operate on the event time carried in the data.
CREATE STREAM stream1 (
    id BIGINT KEY,
    ts VARCHAR,
    value1 DOUBLE
) WITH (
    KAFKA_TOPIC = 'topic1',
    VALUE_FORMAT = 'avro',
    TIMESTAMP = 'ts',
    TIMESTAMP_FORMAT = 'dd-MM-yyyy HH:mm:ss',
    PARTITIONS = 1
);
-- Sample readings for id 1, one hour apart.
-- The date separators must be plain hyphens so each string parses with the
-- 'dd-MM-yyyy HH:mm:ss' timestamp format.
INSERT INTO stream1 (id, ts, value1) VALUES (1, '03-03-2021 00:00:00', 4);
INSERT INTO stream1 (id, ts, value1) VALUES (1, '03-03-2021 01:00:00', 5);
INSERT INTO stream1 (id, ts, value1) VALUES (1, '03-03-2021 02:00:00', 6);
-- Start reading from the earliest available offset, so rows inserted before a
-- query is started are included in its results.
SET 'auto.offset.reset' = 'earliest';
-- Kafka Streams record-cache size, in bytes.
SET 'ksql.streams.cache.max.bytes.buffering' = '2000000';
Image by author
Image by author
-- Peak detection: keep only readings above THRESHOLD and group them, per id,
-- into session windows with a 1-hour inactivity gap. Each resulting window is
-- reported as one peak with its start, end, width, maximum and point count.
DEFINE THRESHOLD = '10';

SELECT
    id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'dd-MM-yyyy HH:mm:ss') AS peak_start,
    TIMESTAMPTOSTRING(WINDOWEND, 'dd-MM-yyyy HH:mm:ss') AS peak_end,
    -- The window length (milliseconds) is formatted as if it were a timestamp.
    -- 'HH' is hour-of-day, so the value is only a width in hours for windows
    -- shorter than 24 hours.
    -- NOTE(review): the result also depends on the time zone used for
    -- formatting; confirm it is UTC in the target environment.
    TIMESTAMPTOSTRING(WINDOWEND - WINDOWSTART, 'HH') AS peak_width_hours,
    MAX(value1) AS peak_max_value,
    COUNT(value1) AS num_data_points
FROM stream1
WINDOW SESSION (1 HOUR)
WHERE value1 > ${THRESHOLD}
GROUP BY id
EMIT CHANGES;
Image by author
Image by author
-- Trend indicator per id over 5-hour hopping windows that advance every hour.
-- S is the sum of the signs of all ten pairwise differences (later value minus
-- earlier value) among the first five values collected in the window, i.e. the
-- S statistic used by the Mann-Kendall trend test: positive for an upward
-- trend, negative for a downward one.
-- Subscripts 1..5 address the first five collected values.
-- NOTE(review): windows holding fewer than five values presumably produce a
-- NULL S, since the missing elements are NULL; confirm before relying on S.
SELECT
    id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'dd-MM-yyyy HH:mm:ss') AS start_period,
    TIMESTAMPTOSTRING(WINDOWEND, 'dd-MM-yyyy HH:mm:ss') AS end_period,
    SUM(value1) AS sum_value1,
    SIGN(COLLECT_LIST(value1)[2] - COLLECT_LIST(value1)[1])
        + SIGN(COLLECT_LIST(value1)[3] - COLLECT_LIST(value1)[1])
        + SIGN(COLLECT_LIST(value1)[4] - COLLECT_LIST(value1)[1])
        + SIGN(COLLECT_LIST(value1)[5] - COLLECT_LIST(value1)[1])
        + SIGN(COLLECT_LIST(value1)[3] - COLLECT_LIST(value1)[2])
        + SIGN(COLLECT_LIST(value1)[4] - COLLECT_LIST(value1)[2])
        + SIGN(COLLECT_LIST(value1)[5] - COLLECT_LIST(value1)[2])
        + SIGN(COLLECT_LIST(value1)[4] - COLLECT_LIST(value1)[3])
        + SIGN(COLLECT_LIST(value1)[5] - COLLECT_LIST(value1)[3])
        + SIGN(COLLECT_LIST(value1)[5] - COLLECT_LIST(value1)[4]) AS S
FROM stream1
WINDOW HOPPING (SIZE 5 HOURS, ADVANCE BY 1 HOURS)
GROUP BY id
EMIT CHANGES;
Image by author
Image by author
-- Running per-id statistics used for outlier detection.
-- N is the number of standard deviations that defines the accepted band;
-- UPPER and LOWER are the column names given to the band's bounds.
DEFINE N = '2';
DEFINE UPPER = 'UPPERBOUND';
DEFINE LOWER = 'LOWERBOUND';

CREATE TABLE stddev AS
SELECT
    id,
    AVG(value1) AS avg_value,
    -- Population standard deviation: sqrt((sum(x^2) - n * mean^2) / n).
    SQRT((SUM(value1 * value1) - COUNT(value1) * (AVG(value1) * AVG(value1))) / COUNT(value1)) AS stddev,
    -- Lower bound: mean - N * standard deviation.
    (AVG(value1) - ${N} * SQRT((SUM(value1 * value1) - COUNT(value1) * (AVG(value1) * AVG(value1))) / COUNT(value1))) AS ${LOWER},
    -- Upper bound: mean + N * standard deviation.
    (AVG(value1) + ${N} * SQRT((SUM(value1 * value1) - COUNT(value1) * (AVG(value1) * AVG(value1))) / COUNT(value1))) AS ${UPPER}
FROM stream1
GROUP BY id
EMIT CHANGES;
Image by author
-- Stream of readings that fall outside the [lowerbound, upperbound] band held
-- for their id in the `stddev` table.
-- INNER JOIN states the effective behaviour: the WHERE clause compares against
-- columns of `stddev`, so a reading with no matching `stddev` row could never
-- pass the filter under the previous LEFT JOIN either.
CREATE STREAM outliers AS
SELECT
    stream1.id AS id,
    stream1.ts AS ts,
    stream1.value1 AS value1,
    stddev.lowerbound AS lowerbound,
    stddev.upperbound AS upperbound
FROM stream1
INNER JOIN stddev
    ON stream1.id = stddev.id
WHERE stream1.value1 < stddev.lowerbound
    OR stream1.value1 > stddev.upperbound
EMIT CHANGES;
-- A reading for id 1 that is much larger than the earlier sample values (4, 5, 6),
-- inserted to exercise the outlier detection.
INSERT INTO stream1 (id, ts, value1) VALUES (1, '04-03-2021 20:00:00', 45);
Image by author
Image by author

Physicist, AI/ML practitioner, working on end-to-end data analytics.