Secondary to system testing, it can be useful to unit test a topology. With good system test coverage this may not be necessary. If it is needed, then Creek can help.

The creek-kafka-streams-test library provides topology test helpers: TestKafkaStreamsExtensionOptions can be used to initialise Creek without a real Kafka cluster to talk to, and TestTopics provides some factory methods for creating the test topic instances used in the unit test.

Add a unit test

The aggregate template provided a shell TopologyBuilderTest class. To add a basic unit test, add the code below to the class:

import static org.apache.kafka.streams.KeyValue.pair;
import static org.hamcrest.Matchers.containsInAnyOrder;

import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
// ...

class TopologyBuilderTest {
    // ...

    private TestInputTopic<Long, String> tweetTextStream;
    private TestOutputTopic<String, Integer> handleUsageStream;
    // ...

    @BeforeEach
    public void setUp() {
        final KafkaStreamsExtension ext = ctx.extension(KafkaStreamsExtension.class);

        // Build the topology under test:
        topology = new TopologyBuilder(ext).build();

        // Kafka Streams test topology driver, built from the topology and its config properties:
        testDriver = new TopologyTestDriver(topology, /* ... */);

        // Create the topology's input and output topics:
        tweetTextStream = inputTopic(TweetTextStream, ctx, testDriver);
        handleUsageStream = outputTopic(TweetHandleUsageStream, ctx, testDriver);
    }

    @Test
    void shouldOutputHandleOccurrences() {
        // When:
        tweetTextStream.pipeInput(1622262145390972929L, "@PepitoTheCat @BillyM2k @PepitoTheCat Responding to feedback, " +
                "Twitter will enable a light, write-only API for bots providing good content that is free.");

        // Then:
        assertThat(handleUsageStream.readKeyValuesToList(), containsInAnyOrder(
                pair("@PepitoTheCat", 2),
                pair("@BillyM2k", 1)
        ));
    }
    // ...
}

The above adds a single unit test, shouldOutputHandleOccurrences, for our simple topology. The test produces a single record to the topology's input topic and asserts that the output is correct.
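To see where the expected counts in the assertion come from, the handle counting can be sketched in plain Java, without Kafka Streams. This is only an illustration: the HandleCounter class and the `@\w+` pattern are assumptions made for this sketch, not the service's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the tutorial's service code.
final class HandleCounter {

    // Assumption: a handle is '@' followed by one or more word characters.
    private static final Pattern HANDLE = Pattern.compile("@\\w+");

    private HandleCounter() {}

    /** Counts how many times each handle occurs in the supplied tweet text. */
    static Map<String, Integer> countHandles(final String tweetText) {
        // LinkedHashMap keeps handles in order of first appearance.
        final Map<String, Integer> counts = new LinkedHashMap<>();
        final Matcher matcher = HANDLE.matcher(tweetText);
        while (matcher.find()) {
            counts.merge(matcher.group(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(final String[] args) {
        final String text = "@PepitoTheCat @BillyM2k @PepitoTheCat Responding to feedback, "
                + "Twitter will enable a light, write-only API for bots providing good content that is free.";

        // Prints: {@PepitoTheCat=2, @BillyM2k=1}
        System.out.println(countHandles(text));
    }
}
```

The two entries match the two key-value pairs the unit test expects on the output topic: one record per distinct handle, with the number of occurrences as the value.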

Topology test

The eagle-eyed among you may have noticed an existing test at the bottom of TopologyBuilderTest called shouldNotChangeTheTopologyUnintentionally. The intent of this test is to detect any unintentional changes to the Kafka Streams topology.

Warning: There are certain categories of topology changes that are not backwards compatible with earlier versions of a deployed service, e.g. those that change topic names. Creek recommends always naming operators in the Kafka Streams DSL to minimise the chance of unintentional changes to internal topics. See the Kafka Streams docs for more information.

The test compares the topology with the last known topology and fails if they differ. If the change is intentional, then the handle-occurrence-service/src/test/resources/kafka/streams/expected_topology.txt file can be updated to reflect the latest topology. For this tutorial, the file should be updated to contain:

   Sub-topology: 0
    Source: ingest-twitter.tweet.text (topics: [twitter.tweet.text])
      --> extract-handles
    Processor: extract-handles (stores: [])
      --> egress-twitter.handle.usage
      <-- ingest-twitter.tweet.text
    Sink: egress-twitter.handle.usage (topic: twitter.handle.usage)
      <-- extract-handles
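The idea behind such a topology check can be sketched in plain Java. This is a minimal illustration, not the template's actual test: the TopologyComparison class and its normalisation rules are assumptions. In a real test, the actual text would come from Kafka Streams' Topology#describe() and the expected text from the expected_topology.txt file.

```java
// Hypothetical sketch for illustration only; the template's real test may differ.
final class TopologyComparison {

    private TopologyComparison() {}

    /** Normalises line endings and trims surrounding whitespace before comparison. */
    static String normalise(final String description) {
        return description.replace("\r\n", "\n").trim();
    }

    /** Returns true if the actual topology description matches the expected one. */
    static boolean matches(final String actual, final String expected) {
        return normalise(actual).equals(normalise(expected));
    }

    public static void main(final String[] args) {
        final String expected = "Sub-topology: 0\n"
                + "  Source: ingest-twitter.tweet.text (topics: [twitter.tweet.text])\n"
                + "    --> extract-handles\n";

        // Same topology, different line endings: treated as unchanged.
        System.out.println(matches(expected.replace("\n", "\r\n"), expected));

        // A renamed processor is a real topology change and is detected.
        System.out.println(matches(expected.replace("extract-handles", "renamed-step"), expected));
    }
}
```

Normalising line endings keeps the check from failing purely because the expected file was saved on a different operating system, while any change to node names or wiring still causes a mismatch.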

If you find this test more of a hindrance than a help… delete it!

Test Coverage

As before, test coverage can be calculated by running the following Gradle command:

./gradlew coverage

This will execute the unit and system tests and use JaCoCo to calculate the test coverage. The human-readable coverage report is saved at build/reports/jacoco/coverage/html/index.html.

Unit test coverage

In this case, adding the unit test hasn't improved the test coverage, so the test is arguably superfluous.

While Creek recommends using system tests for functional testing, more complex real-world solutions often still benefit from unit testing of the topology, to cover branches that are hard to reach with system tests.

ProTip: The repository also has a Gradle task and GitHub workflow step to upload the coverage report to an external coverage service, should this be something you need. Alternatively, the project can be customised to publish coverage reports elsewhere.