ksqlDB Quickstart

The guide below demonstrates how to get a minimal environment up and running. Choose the distribution that's right for you.

When you're ready to learn more, take the next steps with use-case-driven tutorials.

Use ksqlDB through a fully managed service with pay-as-you-go pricing.

1. Sign up for Confluent Cloud and create clusters

Confluent Cloud offers a fully managed, cloud-native event streaming platform powered by Apache Kafka®. Spin up ksqlDB clusters on demand with pay-as-you-go pricing.

Begin by signing up for a Confluent Cloud account. Follow the in-product instructions to launch Kafka and ksqlDB clusters within the Confluent Cloud user interface.

2. Get the Confluent Cloud CLI

We'll use the Confluent Cloud CLI for the next steps.

Download Confluent Cloud CLI

Once it's installed, authenticate using your Confluent Cloud credentials.

ccloud login

3. Gather information about your ksqlDB cluster

Use the Confluent Cloud CLI to list metadata about your ksqlDB cluster.

ccloud ksql app list -o json

Tip: The steps below use the variables $KSQL_CLUSTER_ID, $KAFKA_CLUSTER_ID, and $KSQL_ENDPOINT. Set them in your shell from the values in this output.
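
If you would rather set these variables from the command's JSON output than copy values by hand, a minimal sketch follows. The json_field helper is hypothetical, and the sample record and its field names (id, endpoint, kafka) are assumptions for illustration, so check them against what your CLI version actually prints. A dedicated JSON tool such as jq would be more robust than sed here.

```shell
# Hypothetical helper: print the first string value stored under key $1.
# Reads JSON on stdin. Only suitable for flat records like the one below.
json_field() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -n 1
}

# Assumed shape of one record from `ccloud ksql app list -o json`.
# All values here are made up.
sample_output='[
  {
    "id": "lksqlc-abc12",
    "endpoint": "https://ksql.example.invalid:443",
    "kafka": "lkc-xyz34"
  }
]'

# With a real cluster, pipe the CLI output into json_field instead of the sample.
KSQL_CLUSTER_ID=$(printf '%s\n' "$sample_output" | json_field id)
KSQL_ENDPOINT=$(printf '%s\n' "$sample_output" | json_field endpoint)
KAFKA_CLUSTER_ID=$(printf '%s\n' "$sample_output" | json_field kafka)
export KSQL_CLUSTER_ID KSQL_ENDPOINT KAFKA_CLUSTER_ID
```

The sketch assumes a single ksqlDB cluster in the output; with several clusters it would pick up the first match only.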

4. Give permission for ksqlDB to access topics in your Kafka cluster

Kafka clusters on Confluent Cloud employ fine-grained access control. Let's give ksqlDB explicit access to create, read, and write topics that begin with the quickstart- prefix.

ccloud kafka acl create --allow \
       --topic "quickstart-" --prefix \
       --operation CREATE \
       --operation READ \
       --operation WRITE \
       --service-account $(ccloud service-account list | grep $KSQL_CLUSTER_ID | cut -d '|' -f1) \
       --cluster $KAFKA_CLUSTER_ID
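
The --service-account argument above is computed by a small pipeline: list all service accounts, keep the row that mentions the ksqlDB cluster ID, and take the first |-separated column. The sketch below shows that extraction on its own so you can inspect the result before creating the ACL. The function name and the sample table are hypothetical, and the real column layout of ccloud service-account list may differ.

```shell
# Hypothetical helper: read a |-separated table on stdin and print the first
# column of the row that mentions the ksqlDB cluster ID given as $1.
service_account_id() {
  grep "$1" | cut -d '|' -f1 | tr -d ' '
}

# Assumed table layout; the IDs and names are made up.
sample_table='     Id   |       Name        |   Description
+---------+-------------------+-----------------+
   123456 | KSQL.lksqlc-abc12 | ksqlDB cluster
   654321 | other-account     | unrelated'

# With a real account, pipe `ccloud service-account list` in instead.
printf '%s\n' "$sample_table" | service_account_id "lksqlc-abc12"
```

If the function prints nothing, no row matched the cluster ID, and the ACL command would receive an empty --service-account value.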

5. Create an API key

Generate an API key to authenticate with your ksqlDB cluster. Then, save the key and secret locally.

ccloud api-key create --resource $KSQL_CLUSTER_ID
Export the key and secret from the output as the environment variables KSQL_API_KEY and KSQL_API_SECRET. The steps below use them.
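
Before moving on, it can help to confirm that every variable the next step relies on is actually set. The following is a minimal sketch; missing_vars is a hypothetical helper, and the export lines are shown as comments because the real values come from your own account.

```shell
# Set these from the output of the previous command (placeholders shown):
#   export KSQL_API_KEY='<your API key>'
#   export KSQL_API_SECRET='<your API secret>'

# Hypothetical helper: print the name of each variable that is unset or empty.
missing_vars() {
  for name in "$@"; do
    eval "value=\${$name:-}"
    [ -n "$value" ] || printf '%s\n' "$name"
  done
}

# Lists whichever of the three variables used in step 6 still needs a value.
missing_vars KSQL_API_KEY KSQL_API_SECRET KSQL_ENDPOINT
```

No output means all three variables are set in the current shell session.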

6. Start ksqlDB's interactive CLI

Run this command to connect to the ksqlDB server and enter an interactive CLI session.

docker run -it confluentinc/ksqldb-cli:0.11.0 ksql \
       -u $KSQL_API_KEY \
       -p $KSQL_API_SECRET \
       "$KSQL_ENDPOINT"

7. Create a stream

The first thing we're going to do is create a stream. A stream essentially associates a schema with an underlying Kafka topic. Here's what each parameter in the CREATE STREAM statement does:

  • kafka_topic - Name of the Kafka topic underlying the stream. In this case it will be automatically created because it doesn't exist yet, but streams may also be created over topics that already exist.
  • value_format - Encoding of the messages stored in the Kafka topic. For JSON encoding, each row will be stored as a JSON object whose keys/values are column names/values. For example: {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
  • partitions - Number of partitions to create for the quickstart-locations topic. Note that this parameter is not needed for topics that already exist.

Check the documentation for more information about streams.

Copy and paste this statement into your interactive CLI session, and press enter to execute the statement.

CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='quickstart-locations', value_format='json', partitions=1);

8. Run a continuous query over the stream

Run the given query using your interactive CLI session.

This query will output all rows from the riderLocations stream whose coordinates are within 5 kilometers of Mountain View. GEO_DISTANCE measures in kilometers unless a unit is passed as an additional argument.

This may feel unfamiliar at first, because the query does not return until you terminate it. It perpetually pushes output rows to the client as events are written to the riderLocations stream.

Leave this query running in the CLI session for now. Next, we're going to write some data into the riderLocations stream so that the query begins producing output.

-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5
  EMIT CHANGES;

9. Start another CLI session

Since the CLI session from step 8 is busy waiting for output from the continuous query, let's start another session that we can use to write some data into ksqlDB.

docker run -it confluentinc/ksqldb-cli:0.11.0 ksql \
       -u $KSQL_API_KEY \
       -p $KSQL_API_SECRET \
       "$KSQL_ENDPOINT"

10. Populate the stream with events

Run each of the given INSERT statements within the new CLI session, and keep an eye on the CLI session from step 8 as you do.

The continuous query will output matching rows in real time as soon as they're written to the riderLocations stream.

INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
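
To preview which of these rows the continuous query from step 8 should emit, you can approximate its distance filter locally. The sketch below is not ksqlDB's implementation: it assumes a haversine great-circle distance on a sphere of radius 6371 km and assumes GEO_DISTANCE's default unit of kilometers. The function name rows_within_km is hypothetical.

```shell
# Print the IDs of rows within RADIUS_KM of a center point.
# Input on stdin: one "profileId latitude longitude" triple per line.
# Usage: rows_within_km CENTER_LAT CENTER_LON RADIUS_KM
rows_within_km() {
  awk -v clat="$1" -v clon="$2" -v radius="$3" '
    BEGIN { pi = atan2(0, -1); rad = pi / 180; earth_km = 6371 }
    {
      dlat = ($2 - clat) * rad
      dlon = ($3 - clon) * rad
      # Haversine formula for the great-circle distance between two points.
      a = sin(dlat / 2) ^ 2 + cos(clat * rad) * cos($2 * rad) * sin(dlon / 2) ^ 2
      dist = 2 * earth_km * atan2(sqrt(a), sqrt(1 - a))
      if (dist <= radius) print $1
    }'
}

# The six rows inserted above.
rows='c2309eec 37.7877 -122.4205
18f4ea86 37.3903 -122.0643
4ab5cbad 37.3952 -122.0813
8b6eae59 37.3944 -122.0813
4a7c7b41 37.4049 -122.0822
4ddad000 37.7857 -122.4011'

# Mountain View lat, long: 37.4133, -122.1162
printf '%s\n' "$rows" | rows_within_km 37.4133 -122.1162 5
```

Under these assumptions the sketch prints 4ab5cbad, 8b6eae59, and 4a7c7b41. Row 18f4ea86 is roughly 5.2 km from the center and falls just outside the radius, and the two San Francisco rows are far outside it.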