Kafka Streams tip: java.lang.ClassCastException

Abhishek Gupta
1 min readJan 16, 2019

--

The Problem…

… if you get a java.lang.ClassCastException in your Kafka Streams app which goes something like[B cannot be cast to java.lang.String , just double check if you’ve specified key and value serializer and de-serializer in the configuration.

You can reproduce this with the code snippet below

Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“application.id”, “testapp”);
props.put(“group.id”, “testgroup”);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(“test-topic”);
stream.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String value) {
return value.toUpperCase();
}
}).to(“output”);
KafkaStreams app = new KafkaStreams(builder.build(), props);
app.start();

This happens because Kafka Streams does not know how to convert the raw bytes into String (which is the type for the key as well as the value in the above example)

Possible solutions

The (simple) solution is to add the below configuration — this will serve as the default and works for String keys and values. You’ll have to specify the serializer and de-serializer explicitly in case you want to deal with other data types e.g. Long (or even in case of a custom Serde)

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Another (not so convenient) option is to provide to specify the serializer and de-serializer explicitly for all the cases and not set the default configuration. For the above example, it looks something like this

KStream<String, String> stream = builder.stream(“test-topic”, Consumed.with(Serdes.String(), Serdes.String()));stream.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String value) {
return value.toUpperCase();
}
}).to(“output”, Produced.with(Serdes.String(), Serdes.String()));

Cheers!

--

--

Abhishek Gupta

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes