Kafka Streams Interactive Queries

  • Metrics producer service — pushes machine metrics (simulated) to Kafka
  • Average Processor service — calculates the average of the stream of generated metrics and exposes REST APIs to query them

Overview

Let’s start with a high-level overview. The intention is to introduce the basic concepts, and most of this is from the Kafka documentation.

Kafka Streams

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. It is designed as a simple and lightweight client library that can be easily embedded in any Java application. It has no external dependencies on systems other than Kafka itself, and it uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees. It supports fault-tolerant local state, employs one-record-at-a-time processing to achieve millisecond processing latency, and offers the necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API.

Kafka Streams DSL API

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.
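For instance, a simple pipeline that reads from one topic, filters, and writes to another can be sketched in a few lines (the topic names here are just placeholders, not from the sample app):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
//read from an input topic, drop empty values, write to an output topic
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .filter((key, value) -> value != null && !value.isEmpty())
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));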

Kafka Streams state stores

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. Every stream task in a Kafka Streams application may embed one or more local state stores that can be accessed via APIs to store and query data required for processing. Kafka Streams offers fault-tolerance and automatic recovery for such local state stores.

Kafka Streams interactive queries

Kafka Streams enables your applications to be queryable. Interactive queries allow you to leverage the state of your application from outside your application. The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.
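Once a store is materialized, any instance can query the partitions of that store it hosts locally. A minimal sketch, assuming a running KafkaStreams instance named streams and a store called average-metrics-store (both names are assumptions; the store(...) signature shown is the one from newer Kafka versions):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

//read-only view over the locally hosted partitions of the store
ReadOnlyKeyValueStore<String, Double> store = streams.store(
        StoreQueryParameters.fromNameAndType("average-metrics-store",
                QueryableStoreTypes.keyValueStore()));
Double average = store.get("machine1"); //null if machine1 is hosted on another instance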

To make this distributed state addressable, each application instance advertises its own host and port via the application.server property (more on this below):

configurations.put("application.server", streamsAppServerConfig);
High level overview

Metrics Producer service

It’s a simple producer that simulates random machine metric data and pushes it to a Kafka topic. It generates metrics for five machines, i.e. machine1 to machine5. The machine names (machine1 etc.) are used as keys and the actual metric readings (1, 2, 14 etc.) as values.

...
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
//one record per machine: machine name as key, simulated metric as value
for (int i = 1; i <= 5; i++) {
    String key = KEY_PREFIX + i;
    String value = String.valueOf(rnd.nextInt(20));
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
    RecordMetadata rm = kafkaProducer.send(record).get();
}
...
ScheduledExecutorService kafkaProducerScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

//run the producer every 10 seconds, after an initial 5 second delay
kafkaProducerScheduledExecutor.scheduleWithFixedDelay(new Producer(), 5, 10, TimeUnit.SECONDS);
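The kafkaProps passed to the producer above is elided in the snippet; a minimal configuration sketch (the broker address is an assumption, adjust for your setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties kafkaProps = new Properties();
//broker address is an assumption for this sketch
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
//machine names and metric values are both plain strings
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());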

Metrics Average Processor service

This is a Kafka Streams application which uses the DSL API to compute the running average metric reading for each machine. In addition to the Kafka Streams components, it also exposes a REST API using JAX-RS (with Jersey as the implementation) to query the average metrics info (details to follow).

Kafka Streams DSL implementation for metrics average

The implementation relies on the DSL’s aggregation operations to get the job done.

KStream<String, String> metricsStream = builder.stream(SOURCE_TOPIC);

The stream is then grouped by key (the machine name) and aggregated into a running count and sum per machine, from which the average is derived.
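A sketch of how this could look; the store names, the countAndSumSerde, and the getCount()/getSum() accessors are assumptions, not necessarily the repo’s exact code:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

//group by machine name and keep a running count and sum per key;
//countAndSumSerde (a Serde for MetricsCountAndSum) is assumed to exist
KTable<String, MetricsCountAndSum> countAndSum = metricsStream
        .groupByKey()
        .aggregate(
                () -> new MetricsCountAndSum(0L, 0L),
                (machine, value, aggregate) -> new MetricsCountAndSum(
                        aggregate.getCount() + 1,
                        aggregate.getSum() + Long.parseLong(value)),
                Materialized.<String, MetricsCountAndSum, KeyValueStore<Bytes, byte[]>>as("metrics-count-sum-store")
                        .withValueSerde(countAndSumSerde));

//derive the average and materialize it in a store that interactive queries can read
countAndSum.mapValues(
        v -> (double) v.getSum() / v.getCount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("average-metrics-store")
                .withValueSerde(Serdes.Double()));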

.. and here is the POJO that holds the running count and sum,

static class MetricsCountAndSum {
    private final Long count;
    private final Long sum;

    public MetricsCountAndSum(Long count, Long sum) {
        this.count = count;
        this.sum = sum;
    }
    //getters etc. omitted for brevity...
...
String port = "8080";

//Start Grizzly container
URI baseUri = UriBuilder.fromUri("http://" + HOSTNAME + "/").port(Integer.parseInt(port)).build();
ResourceConfig config = new ResourceConfig(MetricsResource.class)
        .register(MoxyJsonFeature.class); //to-from JSON (using JAXB)
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(baseUri, config);
server.start();

GlobalAppState.getInstance()
        .hostPortInfo(Utils.getHostIPForDiscovery(), port);

//Start Kafka Streams app
KafkaStreams theStream = AverageMetricsService.startStream();
GlobalAppState.getInstance().streams(theStream);
...

Metrics Query Service

The information in the state stores is available only within the JVM in which the Streams app is running. The average processor service therefore exposes REST APIs so that external clients can query the metrics, e.g. to query the metrics for machine1, execute curl http://host:port/metrics/machine1, or use curl http://host:port/metrics to get the metrics for all machines.

...
//dedicated endpoint used by other instances to fetch locally hosted data for one machine
@GET
@Path("remote/{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote(@PathParam("machine") String machine) {
....
...
//client-facing endpoint: aggregates metrics for ALL machines, local and remote
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
    Response response = null;
...
...
//dedicated endpoint used by other instances to fetch all locally hosted data
@GET
@Path("remote")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote() {
...
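Internally, the query side of interactive queries boils down to: find out which instance hosts the key, serve it from the local store if it’s this one, otherwise call that instance’s remote endpoint. A sketch under those assumptions (the store name, thisHost, and the JAX-RS client usage are illustrative; the metadata API names vary across Kafka versions):

import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

//which instance hosts the store partition that this machine's key maps to?
KeyQueryMetadata metadata = streams.queryMetadataForKey(
        "average-metrics-store", machine, Serdes.String().serializer());
HostInfo activeHost = metadata.activeHost();

if (thisHost.equals(activeHost)) { //thisHost: this instance's advertised HostInfo
    //key is hosted locally: read straight from the state store
    ReadOnlyKeyValueStore<String, Double> store = streams.store(
            StoreQueryParameters.fromNameAndType("average-metrics-store",
                    QueryableStoreTypes.keyValueStore()));
    Double average = store.get(machine);
    //... wrap in a Metrics response
} else {
    //key lives on another instance: delegate to its dedicated remote endpoint
    String url = "http://" + activeHost.host() + ":" + activeHost.port()
            + "/metrics/remote/" + machine;
    String json = ClientBuilder.newClient().target(url)
            .request(MediaType.APPLICATION_JSON).get(String.class);
    //... unmarshal and return
}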
  • An external HTTP client (curl, browser etc.) should be able to reach the Docker container. This is achieved by publishing a random port (e.g. 32769) using -P in docker run, together with the host IP (which is localhost, unless you’re running Docker in VirtualBox or similar, in which case it might be something like 192.168.99.100). This means that the URL will look something like http://localhost:32769/metrics. Alright, this is simple enough!
  • The Kafka Streams Docker containers should be able to talk to each other, since they need to access remote state store data via HTTP calls (as explained above, the REST API implementation provides dedicated remote endpoints for this). How this is made possible is explained below.
As mentioned above, each instance advertises its host and port to its peers using the application.server property, where streamsAppServerConfig holds this instance’s host:port:

configurations.put("application.server", streamsAppServerConfig);

The host part comes from a simple HostDiscovery abstraction:
public interface HostDiscovery {
    public String getHost();

    public static final HostDiscovery localHostDiscovery = () -> "localhost";

    public static final HostDiscovery dockerDiscovery = () -> {
        System.out.println("Docker based host discovery..");
        String dockerHostName = System.getenv("HOSTNAME");
        System.out.println("Docker container host name - " + dockerHostName);
        return dockerHostName;
    };
}
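The streamsAppServerConfig value can then be assembled from the discovered host and the REST port before the topology is started. A sketch; whether the repo wires it exactly this way is an assumption:

import org.apache.kafka.streams.StreamsConfig;

//advertise this instance's endpoint; port is the same one Grizzly listens on
//(StreamsConfig.APPLICATION_SERVER_CONFIG is the constant for "application.server")
String streamsAppServerConfig = Utils.getHostIPForDiscovery() + ":" + port;
configurations.put(StreamsConfig.APPLICATION_SERVER_CONFIG, streamsAppServerConfig);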

Run the app…

Clone the repo first

git clone https://github.com/abhirockzz/kafka-streams-interactive-queries

Start by…

pulling Confluent Zookeeper and Kafka Docker images

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka

Start the core infra..

i.e. Kafka cluster (along with Zookeeper of course)

./kafka-start.sh

//output
Error: No such network: confluent
Error: No such container: kafka
Error: No such container: zookeeper
4e4388574e9401a271fa1f06b6c9500c85317d590aab71a8c0dc8ca939f0834d
48137112e1a85983b65eadb7228755ded2dcc0ce497aad8af333f8ec4d55be63
88c489dbbc13dbc58303530e3b210f2d4ac09222cb34b924f74446fc9c3b5c61
Created topic "cpu-metrics-topic".

(the Error lines are harmless on a first run; the script first tries to remove any existing containers and network before starting fresh)

Metrics producer service

Start this to initiate the simulated metric data being pushed to the cpu-metrics-topic topic

./metrics-producer-service-start.sh
./consume-data.sh

//output must be similar to...
machine1 0
machine2 5
machine3 15
machine4 7
machine5 10
machine1 8
machine2 3
machine3 18
machine4 4
machine5 5
.....
Press Ctrl+C to stop...

Kafka Streams application …

… which will crunch these metrics

./average-processing-service-start.sh
//find the random host port that Docker mapped to the container
docker ps|grep 'averageprocessorservice-*'|sed 's/.*0.0.0.0://g'|sed 's/->.*//g'
curl http://localhost:[port]/metrics/[machine]
e.g. curl http://localhost:32769/metrics/machine1

//you should get back a JSON
{
  "metrics": [
    {
      "machine": "machine1",
      "cpu": "8.703125",
      "source": "5691ab353dc4:8080"
    }
  ]
}
  • machine — the name of the machine
  • cpu — the current average metric reading of the machine
  • source — this is the hostname of the Docker container from where this reading was obtained
Now start a second instance of the average processing service, so the state gets spread across multiple instances

./average-processing-service-start.sh
curl http://localhost:[port]/metrics/
e.g. curl http://localhost:32769/metrics/

//this too gives a JSON - this time with ALL machine metric readings
{
  "metrics": [
    {
      "machine": "machine4",
      "cpu": "9.688311688311689",
      "source": "815e0c107ef1:8080"
    },
    {
      "machine": "machine1",
      "cpu": "8.878048780487806",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine2",
      "cpu": "8.39240506329114",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine3",
      "cpu": "9.871794871794872",
      "source": "5691ab353dc4:8080"
    },
    {
      "machine": "machine5",
      "cpu": "9.291139240506329",
      "source": "5691ab353dc4:8080"
    }
  ]
}
Notice the source attribute in the output above:

  • the data for machine4 was sent to a different partition (as compared to machine1 etc.) and,
  • it was processed by a different instance of the average processing service
