In this post I will show how to use the ksqlDb Java REST client to interact with the ksqlDb server using a Spring Boot application as a foundation. The Boot app will also use Spring Webflux to help expand the use of reactive programming. I will also show the use of the Testcontainers framework for integrating deployment of Docker containers as part of integration testing, in this case with JUnit 5.

What this will not cover is details of Apache Kafka or Spring Boot, or even ksqlDb and streaming systems, except to provide background to understand the code. There are plenty of other examples and posts that cover those details.

The code covered in this post is available on GitHub here. This is the first of several posts that will expand on the use of ksqlDb and Spring Boot microservices.

What is ksqlDb?

To understand ksqlDb you need to understand where it comes from. First there is Apache Kafka, one of the most popular messaging systems in use today. Apache Kafka is an open-source project that originated at LinkedIn, and the founders subsequently formed Confluent Inc. Then there is Kafka Streams, which adds streaming metaphors as a library leveraging Kafka.

Kafka

At the heart of Kafka, and what distinguishes it from other messaging systems, is the notion of a “log.” We’re not talking here about logging systems most developers use daily to write errors or debug messages. A log in the Kafka context is the same as used by database systems. An excellent discussion can be found here on the LinkedIn engineering blog, written by the co-founder of Confluent.

Suffice to say for the purposes of this post that a log is an append-only file of immutable records. In Kafka, logs are used to store messages written to ‘topics.’ A topic is the fundamental interconnect between ‘producers’ that write to topics, and ‘consumers’ that read from topics. Producers and consumers are independent. Through the use of logs, producers and consumers can operate at different speeds, in different locations and have guarantees that records aren’t lost. Kafka can support ‘exactly once’ message delivery, unlike most messaging systems.

Kafka Streams

On top of this foundation, Kafka Streams is a library that enables streaming operations applied to data consumed from topics. For example, you can apply a ‘filter’ operation followed by an ‘aggregation’ operation (e.g. sum, count, min, max, average) followed by a ‘transform’ operation, on records read from a topic. You can also apply a ‘group’ operation, so that operations are applied on records related to each other. For example, you might be processing orders and have an ‘order-id’ or payments that have an ‘account-number’. The series of operations form a graph or ‘topology’ as it is called in Kafka Streams.

For additional details on Kafka Streams please read the documentation, in particular the architecture described here.

ksqlDb

Both Kafka and Kafka Streams are sophisticated technologies, with a lot of options in how to use, deploy and scale. And the Kafka Streams library is just that --- a library. Developers must learn the API and build their own topologies in a microservice that they build, deploy and manage. This is non-trivial (as I know first hand).

If you consider the example operations I described above --- filter, sum, group and the rest --- it may come to mind that this somewhat maps to a SQL query on some data in a database table. Let’s say you wanted to get list by customer of the total amount they spent on orders of electronics. In SQL it would be something like

SELECT CUST_ID, SUM(ORDER_AMT)
    WHERE ORDER_CAT = ‘ELECTRONICS’
    GROUP BY ORDER_ID

You can also express that using the Kafka Streams API:

StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, Order> ordersStream = streamsBuilder.stream("orders");
ordersStream
    .filter((key, value) -> value.orderType == Electronics)
    .groupByKey()
    .aggregate(() -> 0.0f, (key, value, sum) -> sum += value.orderAmount)
    .toStream()
        .to("output");
Topology topology = streamsBuilder.build();
KafkaStreams kafkaStreams = new KafkaStreams(topology, new Properties());
kafkaStreams.start();

As you can see, it’s somewhat more complicated than the SQL. You have to create the stream using the builder (line 1), define your operations (lines 4-6) convert to a Topology (line 9) which is then provided to the KafkaStream manager that handles the lifecycle (line 10), which I’ve limited to showing the call to the start() method (line 11). You also must manage the source and sink topics yourself, called “orders” and “output” in the example, ensuring they exist.

This is a relatively simple topology. You could have several or hundreds, each with a much larger set of operations. Each topology is independent with its own lifecycle.

The folks at Confluent must have realized this, perhaps from customer and community feedback (I’m not affiliated with Confluent so I don’t know within whose head the lightbulb went off). To simplify the use of Kafka Streams and broaden its appeal, they came up with ksqlDb.

ksqlDb abstracts the details of the Streams API, the topology building and management, deployment, scaling and all that fun stuff. It lets users leverage a familiar API --- SQL --- to build streaming applications. Check out the ksqlDb quickstart here, to get a sense of the usage from a user point of view.

Spring Boot and ksqlDb client

The Boot application described here acts as a client to the ksqlDb server. The Boot application will expose its own REST API, as a proxy for the actual ksqlDb REST API. The ksqlDb server itself requires Kafka, which requires Zookeeper. The easiest way to run all of this on your laptop is with Docker and docker-compose. The ksqlDb quickstart provides a docker-compose file, which I leverage pretty much as is.

As such, Docker is a prerequisite for running this Boot app.

@SpringBootApplication
public class KsqldbDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KsqldbDemoApplication.class, args);
    }
}

With Spring Webflux you need two things, a router configuration that maps REST endpoints to handler functions, and the handler functions. For the purposes of this example (to be expanded on in future posts) I will proxy the ksqlDb REST client “server info” method that queries the ksqlDb server for some metadata. So the Webflux router exposes a “/info” endpoint and routes to a handler.

@EnableWebFlux
@Configuration
public class WebfluxRouterConfiguration {
    public static final String DEFAULT_SERVER_ADDRESS = "http://localhost:8088";
 
    @Bean
    public RouterFunction<ServerResponse> infoRouterFunction(KsqlDbRequestHandler ksqlDbRequestHandler) {
        return RouterFunctions.route()
            .GET("/info", ksqlDbRequestHandler::info)
            .build();
    }
 
