In this article we describe a solution for generating dynamic Kafka streams that we created for a client in the Investment Banking sector. Our client receives real-time prices for financial instruments from market data providers (e.g. Reuters, Bloomberg) via Kafka topics. The prices from each provider flow into a separate Kafka topic per instrument type, meaning we are dealing with several different topics for each provider. For example, one topic holds commodity prices, where each message has an instrument name as its key (to simplify: GOLD, GAS or OIL) and a price as its value. Another topic represents the American stock market and carries messages such as AAPL: 123.4501 and GOOG: 543.2105 in real time.
We were asked to come up with a way to dynamically define, compare and calculate prices of different instruments in order to monitor trends as prices change in real time. For example, upon receiving a request to add the prices of AAPL and GOOG together, the app should create a new output topic and publish a new result of that calculation each time the price of either instrument changes.
The Apache Kafka Streams API offers the creation of dynamic topologies, which we could use to model the requested calculations as they arrive and to maintain their lifecycle so that they keep producing results as updated instrument prices come in.
Apache Kafka is a high-throughput, low-latency platform for handling real-time data. As such it is a perfect fit for monitoring prices that change frequently, such as those used in automated trading systems that require minimal latency to take swift action.
Kafka uses consumer groups to indicate which consumers work together to consume data from a set of topics. Partitions of those topics are distributed between the consumers within a consumer group. It is worth noting that Kafka distributes partitions, not messages, which means that messages from the same partition always go to the same node. Our scenario does not let us distribute the ingestion of data easily, because every price is needed for the dynamic price calculations. We therefore give each node its own consumer group: being the sole consumer in its group, the node receives messages from all partitions.
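To make this concrete, a per-node consumer configuration can be sketched as below. The broker address and group-naming scheme are our illustrative assumptions; `group.id`, `bootstrap.servers` and `auto.offset.reset` are standard Kafka consumer properties.

```java
import java.util.Properties;

// Each ingestion node gets a unique group.id, so it is the only member of its
// consumer group and Kafka assigns it all partitions of the subscribed topics.
public class NodeConsumerConfig {
    static Properties forNode(String nodeId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "price-ingest-" + nodeId);   // one group per node
        props.put("auto.offset.reset", "latest");          // only live prices matter
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forNode("node-1").getProperty("group.id")); // price-ingest-node-1
    }
}
```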
The price calculations themselves are easier to distribute than the ingestion. They are independent of one another, so as calculation rules are created dynamically they can be assigned to any node in the group.
The Kafka Streams API allows us to perform operations on Kafka streams that are particularly helpful for our use case, namely joins and windowing, which we use to combine data from different streams. The Kafka Streams DSL also provides a convenient notation for operations on the elements of a stream, such as map, filter and reduce. Performance with the Streams DSL is the same as using a plain listener and doing the computation in Java, since the Streams API is built on top of the Kafka producer and consumer, but we gain the clear syntax of the DSL. This keeps all data transformation logic in one place, easy to read and understand.
The Kafka Streams DSL also provides abstractions over streams and tables. To operate on only selected instruments we use stateless operations such as filtering. To work with the most recent price of each instrument we convert the stream to a table, meaning we now deal with the latest state, i.e. the latest price for each instrument seen on the stream. Finally, we use a "reducer" aggregation, a stateful transformation, to combine the prices.
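This stream/table duality can be sketched in plain Java (class and method names are our illustrative assumptions, not the Kafka Streams API): a map materialises the latest price per instrument, as a KTable would, and a reduce-style step then combines the latest prices.

```java
import java.util.*;

// A stream of price updates is "materialised" into a map that keeps only the
// latest price per instrument (the table view of the stream), and a stateful
// reduce combines the latest prices of the requested instruments.
public class LatestPriceTable {
    private final Map<String, Double> latest = new HashMap<>();

    void onPrice(String instrument, double price) {
        latest.put(instrument, price); // table semantics: newest value wins
    }

    // Combine the latest prices of the given instruments, if all are known.
    OptionalDouble combine(List<String> instruments) {
        if (!latest.keySet().containsAll(instruments)) return OptionalDouble.empty();
        return OptionalDouble.of(instruments.stream().mapToDouble(latest::get).sum());
    }

    public static void main(String[] args) {
        LatestPriceTable table = new LatestPriceTable();
        table.onPrice("AAPL", 123.5);
        table.onPrice("GOOG", 543.25);
        table.onPrice("AAPL", 124.0); // a later event overwrites the earlier price
        System.out.println(table.combine(List.of("AAPL", "GOOG"))); // OptionalDouble[667.25]
    }
}
```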
All of the operations defined with the Kafka Streams DSL to process a stream of data, from input to output, together form a topology.
In planning the project we identified that the Kafka Streams API seemed to be a great tool to deliver our functionality, but before we started we needed to identify any potential challenges. One of the requirements was to create the rules for combining the price streams easily and dynamically, on demand. This forces us to create dynamic topologies in Kafka depending on the specification of the rules. While we were familiar with creating topologies statically in the project configuration, dynamic topology creation was a new concept for us. We ran a technical spike to investigate this functionality and understand what would be required to achieve the desired result.
Our investigation showed that it is straightforward to create an on-demand topology in a Spring Boot application using the Kafka Streams API. At the beginning of the implementation we created a simple streams configuration, allowing us to prototype with the Kafka Streams API and show that the goal of the project was achievable with a static topology configuration. Later we added the ability to create dynamic topologies through a custom REST API.
We split our Kafka Streams pipeline into three steps: ingesting and filtering the instrument prices, calculating the outcome values, and producing the results.
We're assuming that all price information is available in one Kafka instance, on a different topic per market. In this step we need to pick the right topic, create a stream from it and filter the messages. When performing an operation on two instruments, it is also necessary to merge both streams. At the end we have a single stream carrying the prices of both instruments.
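This first step can be sketched in plain Java (in the real pipeline these are the Kafka Streams DSL filter and merge operations on KStreams; the record and method names here are illustrative assumptions):

```java
import java.util.*;
import java.util.stream.*;

// Take the topic for each instrument's market, keep only the messages for the
// two instruments the rule mentions, and merge the filtered streams into one.
public class IngestStep {
    record PriceEvent(String instrument, double price) {}

    static List<PriceEvent> filterAndMerge(List<PriceEvent> topicA, String instrumentA,
                                           List<PriceEvent> topicB, String instrumentB) {
        Stream<PriceEvent> a = topicA.stream().filter(e -> e.instrument().equals(instrumentA));
        Stream<PriceEvent> b = topicB.stream().filter(e -> e.instrument().equals(instrumentB));
        return Stream.concat(a, b).toList(); // one stream with both instruments' prices
    }

    public static void main(String[] args) {
        List<PriceEvent> stocks = List.of(new PriceEvent("AAPL", 123.5),
                                          new PriceEvent("GOOG", 543.25));
        List<PriceEvent> fx = List.of(new PriceEvent("EURUSD", 1.1),
                                      new PriceEvent("GBPUSD", 1.3));
        // Only AAPL and EURUSD survive the filters and end up in one stream.
        System.out.println(filterAndMerge(stocks, "AAPL", fx, "EURUSD").size()); // 2
    }
}
```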
Once we had produced the stream with the instruments' prices, we could perform calculations on them to produce the expected value. The next step in the pipeline is grouping. As we also store the data in a materialized table, we need to group both instruments' values under one key in order to perform the aggregation. The goal is to accumulate the latest prices of both instruments in one object, which lets us calculate the outcome value from those prices. When the price of one instrument changes, we can calculate the latest value from the new price and the latest price of the second instrument, because we always keep both instruments' prices in the accumulator. This approach lets us calculate the outcome value from the latest prices without losing precision.
We grouped by operation type, which was either "add" or "multiply". Since all elements in a particular stream have the same operation type, there is effectively only one group created:
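A plain-Java sketch of the grouping (the real pipeline uses the Kafka Streams DSL groupBy; the record and method names are our illustrative assumptions, not the production code):

```java
import java.util.*;
import java.util.stream.*;

// Every element in a rule's stream carries that rule's operation type, so
// grouping by it yields exactly one group for the downstream aggregation.
public class GroupStep {
    record PricedInstrument(String operation, String instrument, double price) {}

    static Map<String, List<PricedInstrument>> groupByOperation(List<PricedInstrument> stream) {
        return stream.stream().collect(Collectors.groupingBy(PricedInstrument::operation));
    }

    public static void main(String[] args) {
        List<PricedInstrument> stream = List.of(
            new PricedInstrument("multiply", "AAPL", 123.5),
            new PricedInstrument("multiply", "EURUSD", 1.1));
        // Both elements share the operation type, so a single group appears.
        System.out.println(groupByOperation(stream).keySet()); // [multiply]
    }
}
```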
Next, we aggregate each element in the group (that is, one of the two instruments) by calling "aggregate.updateValue", which simply puts those two instruments with their prices into a pair object:
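An illustrative sketch of the pair accumulator (the class name and exact shape are our assumptions, not the production code): updateValue stores the incoming instrument's latest price while keeping the other instrument's price intact.

```java
import java.util.*;

// Accumulator holding the latest price of each of the two instruments.
public class InstrumentPair {
    private final Map<String, Double> prices = new HashMap<>();

    InstrumentPair updateValue(String instrument, double price) {
        prices.put(instrument, price); // newest price for this instrument wins
        return this;                   // aggregators return the updated accumulator
    }

    boolean isComplete() { return prices.size() == 2; } // have we seen both prices?

    Double priceOf(String instrument) { return prices.get(instrument); }

    public static void main(String[] args) {
        InstrumentPair pair = new InstrumentPair()
            .updateValue("AAPL", 123.5)
            .updateValue("EURUSD", 1.1)
            .updateValue("AAPL", 124.0); // a later update replaces the old AAPL price
        System.out.println(pair.isComplete()); // true
    }
}
```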
Finally, we call a "calculate" method on each pair object that either adds or multiplies the instrument prices together:
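A sketch of the calculation (the method name matches the text; the signature is our illustrative assumption):

```java
// Combine the pair's two latest prices according to the rule's operation type.
public class Calculator {
    static double calculate(String operation, double left, double right) {
        switch (operation) {
            case "add":      return left + right;
            case "multiply": return left * right;
            default: throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    public static void main(String[] args) {
        System.out.println(calculate("add", 123.5, 543.25)); // 666.75
    }
}
```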
The last stage in our pipeline is, of course, producing the result. In our case, we decided to push the result to a Kafka topic, from where it could be consumed by the analytics system:
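A sketch of this final stage in plain Java, where the "output topic" is just an in-memory list (in the real pipeline this is the Kafka Streams DSL to() operation writing each calculated value to the rule's output topic; the key format is an illustrative assumption):

```java
import java.util.*;

// Publish each calculated result as a key/value message on the output "topic".
public class OutputStep {
    static void publish(List<String> outputTopic, String key, double value) {
        outputTopic.add(key + ": " + value); // serialize the result as "key: value"
    }

    public static void main(String[] args) {
        List<String> outputTopic = new ArrayList<>();
        publish(outputTopic, "AAPL+GOOG", 666.75);
        System.out.println(outputTopic); // [AAPL+GOOG: 666.75]
    }
}
```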
Using a REST endpoint we were able to dynamically create a topology defined by a set of rules in JSON. For example, we multiplied the Apple stock price by the EUR/USD exchange rate to get the Apple price in euros:
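The exact rule schema was ours to define; a request of roughly this shape could express such a rule (the field names and topic names here are illustrative assumptions):

```json
{
  "operation": "multiply",
  "operands": [
    { "topic": "stocks-us", "instrument": "AAPL" },
    { "topic": "fx-rates", "instrument": "EURUSD" }
  ],
  "outputTopic": "AAPL-EURUSD"
}
```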
Then, once we received AAPL:100 and EURUSD:1.1 events on the input topics, the result of our calculation was observed on the output topic:
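As an illustration (the output topic name and key format are our assumptions), the observed message carries the product 100 × 1.1:

```
AAPL-EURUSD: 110.0
```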
The app also allows us to nest a rule inside any element, making it possible to build arbitrarily complex rules. As an example, the result of the previous calculation was added to the DAX index price:
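With an illustrative schema in which any operand may itself be a rule, such a nested request could look like this (field and topic names are assumptions):

```json
{
  "operation": "add",
  "operands": [
    { "topic": "indices-eu", "instrument": "DAX" },
    {
      "operation": "multiply",
      "operands": [
        { "topic": "stocks-us", "instrument": "AAPL" },
        { "topic": "fx-rates", "instrument": "EURUSD" }
      ]
    }
  ],
  "outputTopic": "DAX-PLUS-AAPL-EURUSD"
}
```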
In summary, in a short period of time we built a working solution for complex real-time price calculation. Our POC performs all the requested operations, leaving just a user-facing DSL to be implemented separately, and demonstrates that the chosen tech stack fits the problem well.
In our POC the dynamic topology is created in memory and not persisted anywhere, meaning it will not be recreated after a restart. To take advantage of Kafka Streams' stateful fault tolerance, each topology, in the form of its JSON rule, could be written to another Kafka topic. On start-up the app would then read that topic from the beginning to recreate the topologies. Since we use a KTable state store, previous instrument prices would be restored from it upon restart. This enhancement would allow the system to recover its state after a service restart or failure.
Have you worked on a similar problem in your organization? We’d be very interested to hear about your successes and discuss any challenges. Get in touch with us at email@example.com