Here you’ll find snippets designed to
illustrate ksqlDB’s core concepts while providing a starting point for developing
your stream processing application.
Although the query constructs below are outlined in isolation,
they may be freely composed into arbitrarily complex queries that suit your needs.
Persistent queries may also be chained together, giving you the flexibility to build highly
advanced stream processing applications without writing any application code.
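To make chaining concrete, here is a minimal sketch in which the second persistent query reads from the stream created by the first; the stream and column names (clicks, status, page) are illustrative only:

-- First persistent query: filter a raw stream.
CREATE STREAM errors AS
SELECT * FROM clicks WHERE status >= 400 EMIT CHANGES;

-- Second persistent query: aggregate the filtered stream.
CREATE TABLE errors_per_page AS
SELECT page, COUNT(*) AS error_count
FROM errors GROUP BY page EMIT CHANGES;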
For in-depth technical coverage of ksqlDB’s architecture and powerful feature set, visit the
ksqlDB documentation.
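-- Create a stream over an existing Kafka topic: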
CREATE STREAM s1 (c1 VARCHAR, c2 INTEGER)
WITH (kafka_topic='s1', value_format='json');
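-- Create a table over an existing Kafka topic: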
CREATE TABLE t1 (c1 VARCHAR PRIMARY KEY, c2 INTEGER)
WITH (kafka_topic='t1', value_format='json');
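-- Create a stream whose message key is the id column: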
CREATE STREAM keyed (id VARCHAR KEY, c2 INTEGER)
WITH (kafka_topic='keyed', value_format='json');
-- Derive a new stream from an existing one:
CREATE STREAM derived AS
SELECT c2 + 1 AS x, c1 AS y FROM s1 EMIT CHANGES;
-- Insert a row into a stream:
INSERT INTO s1 (c1, c2) VALUES ('v1', 2);
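-- List topics, streams, and tables: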
SHOW TOPICS;
SHOW STREAMS;
SHOW TABLES;
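-- Describe s1: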
DESCRIBE s1;
-- Describe s1 in detail:
DESCRIBE EXTENDED s1;
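-- Print the contents of a topic: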
PRINT 'topic_name' FROM BEGINNING;
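-- Copy an existing stream into a new one: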
CREATE STREAM s2 AS
SELECT * FROM s1 EMIT CHANGES;
-- Project specific columns:
CREATE STREAM s2 AS
SELECT c1, c2 FROM s1 EMIT CHANGES;
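-- Filter rows with a predicate: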
CREATE STREAM s2 AS
SELECT * FROM s1
WHERE c1 != 'foo' AND c2 = 42 EMIT CHANGES;
-- Apply a scalar function to a column:
SELECT SUBSTRING(c1, 1, 10)
FROM s1 EMIT CHANGES;
-- Filter rows by NULL or non-NULL columns:
SELECT * FROM s1
WHERE c1 IS NOT NULL OR c2 IS NULL EMIT CHANGES;
-- Timestamp literals may be specified using ISO-8601 formatting:
SELECT * FROM s1
WHERE ROWTIME >= '2019-11-20T00:00:00' EMIT CHANGES;
-- Timestamps can also be interpreted as epoch milliseconds:
SELECT * FROM s1
WHERE ROWTIME >= 1574208000000 EMIT CHANGES;
-- Format an epoch-milliseconds timestamp as a string:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss')
FROM s1 EMIT CHANGES;
-- Parse a date string in the given format into an epoch-milliseconds timestamp:
SELECT STRINGTOTIMESTAMP(c1, 'yyyy-MM-dd HH:mm:ss')
FROM s1 EMIT CHANGES;
-- Access a MAP value by key:
CREATE STREAM s4 (map1 MAP<VARCHAR, INTEGER>)
WITH (kafka_topic='map1', value_format='json', partitions=1);
SELECT map1['key1'], map1['key2']
FROM s4 EMIT CHANGES;
-- Access ARRAY elements by index (ksqlDB arrays are one-indexed):
CREATE STREAM s5 (arr ARRAY<INTEGER>)
WITH (kafka_topic='arr', value_format='json', partitions=1);
SELECT arr[1], arr[2]
FROM s5 EMIT CHANGES;
-- Access STRUCT fields with the -> operator:
CREATE STREAM s6 (item STRUCT<f1 INT, f2 INT>)
WITH (kafka_topic='struct', value_format='json', partitions=1);
SELECT item->f1, item->f2
FROM s6 EMIT CHANGES;
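-- Limit the number of rows returned by a push query: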
SELECT * FROM s1 EMIT CHANGES LIMIT 1;
-- Aggregate rows by key:
SELECT c1, COUNT(*), SUM(c2) FROM s1
GROUP BY c1 EMIT CHANGES;
-- Aggregate over a tumbling time window:
SELECT c1, COUNT(*), SUM(c2) FROM s1
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY c1 EMIT CHANGES;
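-- Join two streams within a time window: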
CREATE STREAM s3 AS
SELECT s1.c1, s2.c2
FROM s1
JOIN s2 WITHIN 5 MINUTES
ON s1.c1 = s2.c1 EMIT CHANGES;
-- Join a stream to a table:
CREATE STREAM s3 AS
SELECT my_stream.c1, my_table.c2
FROM my_stream
JOIN my_table
ON my_stream.c1 = my_table.c1 EMIT CHANGES;
-- Materialize a table from a stream aggregation:
CREATE TABLE agg AS
SELECT c1, COUNT(*), SUM(c2)
FROM s1 GROUP BY c1 EMIT CHANGES;
-- Materialize a table from a stream-table join plus aggregation:
CREATE TABLE agg AS
SELECT my_stream.x, COUNT(*), SUM(y)
FROM my_stream
JOIN my_table
ON my_stream.x = my_table.x
GROUP BY my_stream.x EMIT CHANGES;
-- Materialize a windowed aggregation:
CREATE TABLE agg AS
SELECT c1, COUNT(*), SUM(c2)
FROM s1 WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY c1 EMIT CHANGES;
-- Pull the current value for a key from a materialized table:
SELECT * FROM agg WHERE c1 = 'v1';
-- Pull the value for a key from a specific window:
SELECT * FROM agg
WHERE c1 = 'v1'
AND WINDOWSTART = '2019-11-20T00:00:00';
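-- List running connectors and inspect one: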
SHOW CONNECTORS;
DESCRIBE CONNECTOR conn1;
-- Stream the contents of each table in the include list
-- into a corresponding topic named "jdbc_<table name>"
CREATE SOURCE CONNECTOR jdbc_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://localhost:5432/postgres',
'connection.user' = 'user',
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'include_this_table',
'mode' = 'incrementing',
'numeric.mapping' = 'best_fit',
'incrementing.column.name' = 'id',
'key' = 'id');
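Once the source connector is running, the topics it produces can be queried like any other stream. A minimal sketch, assuming the connector has populated the topic jdbc_include_this_table, that the Connect worker's converter writes JSON, and that the rows carry id and name columns (the name column is a hypothetical example):

-- Declare a stream over the connector's output topic:
CREATE STREAM jdbc_rows (id INTEGER, name VARCHAR)
WITH (kafka_topic='jdbc_include_this_table', value_format='json');
SELECT id, name FROM jdbc_rows EMIT CHANGES;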
-- Send data from all of the given topics into Elasticsearch
CREATE SINK CONNECTOR elasticsearch_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'send_these_topics_to_elasticsearch',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://localhost:9200');
-- Terminate a persistent query by its query ID:
TERMINATE q1;
-- Drop a stream or table:
DROP STREAM s1;
DROP TABLE t1;