Kafka Streams tip: java.lang.ClassCastException
The Problem…
… if you get a java.lang.ClassCastException in your Kafka Streams app that reads something like "[B cannot be cast to java.lang.String", double-check that you've specified the key and value serializer and deserializer in the configuration.
You can reproduce this with the code snippet below:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("application.id", "testapp");
props.put("group.id", "testgroup");

StreamsBuilder builder = new StreamsBuilder();
// No Serde is given here and none is set in props
KStream<String, String> stream = builder.stream("test-topic");

stream.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String value) {
        return value.toUpperCase();
    }
}).to("output");

KafkaStreams app = new KafkaStreams(builder.build(), props);
app.start();
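This happens because, with no serializer/deserializer configured, Kafka Streams does not know how to convert the raw bytes into String (which is the type of both the key and the value in the above example). The records reach your code as byte arrays ([B is the JVM's name for byte[]), and the cast to String fails.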
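To see the failure mode in isolation, here is a minimal sketch in plain Java with no Kafka dependency. The class and method names (CastDemo, upperUnsafe, upperDecoded) are made up for illustration. The unchecked cast only loosely imitates a byte[] value reaching code that expects a String, and the explicit UTF-8 decode stands in for what a String deserializer would do.

```java
import java.nio.charset.StandardCharsets;

public class CastDemo {

    // Hypothetical helper: treats the payload as a String without decoding it.
    // When the payload is actually a byte[], the cast fails at runtime.
    public static String upperUnsafe(Object payload) {
        String value = (String) payload; // ClassCastException for byte[]
        return value.toUpperCase();
    }

    // Hypothetical helper: decodes the raw bytes into a String first.
    public static String upperDecoded(byte[] payload) {
        String value = new String(payload, StandardCharsets.UTF_8);
        return value.toUpperCase();
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            upperUnsafe(raw);
        } catch (ClassCastException e) {
            // The exact message wording depends on the JVM version.
            System.out.println("Failed: " + e.getMessage());
        }
        System.out.println(upperDecoded(raw)); // prints HELLO
    }
}
```

The first call fails because nothing ever turned the bytes into a String; the second works because the decoding step is explicit. Configuring a Serde is how you tell Kafka Streams to do that step for you.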
Possible solutions
The (simple) solution is to add the configuration below. It serves as the default and works for String keys and values. You'll have to specify the serializer and deserializer explicitly if you want to deal with other data types, e.g. Long (or a custom Serde).
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
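For a stream whose values are not Strings, the String default does not fit and the Serde has to be passed explicitly. Here is a minimal sketch, assuming a topic with String keys and Long values. The class name LongValueTopology and the topic names counts-topic and doubled-counts are hypothetical, and the kafka-streams library is assumed to be on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class LongValueTopology {

    // Builds a topology that reads String keys and Long values.
    // Topic names are placeholders for illustration only.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Override the default value Serde for this source only.
        KStream<String, Long> counts =
                builder.stream("counts-topic", Consumed.with(Serdes.String(), Serdes.Long()));

        counts.mapValues(value -> value * 2)
              // The sink needs the Long Serde as well, otherwise the default is used.
              .to("doubled-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

The resulting Topology can be passed to new KafkaStreams(topology, props) as in the earlier snippet. Streams that do use String keys and values can keep relying on the default.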
Another (not so convenient) option is to specify the serializer and deserializer explicitly in every case and not set the default configuration. For the above example, it looks something like this:
KStream<String, String> stream = builder.stream("test-topic", Consumed.with(Serdes.String(), Serdes.String()));

stream.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String value) {
        return value.toUpperCase();
    }
}).to("output", Produced.with(Serdes.String(), Serdes.String()));
Cheers!