ksqlDB example snippets

Here you’ll find snippets designed to illustrate ksqlDB’s core concepts while providing a starting point for developing your stream processing application.

While many ksqlDB query constructs are outlined in isolation here, these individual constructs may be freely composed into arbitrarily complex queries that suit your needs. Persistent queries may also be chained together, giving you the flexibility to build highly advanced stream processing applications without writing any application code.
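
For example, one persistent query can read directly from the output of another (a minimal sketch; raw_events and the derived stream names here are hypothetical):

CREATE STREAM cleaned AS
  SELECT c1, c2 FROM raw_events
  WHERE c2 IS NOT NULL EMIT CHANGES;

CREATE STREAM enriched AS
  SELECT UCASE(c1) AS c1, c2 * 2 AS c2
  FROM cleaned EMIT CHANGES;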

For in-depth technical coverage of ksqlDB’s architecture and powerful feature set, visit the ksqlDB documentation.

Creating streams and tables

Create a stream over an existing Kafka topic
CREATE STREAM s1 (c1 VARCHAR, c2 INTEGER)
  WITH (kafka_topic='s1', value_format='json');
Create a table over an existing Kafka topic
CREATE TABLE t1 (c1 VARCHAR PRIMARY KEY, c2 INTEGER)
  WITH (kafka_topic='t1', value_format='json');
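Create a stream over an Avro topic
-- With value_format='avro', the column list can be omitted and ksqlDB
-- infers it from the latest schema in Schema Registry (the topic name
-- here is hypothetical):
CREATE STREAM s1_avro
  WITH (kafka_topic='s1_avro', value_format='avro');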
Create a stream or table with a specific key
CREATE STREAM keyed (id VARCHAR KEY, c2 INTEGER)
  WITH (kafka_topic='keyed', value_format='json');
Create a derived stream from another stream
CREATE STREAM derived AS
  SELECT c2 + 1 AS x, c1 AS y FROM s1 EMIT CHANGES;

Populating streams and tables

INSERT rows into a stream or table
INSERT INTO s1 (c1, c2) VALUES ('v1', 2);
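INSERT a row with an explicit key
-- Uses the keyed stream defined above; the key column is supplied
-- like any other column:
INSERT INTO keyed (id, c2) VALUES ('k1', 42);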

Introspecting streams and tables

List all topics
SHOW TOPICS;
List all streams
SHOW STREAMS;
List all tables
SHOW TABLES;
Describe a stream or table
DESCRIBE s1;
-- Describe s1 in detail:
DESCRIBE EXTENDED s1;
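List all running queries
-- SHOW QUERIES displays the IDs used by TERMINATE (see below)
SHOW QUERIES;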
Show the contents of a Kafka topic
PRINT 'topic_name' FROM BEGINNING;
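-- PRINT also accepts a LIMIT to stop after a fixed number of records:
PRINT 'topic_name' FROM BEGINNING LIMIT 3;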

Selecting data

Select all rows and columns
CREATE STREAM s2 AS
  SELECT * FROM s1 EMIT CHANGES;
Select a subset of columns
CREATE STREAM s2 AS
  SELECT c1, c2, c3 FROM s1 EMIT CHANGES;
Filter rows
CREATE STREAM s2 AS
  SELECT * FROM s1
  WHERE c1 != 'foo' AND c2 = 42 EMIT CHANGES;
Apply a function to columns
SELECT SUBSTRING(str, 1, 10)
  FROM s1 EMIT CHANGES;
Select NULL or non-NULL columns
SELECT * FROM s1
  WHERE c1 IS NOT NULL OR c2 IS NULL EMIT CHANGES;
Timestamp comparison
-- Timestamp literals may be specified using ISO-8601 formatting:
SELECT * FROM s1
  WHERE ROWTIME >= '2019-11-20T00:00:00' EMIT CHANGES;

-- Timestamps can also be interpreted as epoch milliseconds:
SELECT * FROM s1
  WHERE ROWTIME >= 1574208000000 EMIT CHANGES;

Date/time operations

-- Format an epoch-milliseconds timestamp as a string
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss')
  FROM s1 EMIT CHANGES;

-- Parse a date string using the given format
SELECT STRINGTOTIMESTAMP(datestr, 'yyyy-MM-dd HH:mm:ss')
  FROM s1 EMIT CHANGES;
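
-- Get the current time as epoch milliseconds (UNIX_TIMESTAMP is
-- available in recent ksqlDB versions):
SELECT UNIX_TIMESTAMP() FROM s1 EMIT CHANGES;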
Select a value of a Map by key
CREATE STREAM s1 (map1 MAP<VARCHAR, INTEGER>)
  WITH (kafka_topic='map1', value_format='json', partitions=1);

SELECT map1['key1'], map1['key2']
  FROM s1 EMIT CHANGES;
Select an element from an Array
CREATE STREAM s1 (arr ARRAY<INTEGER>)
  WITH (kafka_topic='arr', value_format='json', partitions=1);

-- ksqlDB array indexes are one-based
SELECT arr[1], arr[2]
  FROM s1 EMIT CHANGES;
Select a field of a Struct
CREATE STREAM s1 (item STRUCT<f1 INT, f2 INT>)
  WITH (kafka_topic='struct', value_format='json', partitions=1);

SELECT item->f1, item->f2
  FROM s1 EMIT CHANGES;
Select a bounded number of rows
SELECT * FROM s1 EMIT CHANGES LIMIT 1;

Aggregating data

Aggregating columns
SELECT x, COUNT(*), SUM(y) FROM s1
  GROUP BY x EMIT CHANGES;
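Filter aggregate results with HAVING
-- HAVING filters groups after aggregation:
SELECT x, COUNT(*) FROM s1
  GROUP BY x
  HAVING COUNT(*) > 1 EMIT CHANGES;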
Windowed aggregation
SELECT x, COUNT(*), SUM(y) FROM s1
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY x EMIT CHANGES;
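Hopping and session windows
-- TUMBLING is one of three window types; HOPPING and SESSION
-- windows follow the same pattern:
SELECT x, COUNT(*), SUM(y) FROM s1
  WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
  GROUP BY x EMIT CHANGES;

SELECT x, COUNT(*), SUM(y) FROM s1
  WINDOW SESSION (60 SECONDS)
  GROUP BY x EMIT CHANGES;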

Joining streams and tables

Join two streams together
CREATE STREAM s3 AS
  SELECT s1.c1, s2.c2
  FROM s1
  JOIN s2 WITHIN 5 MINUTES
  ON s1.c1 = s2.c1 EMIT CHANGES;
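-- Outer joins are also supported; LEFT JOIN keeps unmatched rows
-- from the left stream (the name s4 is arbitrary):
CREATE STREAM s4 AS
  SELECT s1.c1, s2.c2
  FROM s1
  LEFT JOIN s2 WITHIN 5 MINUTES
  ON s1.c1 = s2.c1 EMIT CHANGES;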
Join a stream and a table together
CREATE STREAM s3 AS
  SELECT my_stream.c1, my_table.c2
  FROM my_stream
  JOIN my_table
  ON my_stream.c1 = my_table.c1 EMIT CHANGES;

Creating materialized views

Create materialized view over a stream
CREATE TABLE agg AS
  SELECT x, COUNT(*), SUM(y)
  FROM s1 GROUP BY x EMIT CHANGES;
Create materialized view over a stream and table
CREATE TABLE agg AS
  SELECT my_stream.x, COUNT(*), SUM(y)
  FROM my_stream
  JOIN my_table
  ON my_stream.x = my_table.x
  GROUP BY my_stream.x EMIT CHANGES;
Create a windowed materialized view over a stream
CREATE TABLE agg AS
  SELECT x, COUNT(*), SUM(y)
  FROM s1 WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY x EMIT CHANGES;
Perform a lookup against a materialized view
SELECT * FROM agg WHERE x = 'key1';
Perform a lookup against a windowed materialized view
SELECT * FROM agg
  WHERE x = 'key1'
  AND WINDOWSTART = '2019-11-20T00:00:00';
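-- WINDOWSTART also accepts range bounds, as ISO-8601 strings or
-- epoch milliseconds:
SELECT * FROM agg
  WHERE x = 'key1'
  AND WINDOWSTART >= '2019-11-20T00:00:00'
  AND WINDOWSTART < '2019-11-21T00:00:00';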

Integrating with external data sources and sinks

List all external sources and sinks
SHOW CONNECTORS;
Describe an external source or sink
DESCRIBE CONNECTOR conn1;
Create a PostgreSQL source connector using JDBC
-- Stream the contents of each table in the include list
-- into a corresponding topic named "jdbc_<table name>"
CREATE SOURCE CONNECTOR jdbc_source WITH (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://localhost:5432/postgres',
  'connection.user'          = 'user',
  'topic.prefix'             = 'jdbc_',
  'table.whitelist'          = 'include_this_table',
  'mode'                     = 'incrementing',
  'numeric.mapping'          = 'best_fit',
  'incrementing.column.name' = 'id',
  'key'                      = 'id');
Create an Elasticsearch sink connector
-- Send data from all of the given topics into Elasticsearch
CREATE SINK CONNECTOR elasticsearch_sink WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'topics'          = 'send_these_topics_to_elasticsearch',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'type.name'       = '',
  'connection.url'  = 'http://localhost:9200');
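Drop an external source or sink
DROP CONNECTOR elasticsearch_sink;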

Destroying streams, tables and queries

Terminate a persistent query
TERMINATE q1;
Drop a stream
DROP STREAM s1;
Drop a table
DROP TABLE t1;
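Drop a stream and delete its backing topic
-- Adding DELETE TOPIC also removes the underlying Kafka topic:
DROP STREAM s1 DELETE TOPIC;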