Kafka Streams Interactive Queries

  • Metrics producer service — pushes machine metrics (simulated) to Kafka
  • Average Processor service — calculates the average of the stream of generated metrics and exposes REST APIs to query them

Overview

Kafka Streams

Kafka Streams DSL API

Kafka Streams state stores

Kafka Streams interactive queries

configurations.put("application.server", streamsAppServerConfig);
High-level overview

Metrics Producer service

...
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
//send one simulated reading (0-19) for each of the five machines
for (int i = 1; i <= 5; i++) {
    String key = KEY_PREFIX + i;
    String value = String.valueOf(rnd.nextInt(20));
    record = new ProducerRecord<>(TOPIC_NAME, key, value);
    //get() blocks until the send is acknowledged
    RecordMetadata rm = kafkaProducer.send(record).get();
}
...
ScheduledExecutorService kafkaProducerScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

//first run after 5 seconds, then wait 10 seconds after each run completes
kafkaProducerScheduledExecutor.scheduleWithFixedDelay(new Producer(), 5, 10, TimeUnit.SECONDS);

Metrics Average Processor service

Kafka Streams DSL implementation for metrics average

KStream<String, String> metricsStream = builder.stream(SOURCE_TOPIC);

rest of the stuff..

static class MetricsCountAndSum {
    private final Long count;
    private final Long sum;

    public MetricsCountAndSum(Long count, Long sum) {
        this.count = count;
        this.sum = sum;
    }
    //omitted for brevity...
...
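The count-and-sum pair above is what makes a running average cheap to maintain: each new reading only bumps the count and adds to the sum, and the average is derived on demand. Here is a minimal sketch of that idea in plain Java, without Kafka Streams. The CountAndSum and RunningAverages names, and the HashMap standing in for a state store, are assumptions made for illustration and are not taken from the actual service.

```java
import java.util.HashMap;
import java.util.Map;

/** Immutable (count, sum) pair; plays the role MetricsCountAndSum has above. */
final class CountAndSum {
    final long count;
    final long sum;

    CountAndSum(long count, long sum) {
        this.count = count;
        this.sum = sum;
    }

    /** Returns a new pair that also accounts for one more reading. */
    CountAndSum add(long reading) {
        return new CountAndSum(count + 1, sum + reading);
    }

    /** Average of all readings seen so far; 0.0 if there are none. */
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}

/** Hypothetical in-memory stand-in for a keyed state store. */
final class RunningAverages {
    private final Map<String, CountAndSum> store = new HashMap<>();

    /** Folds one reading into the aggregate kept for the given machine. */
    void record(String machine, long reading) {
        store.merge(machine, new CountAndSum(1, reading),
                (existing, ignored) -> existing.add(reading));
    }

    /** Current average for a machine; 0.0 if it has never reported. */
    double averageFor(String machine) {
        CountAndSum cs = store.get(machine);
        return cs == null ? 0.0 : cs.average();
    }

    public static void main(String[] args) {
        RunningAverages averages = new RunningAverages();
        averages.record("machine1", 0);
        averages.record("machine1", 8);
        System.out.println(averages.averageFor("machine1")); // prints 4.0
    }
}
```

Keeping the pair rather than the average itself means the aggregate can be updated without revisiting earlier readings.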
String port = "8080";
//Start Grizzly container
URI baseUri = UriBuilder.fromUri("http://" + HOSTNAME + "/").port(Integer.parseInt(port)).build();
ResourceConfig config = new ResourceConfig(MetricsResource.class)
        .register(MoxyJsonFeature.class); //to-from JSON (using JAXB)
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(baseUri, config);
server.start();
GlobalAppState.getInstance()
        .hostPortInfo(Utils.getHostIPForDiscovery(), port);
//Start Kafka Streams app
KafkaStreams theStream = AverageMetricsService.startStream();
GlobalAppState.getInstance().streams(theStream);
...

Metrics Query Service

...
@GET
@Path("remote/{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote(@PathParam("machine") String machine) {
    ...
...
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
    Response response = null;
    ...
...
@GET
@Path("remote")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote() {
    ...
  • An external HTTP client (curl, browser, etc.) should be able to reach the Docker container. This is achieved by publishing a random host port (e.g. 32769) with -P in docker run and using the host IP (localhost, unless you are running Docker in VirtualBox or similar, in which case it might be something like 192.168.99.100). The URL will therefore look something like http://localhost:32769/metrics. Simple enough!
  • The Kafka Streams Docker containers should be able to talk to each other, since they need to access remote state store data via HTTP calls (as explained above, the REST API implementation provides dedicated remote endpoints for this). How this is made possible is explained below.
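The container-to-container lookup in the second point can be sketched roughly as follows: an instance compares its own host:port with that of the instance owning the requested key, and either reads its local store or calls the owner's remote endpoint. This is a plain-Java illustration under stated assumptions. HostPort and QueryRouter are hypothetical names, the /metrics/remote/{machine} URL is inferred from the resource paths and curl examples in this post, and in a real application the owner would come from the key metadata that Kafka Streams exposes for interactive queries.

```java
import java.util.Objects;

/** host:port pair identifying one running instance (hypothetical helper type). */
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof HostPort)) return false;
        HostPort other = (HostPort) o;
        return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}

/** Decides whether a key can be served locally or needs a remote HTTP call. */
final class QueryRouter {
    private final HostPort self;

    QueryRouter(HostPort self) {
        this.self = self;
    }

    /** True when the instance owning the key is this very instance. */
    boolean isLocal(HostPort owner) {
        return self.equals(owner);
    }

    /**
     * URL of the remote endpoint on the owning instance.
     * Assumption: the resource is rooted at /metrics, so the
     * remote/{machine} path resolves to /metrics/remote/{machine}.
     */
    String remoteUrl(HostPort owner, String machine) {
        return "http://" + owner + "/metrics/remote/" + machine;
    }
}
```

A caller would check isLocal first and only issue an HTTP request to remoteUrl when the key lives on another container.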
configurations.put("application.server", streamsAppServerConfig);
public interface HostDiscovery {
    public String getHost();

    public static final HostDiscovery localHostDiscovery = () -> "localhost";

    public static final HostDiscovery dockerDiscovery = () -> {
        System.out.println("Docker based host discovery..");
        String dockerHostName = System.getenv("HOSTNAME");
        System.out.println("Docker container host name - " + dockerHostName);
        return dockerHostName;
    };
}
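Kafka Streams expects the application.server value in host:port form, so the discovered host name has to be combined with the REST port before it is put into the configuration. A minimal sketch of that step is shown here. The AppServer class, the fallback to localhost when HOSTNAME is unset, and the port range check are assumptions added for illustration, since the dockerDiscovery lambda above simply returns whatever the environment variable holds.

```java
/** Hypothetical helper that builds the value for the application.server setting. */
final class AppServer {

    /** Falls back to localhost when no container host name is available. */
    static String resolveHost(String containerHostName) {
        if (containerHostName == null || containerHostName.isEmpty()) {
            return "localhost";
        }
        return containerHostName;
    }

    /** Formats the host:port string that application.server expects. */
    static String endpoint(String host, int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Inside a Docker container HOSTNAME usually holds the container ID.
        String host = resolveHost(System.getenv("HOSTNAME"));
        System.out.println(endpoint(host, 8080));
    }
}
```

The resulting string is the same host:port shape that appears in the source field of the JSON responses further down.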

Run the app…

git clone https://github.com/abhirockzz/kafka-streams-interactive-queries

Start by…

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka

Start the core infra..

./kafka-start.sh
//output
Error: No such network: confluent
Error: No such container: kafka
Error: No such container: zookeeper
4e4388574e9401a271fa1f06b6c9500c85317d590aab71a8c0dc8ca939f0834d
48137112e1a85983b65eadb7228755ded2dcc0ce497aad8af333f8ec4d55be63
88c489dbbc13dbc58303530e3b210f2d4ac09222cb34b924f74446fc9c3b5c61
Created topic "cpu-metrics-topic".

Metrics producer service

./metrics-producer-service-start.sh
./consume-data.sh
//output must be similar to...
machine1 0
machine2 5
machine3 15
machine4 7
machine5 10
machine1 8
machine2 3
machine3 18
machine4 4
machine5 5
.....
Press Ctrl+C to stop...

Kafka Streams application …

./average-processing-service-start.sh
docker ps|grep 'averageprocessorservice-*'|sed 's/.*0.0.0.0://g'|sed 's/->.*//g'
curl http://localhost:[port]/metrics/[machine]
e.g. curl http://localhost:32769/metrics/machine1
//you should get back a JSON
{
  "metrics": [
    {
      "machine": "machine1",
      "cpu": "8.703125",
      "source": "5691ab353dc4:8080"
    }
  ]
}
  • machine: the name of the machine
  • cpu: the current average metric reading of the machine
  • source: the hostname and port of the Docker container from which this reading was obtained
./average-processing-service-start.sh
curl http://localhost:[port]/metrics/
e.g. curl http://localhost:32769/metrics/
//this too gives a JSON - this time with ALL machine metric readings
{
  "metrics": [
    {
      "machine": "machine4",
      "cpu": "9.688311688311689",
      "source": "815e0c107ef1:8080"
    },
    {
      "machine": "machine1",
      "cpu": "8.878048780487806",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine2",
      "cpu": "8.39240506329114",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine3",
      "cpu": "9.871794871794872",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine5",
      "cpu": "9.291139240506329",
      "source": "5691ab353dc4:8080"
    }
  ]
}
  • the data for machine4 was sent to a different partition (as compared to machine1 etc.) and,
  • it was processed by a different instance of the average processing service

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes

Abhishek Gupta