Kafka Streams tip: java.lang.ClassCastException

Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“application.id”, “testapp”);
props.put(“group.id”, “testgroup”);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(“test-topic”);
stream.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String value) {
return value.toUpperCase();
}
}).to(“output”);
KafkaStreams app = new KafkaStreams(builder.build(), props);
app.start();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStream<String, String> stream = builder.stream(“test-topic”, Consumed.with(Serdes.String(), Serdes.String()));stream.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String value) {
return value.toUpperCase();
}
}).to(“output”, Produced.with(Serdes.String(), Serdes.String()));

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Abhishek Gupta

Abhishek Gupta

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes

More from Medium

Kafka Customer → Get Specific Messages Only

Kafka Topic Naming

In this blog, I am going to explain you how State store in Kafka streams is managed in…

What is Apache Kafka ?