    @Bean
    public KsqlDbRequestHandler ksqlDbRequestHandler(KsqlRestClient ksqlRestClient) {
        return new KsqlDbRequestHandler(ksqlRestClient);
    }
 
    @Bean
    public KsqlRestClient ksqlRestClient() {
        return KsqlRestClient.create(DEFAULT_SERVER_ADDRESS,
            Collections.emptyMap(),
            Collections.emptyMap(),
            Optional.empty());
    }
}

The infoRouterFunction() uses the injected handler bean and associates “/info” with the KsqlDbRequestHandler.info)method. The KsqlDbRequestHandler itself is injected with the KsqlRestClient. For now, it assumes that the ksqlDb server deployed via Docker has its port mapped to 8088 on the local machine, so that the Boot application doesn’t itself have to be deployed within the Docker cluster. The server address for the ksqlDb server should be configurable of course, and I will enhance the application to use the Boot externalized configuration support.

The handler class shown next is similarly relatively straightforward.

public class KsqlDbRequestHandler {
 
    private final KsqlRestClient ksqlRestClient;
 
    public KsqlDbRequestHandler(KsqlRestClient ksqlRestClient) {
        this.ksqlRestClient = ksqlRestClient;
    }
 
    public Mono<ServerResponse> info(ServerRequest serverRequest) {
        RestResponse<ServerInfo> serverInfo = ksqlRestClient.getServerInfo();
        if (serverInfo.isSuccessful()) {
            ServerInfo serverInfoResponse = serverInfo.getResponse();
            return ServerResponse.ok()
                .bodyValue(format("cluster_id: %s, ksql_service_id: %s, server_status: %s, version: %s",
                    serverInfoResponse.getKafkaClusterId(),
                    serverInfoResponse.getKsqlServiceId(),
                    serverInfoResponse.getServerStatus(),
                    serverInfoResponse.getVersion()));
        }
        else {
            KsqlErrorMessage errorMessage = serverInfo.getErrorMessage();
            String body = format("Error: %s Code: %s",
                errorMessage.getMessage(), errorMessage.getErrorCode());
            return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).bodyValue(body);
        }
    }
}

The ksqlDb REST client has a serverInfo() method as mentioned. The handler’s info() method simply executes the method and handles the response. If successful, it formats the output nicely and returns it. Otherwise it formats an error response. The Webflux ServerResponseprovides a nice fluent API to form the response message. In Webflux there are two basic types, Mono which returns a single value, and Flux which returns a stream of responses. In this case the server info is just a single string, so the handler returns a Mono<ServerResponse>.

That’s all there is to it, for a basic example. You could run this now manually, by starting the ksqlDb server and dependent services via docker-compose followed by running the Boot app --- either via the Maven/Gradle Boot plugins or from the command line.

But as good developers we write tests right. How would you test this Boot application? It requires the deployment of three services for a full test without mocks (you could mock the ksqlDb server by mocking the KsqlRestClient). But let’s say we want to test the real thing.

There’s a convenient open source project called Testcontainers. You can deploy specific containers that it’s created integrations with directly, everything from Kafka to Cassandra to Elasticsearch. But it also supports running a docker-compose file, for more complicated scenarios and orchestration.

Testcontainers also integrates easily with JUnit 5. Simply use the @Container annotation.

    @Container
    public static DockerComposeContainer dockerComposeContainer =
        new DockerComposeContainer<>(new File("src/test/resources/compose-test.yml"))
            .withServices("zookeeper", "broker", "ksqldb-server")
            .withExposedService("ksqldb-server", 8088,
                Wait.forHealthcheck())
            .withLocalCompose(true);

For testing, I created a stripped down and slightly modified docker-compose YAML file and put it in the test resources folder. As you can see it exposes port 8088 as our REST client configuration expects. Before the tests run, Testcontainers will ensure the services are up, verified by pinging the ksqlDb server container’s healthcheck endpoint as defined in the compose-test.yml file.

Now to test the Boot application’s /info endpoint that we’ve created. With Webflux there’s a test class called WebTestClient. Using this class the test will execute a call to the Boot application and check the response.

@Testcontainers
@AutoConfigureWebTestClient
@SpringBootTest
class KsqldbDemoApplicationTests {
    @Container
    public static DockerComposeContainer dockerComposeContainer =
        new DockerComposeContainer<>(new File("src/test/resources/compose-test.yml"))
            .withServices("zookeeper", "broker", "ksqldb-server")
            .withExposedService("ksqldb-server", 8088,
                Wait.forHealthcheck())
            .withLocalCompose(true);
 
    @Autowired
    private WebTestClient webTestClient;
 
    @Test
    void infoHandler() {
        webTestClient.get()
            .uri("/info")
            .exchange()
            .expectStatus().isOk()
            .expectBody(String.class)
            .consumeWith(result -> {
                String responseBody = result.getResponseBody();
                assertThat(responseBody)
                .contains("cluster_id", "ksql_service_id", "server_status", "version");
            });
    }

Line 1 enables the Testcontainers integration and line 2 ensures that a WebTestClient is created and available for autowiring (line 14). Then we have our actual test method, standard JUnit 5. Line 18 specifies the REST method, line 19 defines the endpoint to invoke, line 20 performs the execution and the remaining lines specify the expected response, the response type and the expected content.

Summary

Hopefully you now have a basic understanding of ksqlDb’s purpose and how to integrate with it via its REST API. Also, how to test containers in combination with Spring Boot and Spring Webflux.

Subsequent posts will expand on this post to get to the point of dynamically creating ksqlDb streams, tables and materialized views, and enabling a REST-based API for doing so, including streaming the results of the queries.