In this post I will explore the ksqlDb API for creating streams. This is the second post about using ksqlDb with Spring Boot. Read the first post here.
A stream in ksqlDb is analogous to a stream in Kafka Streams. One difference is how you create the stream. As described in the first post, a stream in Kafka Streams is created programmatically with an API.
ksqlDb uplevels this to a more familiar SQL syntax. Here’s an example from the ksqlDb quickstart.
This creates a stream
processing records from the topic locations
with the records in JSON
format and three immutable fields. One important thing to note is that
this CREATE STREAM
command creates the locations
topic. There is an additional
SQL command if the topic already exists, CREATE STREAM AS SELECT
.
Given this stream you can query it …
You can also apply filters (i.e. a WHERE clause) …
select specific columns and as shown use operations on the data.
You can execute these SQL statements via the ksqlDb CLI, but I will show how to use the REST client API directly.
In the first post I showed a Spring WebFlux request handler class
KsqlDbRequestHandler
. To implement the CREATE STREAM
I will extend that class
and add a new method createStream()
. My
approach with this is to hide the details of the ksqlDb SQL syntax from
the client. The client will simply provide the required data (e.g.
stream name, topic name, columns) and the handler will construct the SQL
statement to send to the ksqlDb server.
To that end I have a class CreateStreamRequest
.
The @Data
annotation is from Lombok,
a Java annotation processor which generates boilerplate during the build
process, allowing you to avoid the boilerplate in your code. The
@Data
annotation generates the getters/setters, toString, equals, hashCode and
constructor. The CreateStreamRequest
class is relatively
basic, and doesn’t expose everything that the CREATE STREAM
can support.
In the KsqlDbRequestHandler
class I added a method to process a POST
with
the CreateStreamRequest
as the JSON body.
Using the Spring WebFlux reactive API, the createStream()
method converts the body
from the JSON to a CreateStreamRequest
class in a Mono
. Then the
CreateStreamRequest
is converted to SQL in the executeCreateStreamRequest()
method. This is where the ksqlDb REST client is invoked to have the
ksqlDb server create the stream. The REST API is a generic
makeSqlRequest(String sql)
for executing any ksqlDb commands.
The return value from the ksqlDb REST invocation is
RestResponse<KsqlEntityList>
. The RestResponse
class encapsulates a request
status (successful or unsuccessful), an error message an a
generic body, in this case KsqlEntityList
which is just a convenience domain
specific type of ArrayList<KsqlEntity>
. The KsqlEntity
is just a holder for
what the SQL statement was and any warning messages that might have been
generated.
A simple integration test uses the WebFlux WebTestClient
class while also
mocking the ksqlDb server.
One thing to note is the setup, which shows how to configure the WebTestClient
class, in this case to change the timeout. This was useful as noted when using
the debugger since the WebTestClient
would otherwise
timeout while trying to figure out what’s going on.
Rather than mocking the ksqlDb server, I could use the Testcontainers project as described in the first post.
In future posts I will show how to process data and use “pull queries” which I think is a really nice feature of ksqlDb.