Kafka Streams Interactive Queries
This blog post explores the Interactive Queries feature in Kafka Streams with the help of a practical example. It covers the DSL API and how the state store information is exposed via a REST service.
Everything is set up using Docker, including Kafka, Zookeeper, the stream processing services as well as the producer app.
Here are the key components
- Metrics producer service — pushes machine metrics (simulated) to Kafka
- Average Processor service — calculates the average of the stream of generated metrics and exposes REST APIs to query them
Overview
Let’s start with a high level overview — the intention is to introduce basic concepts and most of this is from the Kafka documentation
Kafka Streams
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It is designed as a simple and lightweight client library which can be easily embedded in any Java application. It has no external dependencies on systems other than Kafka itself, and it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees. It supports fault-tolerant local state, employs one-record-at-a-time processing to achieve millisecond processing latency, and offers the necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API.
Kafka Streams DSL API
The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.
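For instance, a purely illustrative pipeline that reads a topic, filters and transforms each record, and writes the result to another topic fits in a handful of lines (the topic names below are made up and not part of this example):

StreamsBuilder builder = new StreamsBuilder();

builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       .filter((key, value) -> value != null)       //drop records without a value
       .mapValues(value -> value.toUpperCase())     //transform each value
       .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));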
Kafka Streams state stores
Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. Every stream task in a Kafka Streams application may embed one or more local state stores that can be accessed via APIs to store and query data required for processing. Kafka Streams offers fault-tolerance and automatic recovery for such local state stores.
Kafka Streams interactive queries
Kafka Streams enables your applications to be queryable. Interactive queries allow you to leverage the state of your application from outside your application. The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.
To enable remote state store discovery in a distributed Kafka Streams application, you must set the application.server configuration property. This property defines a unique host:port pair that points to the RPC endpoint of the respective instance of a Kafka Streams application.
configurations.put("application.server", streamsAppServerConfig);
To query the full state of your application, you should be able to combine the results from the local state store as well as reach out to the state stores of remote instances (via RPC etc.)
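Here is a rough sketch (not code from this example) of what that could look like inside a running application; the store name is a placeholder, and newer Kafka Streams versions provide StoreQueryParameters based equivalents of these calls:

//query the local state store of this instance
ReadOnlyKeyValueStore<String, Double> localStore =
        streams.store("state-store-name", QueryableStoreTypes.keyValueStore());

//discover every application instance that hosts a shard of this store;
//host() and port() come from each instance's application.server config
Collection<StreamsMetadata> instances = streams.allMetadataForStore("state-store-name");
for (StreamsMetadata metadata : instances) {
    String rpcEndpoint = "http://" + metadata.host() + ":" + metadata.port();
    //...call the remote instance over HTTP/RPC and merge its results with the local ones
}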
Here is a bird’s eye view of how the overall solution looks
Let's skim through the implementation at a high level... the full code is available on GitHub
Metrics Producer service
It's a simple producer which simulates random machine metric data and pushes it to a Kafka topic. It generates metrics for five machines, i.e. machine1 to machine5: the machine names (machine1 etc.) are used as keys and the actual metric values (1, 2, 14 etc.) are used as the values
...
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);

for (int i = 1; i <= 5; i++) {
    String key = KEY_PREFIX + i;
    String value = String.valueOf(rnd.nextInt(20));
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
    RecordMetadata rm = kafkaProducer.send(record).get();
}
...
The process has been made synchronous (a blocking call) on purpose (for this example): this is done by invoking get on the Future<RecordMetadata> object returned by the send call to KafkaProducer. It is modelled as a Runnable and is scheduled using a ScheduledExecutorService (after every 10 seconds)
ScheduledExecutorService kafkaProducerScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

//run producer after every 10 seconds
kafkaProducerScheduledExecutor.scheduleWithFixedDelay(new Producer(), 5, 10, TimeUnit.SECONDS);
Metrics Average Processor service
This is a Kafka Streams application which uses the DSL API to churn out the average metric reading for each machine. In addition to the Kafka Streams components, it also exposes a REST API using JAX-RS (Jersey as the implementation) to query the metrics average info (details to follow)
Kafka Streams DSL implementation for metrics average
The implementation depends on aggregation to get the job done
I thought of implementing this as a cumulative moving average using the Kafka Streams Processor API, but got a hint thanks to this — https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
It starts by reading the raw metrics from the cpu-metrics-topic
KStream<String, String> metricsStream = builder.stream(SOURCE_TOPIC);
Then it groups the records by key i.e. machine name — metricsStream.groupByKey()
This is followed by the aggregation, where a MetricsCountAndSum (POJO) object is used to represent the count and sum. Note that it uses a custom Serde (serializer and de-serializer) to handle how the MetricsCountAndSum object is persisted to and read from Kafka.
The aggregated value is then converted to the average: mapValues converts MetricsCountAndSum into the average reading using sum/count. Each machine and its average is saved to the state store average-metrics-store; note the custom Serde (Serdes.Double()) since the average is of type java.lang.Double.
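Put together, the heart of the topology could look roughly like the following. This is a sketch based on the description above rather than a verbatim copy of the repo code; in particular, the getCount()/getSum() accessors on the POJO and the countAndSumSerde variable are assumptions:

KStream<String, String> metricsStream = builder.stream(SOURCE_TOPIC);

metricsStream
    .groupByKey()
    .aggregate(
        () -> new MetricsCountAndSum(0L, 0L),                        //initializer
        (machine, metric, aggregate) -> new MetricsCountAndSum(      //running count and sum per machine
                aggregate.getCount() + 1,
                aggregate.getSum() + Long.parseLong(metric)),
        Materialized.with(Serdes.String(), countAndSumSerde))
    .mapValues(countAndSum -> (double) countAndSum.getSum() / countAndSum.getCount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("average-metrics-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Double()));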
As for the rest of the stuff... here is the POJO,
static class MetricsCountAndSum {

    private final Long count;
    private final Long sum;

    public MetricsCountAndSum(Long count, Long sum) {
        this.count = count;
        this.sum = sum;
    }
    //omitted for brevity...
.. and the Serde implementation.
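The actual Serde class is in the GitHub repo; for illustration, a bare-bones version could be written along these lines, assuming a simple "count,sum" string encoding and getCount()/getSum() accessors on the POJO:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class MetricsCountAndSumSerde implements Serde<MetricsCountAndSum> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public Serializer<MetricsCountAndSum> serializer() {
        return new Serializer<MetricsCountAndSum>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public byte[] serialize(String topic, MetricsCountAndSum data) {
                //encode count and sum as a simple "count,sum" string
                return (data.getCount() + "," + data.getSum()).getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<MetricsCountAndSum> deserializer() {
        return new Deserializer<MetricsCountAndSum>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public MetricsCountAndSum deserialize(String topic, byte[] data) {
                String[] parts = new String(data, StandardCharsets.UTF_8).split(",");
                return new MetricsCountAndSum(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
            }

            @Override
            public void close() { }
        };
    }
}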
.. and finally, everything is tied together and bootstrapped in the KafkaStreamsAppBootstrap class, which starts up JAX-RS (Grizzly container) and the Kafka Streams application
...
String port = "8080";

//Start Grizzly container
URI baseUri = UriBuilder.fromUri("http://" + HOSTNAME + "/").port(Integer.parseInt(port)).build();
ResourceConfig config = new ResourceConfig(MetricsResource.class)
                            .register(MoxyJsonFeature.class); //to-from JSON (using JAXB)
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(baseUri, config);
server.start();

GlobalAppState.getInstance()
        .hostPortInfo(Utils.getHostIPForDiscovery(), port);

//Start Kafka Streams app
KafkaStreams theStream = AverageMetricsService.startStream();
GlobalAppState.getInstance().streams(theStream);
...
Metrics Query Service
The information in the state stores is available only within the JVM in which the streams app is running. The average processing service exposes REST APIs for external clients to be able to query the metrics, e.g. to query the metrics of machine1, you can execute curl http://host:port/metrics/machine1, or use curl http://host:port/metrics to get metrics for all the machines.
The MetricsResource class is where the bulk of the implementation resides. To query the metric for a specific machine, the metadataForKey method is used: it's a simple key-value lookup where the name of the KV store (a.k.a. state store) is passed in.
The interesting thing to note is that you can query for the metric details irrespective of whether the key (machine name) exists in the state store of that JVM; if it does not, a call is made to a remote (REST) endpoint. This is made possible since the StreamsMetadata object provides the host and port information associated with the key, which is then used to make the call to the remote HTTP endpoint of another stream processing app instance.
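In other words, the flow inside the resource method is roughly as follows. This is a simplified sketch; thisInstanceHostPort and the way the KafkaStreams instance is obtained are assumptions, and newer Kafka Streams versions use queryMetadataForKey instead:

//'streams' is the running KafkaStreams instance (e.g. obtained from GlobalAppState)
//which instance owns the state for this machine (key)?
StreamsMetadata metadata = streams.metadataForKey("average-metrics-store",
        machine, new StringSerializer());

//thisInstanceHostPort is this app's own application.server value, e.g. "5691ab353dc4:8080"
if (thisInstanceHostPort.equals(metadata.host() + ":" + metadata.port())) {
    //the key lives in this JVM's state store, so look it up locally
    ReadOnlyKeyValueStore<String, Double> store =
            streams.store("average-metrics-store", QueryableStoreTypes.keyValueStore());
    Double average = store.get(machine);
    //...build the Metrics response from 'average'
} else {
    //the key is owned by another instance, so call its dedicated remote endpoint
    String url = "http://" + metadata.host() + ":" + metadata.port()
            + "/metrics/remote/" + machine;
    //...invoke the URL with a JAX-RS client and return the remote result
}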
...
@GET
@Path("remote/{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote(@PathParam("machine") String machine) {
....
Similarly, you also get endpoints to query average metrics for all the machines
...
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
Response response = null;
...
Here is the remote counterpart of the above
...
@GET
@Path("remote")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Metrics remote() {
...
The workhorse methods which are invoked by the above-mentioned endpoint implementations are as follows
- getLocalMetrics(String machine): for a single machine's metric
- getLocalMetrics: for all metrics
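A simplified sketch of the "all metrics" variant, which iterates over this instance's local state store, could look like this (the GlobalAppState accessor names are assumptions; the actual implementation lives in MetricsResource on GitHub):

private Metrics getLocalMetrics() {
    KafkaStreams streams = GlobalAppState.getInstance().getKafkaStreams();  //assumed accessor
    String source = GlobalAppState.getInstance().getHostPortInfo();         //assumed accessor

    ReadOnlyKeyValueStore<String, Double> store =
            streams.store("average-metrics-store", QueryableStoreTypes.keyValueStore());

    Metrics metrics = new Metrics();
    //iterate over every machine/average pair held in this instance's local store
    try (KeyValueIterator<String, Double> iterator = store.all()) {
        while (iterator.hasNext()) {
            KeyValue<String, Double> entry = iterator.next();
            metrics.add(source, entry.key, String.valueOf(entry.value));
        }
    }
    return metrics;
}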
Finally, the objects which are returned by the endpoints are simple POJOs representing the list of metrics, which are sent back as JSON (or XML if needed)
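For reference, the response wrapper could be as simple as a JAXB-annotated bean along these lines; this is a sketch of one possible shape, matching the JSON shown later in the post:

@XmlRootElement
public class Metrics {

    public List<Metric> metrics = new ArrayList<>();

    public void add(String source, String machine, String cpu) {
        metrics.add(new Metric(machine, cpu, source));
    }

    public static class Metric {
        public String machine;
        public String cpu;
        public String source;

        public Metric() { }

        public Metric(String machine, String cpu, String source) {
            this.machine = machine;
            this.cpu = cpu;
            this.source = source;
        }
    }
}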
But, how do the Docker containers communicate?
Each of the Kafka Streams apps runs in a Docker container. For Interactive Queries to work, there are two key requirements
- An external HTTP client (curl, browser etc.) should be able to reach the Docker container. This is achieved using a random port (e.g. 32679) assigned via -P in docker run, together with the host IP (which is localhost if you're not running Docker in VirtualBox or similar, and might be something like 192.168.99.100 in that case). This means that the URL will look something like http://localhost:32769/metrics. Alright, this is simple enough!
- The Kafka Streams Docker containers should be able to talk to each other, since they need to access remote state store data via HTTP calls (as explained above, the REST API implementation provides dedicated remote endpoints for this). How this is made possible is explained below.
Each Kafka Streams app accepts the RPC host and port as a part of its configuration, e.g.
configurations.put("application.server", streamsAppServerConfig);
In this case, streamsAppServerConfig is derived from the HOSTNAME of the Docker container in which the app is running. This is handled by the HostDiscovery interface
public interface HostDiscovery {

    public String getHost();

    public static final HostDiscovery localHostDiscovery = () -> "localhost";

    public static final HostDiscovery dockerDiscovery = () -> {
        System.out.println("Docker based host discovery..");
        String dockerHostName = System.getenv("HOSTNAME");
        System.out.println("Docker container host name — " + dockerHostName);
        return dockerHostName;
    };
}
Notice the dockerDiscovery implementation, which reads from the HOSTNAME environment variable inside the Docker container. As far as the port is concerned, this can be 8080 since that's what is defined in the code as well as in the Dockerfile (using EXPOSE); this is not a problem since it is inside the container.
To put this all together, the Kafka Streams app config has a reachable endpoint, e.g. 5691ab353dc4:8080, which the other instance(s) can invoke over HTTP to query for remote state store data.
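Here is a sketch of how that value could be assembled when the app starts, using the HostDiscovery shown above (the exact wiring in the repo may differ slightly):

String port = "8080"; //same port that the app listens on and the Dockerfile EXPOSEs
String host = HostDiscovery.dockerDiscovery.getHost(); //the container HOSTNAME, e.g. 5691ab353dc4

String streamsAppServerConfig = host + ":" + port;
configurations.put("application.server", streamsAppServerConfig);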
Don’t worry if all this does not sink in at once — the next section, where you will test the end to end scenario, will help clarify things. So, let’s move on and run the service….
Run the app…
Clone the repo first
git clone https://github.com/abhirockzz/kafka-streams-interactive-queries
Start by pulling the Confluent Zookeeper and Kafka Docker images

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka
Start the core infra, i.e. the Kafka cluster (along with Zookeeper of course)
./kafka-start.sh

//output
Error: No such network: confluent
Error: No such container: kafka
Error: No such container: zookeeper
4e4388574e9401a271fa1f06b6c9500c85317d590aab71a8c0dc8ca939f0834d
48137112e1a85983b65eadb7228755ded2dcc0ce497aad8af333f8ec4d55be63
88c489dbbc13dbc58303530e3b210f2d4ac09222cb34b924f74446fc9c3b5c61
Created topic "cpu-metrics-topic".
you can safely ignore the Error: messages
This will start Zookeeper and Kafka, and create the topic cpu-metrics-topic with 2 partitions
Metrics producer service
Start this to initiate simulated metric data being pushed to the cpu-metrics-topic topic
./metrics-producer-service-start.sh
Sanity test
Use a Kafka Console Consumer to test if the producer is working as expected
./consume-data.sh

//output must be similar to...
machine1 0
machine2 5
machine3 15
machine4 7
machine5 10
machine1 8
machine2 3
machine3 18
machine4 4
machine5 5
.....Press Ctrl+C to stop...
Ok, the first half is set up. Let's move on to the …
Kafka Streams application …
… which will crunch these metrics
Start a single instance of average processor service
./average-processing-service-start.sh
and extract the port it’s running on
docker ps|grep 'averageprocessorservice-*'|sed 's/.*0.0.0.0://g'|sed 's/->.*//g'
this will give back the random port which docker allocated for the service e.g. 32769
Query time!
Now you can use the HTTP endpoint exposed by the service to check the average metric reading from machines
To get the reading for a specific machine
curl http://localhost:[port]/metrics/[machine]

e.g. curl http://localhost:32769/metrics/machine1

//you should get back a JSON
{
"metrics": [
{
"machine": "machine1",
"cpu": "8.703125",
"source": "5691ab353dc4:8080"
}
]
}
Although it's obvious, here is what the JSON attributes represent
- machine: the name of the machine
- cpu: the current average metric reading of the machine
- source: the hostname of the Docker container from where this reading was obtained
Before we proceed, start another instance of the average processing service; since the cpu-metrics-topic topic has two partitions, the processing will be divided amongst the two instances
./average-processing-service-start.sh
The metrics producer service is churning along.. so just wait for a while (15–30 seconds) for data to get re-distributed
Now, query the average readings for all machines
curl http://localhost:[port]/metrics/

e.g. curl http://localhost:32769/metrics/

//this too gives a JSON - this time with ALL machine metric readings
{
"metrics": [
{
"machine": "machine4",
"cpu": "9.688311688311689",
"source": "815e0c107ef1:8080"
},
{
"machine": "machine1",
"cpu": "8.878048780487806",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine2",
"cpu": "8.39240506329114",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine3",
"cpu": "9.871794871794872",
"source": "5691ab353dc4:8080"
},
{
"machine": "machine5",
"cpu": "9.291139240506329",
"source": "5691ab353dc4:8080"
}
]
}
Notice the reading for machine4: its source is different from that of the other machines. This is because
- the data for machine4 was sent to a different partition (as compared to machine1 etc.) and,
- it was processed by a different instance of the average processing service
Hence the different source value, represented by the Docker container hostname of that stream processing service instance.
You can also try the port number for the second service instance — you should see the same result
As you saw, Kafka Streams Interactive Queries are a flexible and scalable way to expose your application state/data. Although this was a relatively simple example, this is really valuable in situations where you need to tap into your service data when working with multiple systems, e.g. joining information from your purchases and users.
That’s it for this post. Stay tuned for more….
Cheers!