Kafka Streams tip: java.lang.ClassCastException

Abhishek Gupta
1 min read · Jan 16, 2019

The Problem…

… if you get a java.lang.ClassCastException in your Kafka Streams app that reads something like [B cannot be cast to java.lang.String, double-check that you have specified the key and value serializers and deserializers (Serdes) in the configuration.

You can reproduce this with the code snippet below

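import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

// No default Serdes are configured here, which is what triggers the exception
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("application.id", "testapp");
props.put("group.id", "testgroup");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("test-topic");
stream.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String value) {
        return value.toUpperCase();
    }
}).to("output");
KafkaStreams app = new KafkaStreams(builder.build(), props);
app.start();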

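This happens because Kafka Streams has not been told how to convert the raw bytes into String (which is the type of both the key and the value in the above example). The [B in the message is the JVM's name for byte[]: the records reach your code as byte arrays, and the cast to String fails.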
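To see the same error message without a running broker, here is a minimal sketch in plain Java. It does not use Kafka at all; the class and method names (ByteArrayCastDemo, rawValue, failingCastMessage) are made up for illustration, and rawValue() only stands in for a record value that was left as raw bytes.

```java
import java.nio.charset.StandardCharsets;

public class ByteArrayCastDemo {

    // Hypothetical stand-in for a record value that was never deserialized:
    // the payload is still a byte[], but the static type is only Object.
    public static Object rawValue() {
        return "hello".getBytes(StandardCharsets.UTF_8);
    }

    // Attempts the same cast that the generic KStream<String, String> code
    // ends up performing, and returns the resulting exception message.
    public static String failingCastMessage() {
        try {
            String value = (String) rawValue();
            return "no exception, value=" + value;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The runtime class name of byte[] is "[B"
        System.out.println(rawValue().getClass().getName());
        // The exact wording varies by JDK version, but it mentions "[B"
        System.out.println(failingCastMessage());
    }
}
```

The cast compiles because generics are erased at runtime; it only fails once an actual byte[] shows up where a String was promised.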

Possible solutions

The (simple) solution is to add the configuration below. It serves as the default and works for String keys and values. You will still have to specify the serializer and deserializer explicitly if you want to deal with other data types, e.g. Long (or a custom Serde).

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
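Why does each data type need its own Serde? The sketch below is plain Java rather than the Kafka API, and the names (SerdeSketch and its four helper methods) are hypothetical. It only illustrates the idea of a serializer/deserializer pair, assuming UTF-8 for strings and an 8-byte big-endian encoding for longs, which is roughly what the built-in String and Long Serdes do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerdeSketch {

    // String <-> bytes, assuming UTF-8
    public static byte[] serializeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // long <-> bytes, as 8 bytes in big-endian order (ByteBuffer's default)
    public static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static long deserializeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] text = serializeString("kafka");
        byte[] number = serializeLong(42L);

        // Each payload round-trips only through its matching deserializer
        System.out.println(deserializeString(text));
        System.out.println(deserializeLong(number));
        System.out.println(number.length + " bytes for a long");
    }
}
```

On the wire both payloads are just bytes, so nothing in the record itself says which decoder to use. That choice has to come from the configuration or from the explicit Serdes passed in the topology.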

Another (less convenient) option is to skip the default configuration and specify the serializer and deserializer explicitly wherever they are needed. For the above example, it looks something like this:

// Serdes are passed explicitly when reading from and writing to the topics
KStream<String, String> stream = builder.stream("test-topic", Consumed.with(Serdes.String(), Serdes.String()));
stream.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String value) {
        return value.toUpperCase();
    }
}).to("output", Produced.with(Serdes.String(), Serdes.String()));

Cheers!

Written by Abhishek Gupta

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes

Responses (1)


If you get a java.lang.ClassCastException, I would also recommend trying a newer version of the kafka-avro-serializer dependency. I remember that it worked for me once when I had an issue with Utf8 to UUID conversion.