Learn Stream Processing With Kafka Streams
Stateless operations

Kafka Streams is a Java library for developing stream-processing applications on top of Apache Kafka. This is the first in a series of articles on Kafka Streams and its APIs.
This is not a theoretical guide about Kafka Streams (although I’ve covered some of those aspects in the past).
In this part, we’ll cover stateless operations in the Kafka Streams DSL API — specifically, the functions available in KStream, such as filter, map, groupBy, etc. The DSL API in Kafka Streams offers a powerful, functional-style programming model to define stream-processing topologies.
The APIs (KStream, etc.) referenced in this post can be found in the Kafka Streams Javadocs.
The Setup
To start things off, you need to create a KafkaStreams instance. It needs a topology and related configuration (in the form of java.util.Properties).
Set the required configuration for your Kafka streams app:
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
We can then build a topology that defines the processing pipeline (the rest of this article will focus on the stateless parts of a topology).
You can create the KafkaStreams instance and start processing:
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // wait forever
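Rather than blocking forever, you can also close the application cleanly when the JVM shuts down. Here is a minimal sketch, assuming app is the KafkaStreams instance created above:

```java
// Close the Kafka Streams app gracefully on SIGTERM/SIGINT
Runtime.getRuntime().addShutdownHook(new Thread(app::close));
```

KafkaStreams.close() stops all stream threads and releases resources, so state stores are flushed before the process exits.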
Stateless Operations Using KStream
I generally like categorizing things into buckets — it helps me divide and conquer. I’ve tried the same in this case by dividing the various KStream operations into filter, map, etc.
Let’s dig in!
filter
You can use filter to omit or include records based on your criteria.
For example, if the value sent to a topic contains a word and you want to include only words longer than a specified length, you can define this criterion using a Predicate and pass it to the filter method. This creates a new KStream instance with the filtered records.
KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
@Override
public boolean test(String k, String v) {
return v.length() > 5;
}
});
It’s also possible to use filterNot if you want to exclude records based on certain criteria. Here is a lambda-style example:
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));
map
A commonly used stateless operation is map. In the case of Kafka Streams, it can be used to transform each record in the input KStream by applying a mapper function.
It’s available in multiple flavors: map, mapValues, flatMap, and flatMapValues.
Simply use the map method if you want to alter both the key and the value — for example, to convert both to uppercase:
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String k, String v) {
return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
}
});
Use mapValues if all you want to alter is the value:
stream.mapValues(value -> value.toUpperCase());
flatMap is similar to map, but it allows you to return multiple records (KeyValues):
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
@Override
public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
String[] values = csv.split(",");
return Arrays.stream(values)
.map(value -> new KeyValue<>(k, value))
.collect(Collectors.toList());
}
});
In the above example, each record in the stream gets flatMapped such that each comma-separated value (CSV) string is first split into its constituents, and a KeyValue pair is created for each part. For example, if you have the records (foo <-> a,b,c) and (bar <-> d,e) (where foo and bar are keys), the resulting stream will have five entries: (foo,a), (foo,b), (foo,c), (bar,d), and (bar,e).
Use flatMapValues if you only want to accept a value from the stream and return a collection of values.
group
If you want to perform stateful aggregations on the contents of a KStream, you’ll first need to group its records by their key to create a KGroupedStream. We’ll cover stateful operations on KGroupedStream in subsequent articles in this series.
Here’s an example of how you can do this using groupByKey:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KGroupedStream<String,String> kgs = stream.groupByKey();
A generalized version of groupByKey is groupBy, which gives you the ability to group based on a different key using a KeyValueMapper:
stream.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String k, String v) {
return k.toUpperCase();
}
});
In both cases (groupByKey and groupBy), if you need to use a different Serde (Serializer and Deserializer) instead of the default ones, use the overloaded version, which accepts a Grouped object:
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));
Terminal Operations
A terminal operation in Kafka Streams is a method that returns void instead of an intermediate representation, such as another KStream or KTable.
You can use the to method to store the records of a KStream to a topic in Kafka:
KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
.to("uppercase-words");
An overloaded version of to allows you to specify a Produced object to customize the Serdes and the partitioner:
stream.mapValues(value -> value.toUpperCase())
.to("output-topic", Produced.with(Serdes.Bytes(), Serdes.Long()));
Instead of specifying a static topic name, you can make use of a TopicNameExtractor and include custom logic to choose a specific topic dynamically:
stream.mapValues(value -> value.toUpperCase())
.to(new TopicNameExtractor<String, String>() {
@Override
public String extract(String k, String v, RecordContext rc) {
return rc.topic()+"_uppercase";
}
});
In this example, we make use of the RecordContext, which contains the metadata of the record, to get the source topic name and append _uppercase to it.
In all of the above cases, the sink topic should pre-exist in Kafka.
If you want to log the KStream records (for debugging purposes), use the print method. It accepts an instance of Printed to configure the behavior:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());
This will print out the records — e.g., if you pass in (foo, BAR)
and (john, DOE)
to the input topic, they’ll get converted to uppercase and logged as such:
[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE
You can also use Printed.toFile (instead of toSysOut) to target a specific file.
The foreach method is similar to print and peek:
- It’s also a terminal operation (like print)
- It accepts a ForeachAction (like peek)
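For example, to log each record at the end of a pipeline (a minimal sketch):

```java
// Terminal operation: consumes each record and returns void,
// so nothing can be chained after it
stream.foreach((k, v) -> System.out.println("key=" + k + ", value=" + v));
```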
Miscellaneous
Since the print method is a terminal operation, you have the option of using peek instead, which returns the same KStream instance. It accepts a ForeachAction, which can be used to specify what you want to do for each record — e.g., log the key and the value:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.peek((k,v) -> System.out.println("key="+k+", value="+v))
.to(OUTPUT_TOPIC);
In the above example, you’ll be able to see the keys and values being logged, and they’ll also be materialized to the output topic (unlike with the print operation).
branch is a method I haven’t used (to be honest), but it looks quite interesting. It gives you the ability to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and output an array of KStreams. The key here is that you can use multiple predicates instead of a single one, as is the case with filter and filterNot.
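Here’s a quick sketch of what that might look like (the topic names are made up for illustration): records whose value starts with "a" go to branches[0], and the catch-all predicate routes everything else to branches[1].

```java
// Each record goes to the stream of the FIRST predicate it matches;
// records matching no predicate are dropped, hence the catch-all at the end
KStream<String, String>[] branches = stream.branch(
    (k, v) -> v.startsWith("a"),  // branches[0]
    (k, v) -> true                // branches[1]: everything else
);
branches[0].to("a-words");
branches[1].to("other-words");
```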
You can merge two KStreams together into a single one:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");
Note: The resulting stream may not have all the records in order.
If you want to derive a new key (possibly with a different type) for each record in your KStream, use the selectKey method, which accepts a KeyValueMapper. selectKey is similar to map, but the difference is that map restricts the return type to a KeyValue object:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String k, String v) {
return k.toUpperCase();
}
});
While developing your processing pipelines with the Kafka Streams DSL, you’ll often find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
// the output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
// continue processing with stream2
stream2.filter((k, v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);
This can be simplified using the through method, so you can rewrite the above as follows:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.through(OUTPUT_TOPIC)
.filter((k, v) -> v.length() > 5)
.to(LENGTHY_WORDS_TOPIC);
Here, we materialize the records (with uppercase values) to an intermediate topic, continue processing (using filter, in this case), and finally store the post-filtering results in another topic.
That’s it for now. Stay tuned for upcoming articles in this series!