Learn Stream Processing With Kafka Streams

Stateless operations

By Abhishek Gupta
Published in Better Programming · 6 min read · Mar 5, 2020


Kafka Streams is a Java library for developing stream-processing applications on top of Apache Kafka. This is the first in a series of articles on Kafka Streams and its APIs.

This is not a theoretical guide about Kafka Streams (although I’ve covered some of those aspects in the past).

In this part, we’ll cover stateless operations in the Kafka Streams DSL API — specifically, the functions available in KStream, such as filter, map, groupBy, etc. The DSL API in Kafka Streams offers a powerful, functional-style programming model to define stream-processing topologies.

The APIs (KStream, etc.) referenced in this post can be found in the Kafka Streams Javadocs.

The Setup

To get started, you need to create a KafkaStreams instance. It needs a topology and related configuration (in the form of java.util.Properties).

Set the required configuration for your Kafka Streams app:

Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

We can then build a topology that defines the processing pipeline (the rest of this article will focus on the stateless parts of a topology).
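The topology itself is not shown at this point, so here is a minimal sketch of how one might be built with StreamsBuilder. The topic names and the class name TopologySketch are assumptions made for illustration, and the mapValues step is only a placeholder for the operations covered in the rest of the article.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class TopologySketch {
    // Hypothetical topic names, used only for illustration.
    static final String INPUT_TOPIC = "words";
    static final String OUTPUT_TOPIC = "uppercase-words";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source: read String keys and values from the input topic.
        KStream<String, String> stream =
                builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // Placeholder stateless step, then a sink topic.
        stream.mapValues(value -> value.toUpperCase())
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        // build() turns the DSL calls above into a Topology.
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the sources, processors, and sinks without needing a broker.
        System.out.println(buildTopology().describe());
    }
}
```

The returned Topology is what gets passed to the KafkaStreams constructor. Calling describe() is a convenient way to check the wiring before connecting to a cluster.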

You can create the KafkaStreams instance and start processing:

KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // wait forever

Stateless Operations Using KStream

I generally like categorizing things into buckets — it helps me divide and conquer. I’ve tried the same in this case by dividing various KStream operations into filter, map, etc.

Let’s dig in!

filter

You can use filter to omit or include records based on your criteria.

For example, if the value sent to a topic contains a word and you want to include only words longer than a specified length, you can define this criterion using a Predicate and pass it to the filter method. This will create a new KStream instance with the filtered records.

KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        // keep only words longer than five characters
        return v.length() > 5;
    }
});

It’s also possible to use filterNot if you want to exclude records based on certain criteria. Here is a lambda-style example:

KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));

map

A commonly used stateless operation is map. In the case of Kafka Streams, it can be used to transform each record in the input KStream by applying a mapper function.

This is available in multiple flavors, such as map, mapValues, flatMap, and flatMapValues.

Simply use the map method if you want to alter both the key and the value. For example, this converts both the key and the value to uppercase:

stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
    }
});

Use mapValues if all you want to alter is the value:

stream.mapValues(value -> value.toUpperCase());

flatMap is similar to map, but it allows you to return multiple records (KeyValues):

stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        // split the CSV value and emit one record per part, all with the original key
        String[] values = csv.split(",");
        return Arrays.stream(values)
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
});

In the above example, each record in the stream gets flatMapped such that each comma-separated value (CSV) is first split into its constituents, and a KeyValue pair is created for each part of the CSV string. For example, if you have the records (foo <-> a,b,c) and (bar <-> d,e) (where foo and bar are keys), the resulting stream will have five entries: (foo,a), (foo,b), (foo,c), (bar,d), and (bar,e).
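To make the fan-out concrete, here is a plain-Java sketch of the same splitting logic that runs without Kafka. It is only a model: Map.Entry stands in for KeyValue, and the class and method names (FlatMapSketch, split) are made up for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FlatMapSketch {
    // Mirrors the mapper above: one output entry per comma-separated part,
    // each carrying the original key. Map.Entry is a stand-in for KeyValue.
    static List<Map.Entry<String, String>> split(String key, String csv) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String part : csv.split(",")) {
            out.add(new SimpleEntry<>(key, part));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> result = new ArrayList<>();
        result.addAll(split("foo", "a,b,c"));
        result.addAll(split("bar", "d,e"));
        // prints [foo=a, foo=b, foo=c, bar=d, bar=e]
        System.out.println(result);
    }
}
```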

Use flatMapValues if you only want to accept a value from the stream and return a collection of values.
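As a rough sketch, the CSV example could be rewritten with flatMapValues, since the key never changes. The topic names csv-input and csv-parts and the class name are assumptions for illustration, and the default Serdes from the configuration are assumed to be String.

```java
import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

class FlatMapValuesSketch {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical input topic whose values are comma-separated strings.
        KStream<String, String> stream = builder.stream("csv-input");

        // Each value becomes zero or more values; the key is carried over unchanged.
        KStream<String, String> parts =
                stream.flatMapValues(csv -> Arrays.asList(csv.split(",")));

        // Hypothetical output topic.
        parts.to("csv-parts");
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Because the mapper only sees the value, it is considerably shorter than the flatMap version, which has to construct a KeyValue for every output record.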

group

If you want to perform stateful aggregations on the contents of a KStream, you’ll first need to group its records by their key to create a KGroupedStream.

We’ll cover stateful operations on KGroupedStream in subsequent articles in this series.

Here’s an example of how you can do this using groupByKey:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

KGroupedStream<String,String> kgs = stream.groupByKey();

A generalized version of groupByKey is groupBy, which gives you the ability to group based on a different key using a KeyValueMapper.

stream.groupBy(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

In both cases (groupByKey and groupBy), if you need to use a different Serde (Serializer and Deserializer) instead of the default ones, use the overloaded version (which accepts a Grouped object).

// The Serdes must match the stream's key and value types; this assumes a KStream<Bytes, Long>
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));

Terminal Operations

A terminal operation in Kafka Streams is a method that returns void instead of an intermediate, such as another KStream or KTable.

You can use the to method to store the records of a KStream to a topic in Kafka.

KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
    .to("uppercase-words");

An overloaded version of to allows you to specify a Produced object to customize the Serdes and the partitioner.

// The Serdes must match the stream's types: String keys and String values here
stream.mapValues(value -> value.toUpperCase())
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Instead of specifying a static topic name, you can use a TopicNameExtractor and include any custom logic to choose a topic dynamically:

stream.mapValues(value -> value.toUpperCase())
    .to(new TopicNameExtractor<String, String>() {
        @Override
        public String extract(String k, String v, RecordContext rc) {
            return rc.topic() + "_uppercase";
        }
    });

In this example, we make use of the RecordContext, which contains the metadata of the record, to get the topic and then append _uppercase to it.

In all of the above cases, the sink topic should pre-exist in Kafka.

If you want to log the KStream records (for debugging purposes), use the print method. It accepts an instance of Printed to configure the behavior.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());

This will print out the records. For example, if you send (foo, bar) and (john, doe) to the input topic, the values will be converted to uppercase and logged as follows:

[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE

You can also use Printed.toFile (instead of toSysOut) to target a specific file.

The foreach method is similar to print and peek:

  • It’s also a terminal operation (like print)
  • And it accepts a ForeachAction (like peek)
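A minimal sketch of foreach might look like the following. The topic name and class name are assumptions for illustration, and the action here simply prints each record.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

class ForeachSketch {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical input topic.
        KStream<String, String> stream = builder.stream("words");

        // foreach returns void, so nothing can be chained after it.
        stream.foreach((key, value) ->
                System.out.println("key=" + key + ", value=" + value));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```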

Miscellaneous

Since print is a terminal operation, you can use peek instead when you want to keep processing: it returns a KStream containing the same records. It accepts a ForeachAction, which specifies what to do for each record (e.g., log the key and the value).

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.peek((k,v) -> System.out.println("key="+k+", value="+v))
.to(OUTPUT_TOPIC);

In the above example, you’ll be able to see the key and values being logged, and they’ll also be materialized to the output topic (unlike the print operation).

branch is a method I haven’t used (to be honest), but it looks quite interesting. It gives you the ability to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and output multiple KStreams (as an array). The key here is that you can use multiple predicates instead of a single one, as is the case with filter and filterNot. Each record goes to the stream of the first predicate it matches and is dropped if it matches none.
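The routing rule behind branch is easier to see in isolation. The following is a plain-Java model, not the Kafka Streams API: it returns the index of the first predicate a record matches, which corresponds to the position of the output stream in the array that branch returns, or -1 when the record would be dropped. The predicates and names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class BranchSketch {
    // Hypothetical criteria: long words first, then words starting with "foo".
    static List<BiPredicate<String, String>> predicates() {
        return Arrays.asList(
                (k, v) -> v.length() > 5,
                (k, v) -> v.startsWith("foo"));
    }

    // Returns the index of the first matching predicate, or -1 if none matches.
    static int route(List<BiPredicate<String, String>> predicates, String key, String value) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i;
            }
        }
        return -1; // no match: the record is dropped
    }

    public static void main(String[] args) {
        System.out.println(route(predicates(), "k", "foobar")); // prints 0
        System.out.println(route(predicates(), "k", "foo"));    // prints 1
        System.out.println(route(predicates(), "k", "bar"));    // prints -1
    }
}
```

Note that "foobar" satisfies both predicates but lands only in the first stream, so the order in which the predicates are passed matters.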

You can merge two KStreams together into a single one.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");

Note: The resulting stream may not have all the records in order.

If you want to derive a new key (it can have a different type as well) for each record in your KStream, use the selectKey method, which accepts a KeyValueMapper. selectKey is similar to map, but the difference is that map restricts the return type to a KeyValue object.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

While developing your processing pipelines with Kafka Streams DSL, you’ll find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
//continue processing with stream2
stream2.filter((k,v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);

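This can be simplified by using the through method, so you can rewrite the above as follows. (Note that through has been deprecated since Kafka 2.6; its Javadoc recommends repartition, or to followed by builder.stream, instead.)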

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.through(OUTPUT_TOPIC)
.filter((k,v) -> v.length() > 5)
.to(LENGTHY_WORDS_TOPIC);

Here, we materialize the records (with uppercase values) to an intermediate topic, continue processing (using filter, in this case), and finally store the filtered results in another topic.

That’s it for now. Stay tuned for upcoming articles in this series!

