Real-Time Stream Processing With Apache Kafka Part 4: Use Case
A final use case in our four-part series.
In previous articles, we covered the basic terminology used in Kafka and Kafka Streams and set up a single-node Kafka cluster on our Windows machine. Now, based on what we have learned so far, let us build a use case.
Scenario
Consider a hypothetical fleet management company that needs a dashboard to get insight into its day-to-day vehicle activities. Each vehicle in this fleet is fitted with a GPS-based geolocation emitter, which emits location data containing the following information:
Vehicle ID: a unique ID assigned to each vehicle on registration with the company.
Latitude and Longitude: the geolocation of the vehicle.
Availability: signifies whether the vehicle is available to take a booking.
Current Status (Online/Offline): denotes whether the vehicle is on duty.
Functional Requirements
The users should be able to know (in our case, via a REST API) the total number of vehicles online and offline at any point in time.
TBD
Solution
There are multiple ways to meet these requirements. We will keep it simple and take the following steps:
All GPS signals will be sent to a topic.
Our stream processor will read the records from this topic and perform the required grouping, aggregation, and materialization to Kafka state-stores.
A REST interface will be exposed to read and serve the data from the Kafka state-stores created in the previous step.
Assumptions
Although we could publish the calculated values to topics and build auto-refreshing dashboards using WebSockets, and we could also add windowing operations for more real-time results (see the sketch just below), we will keep things simple.
We will process all events from the beginning and only target on-demand queries of the values, so we expose simple REST endpoints.
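For reference, here is a minimal, untested sketch of what such a windowed count could look like. It reuses the stream, serdes, and mapping that appear in the stream processor later in this article; the five-minute tumbling window and the store name statusCountWindowed are purely illustrative assumptions.
// hypothetical variation: count Online/Offline events per five-minute tumbling window
stream.map((k, v) -> new KeyValue<>(v.isOnline() ? "Online" : "Offline", v))
      .groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(VehicleLocation.class)))
      .windowedBy(TimeWindows.of(5 * 60 * 1000L)) // 5-minute tumbling windows
      .count(Materialized.as("statusCountWindowed")); // windowed state store, not used in this article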
Architecture Diagram
Environment
Kafka (Topics and StateStores)
gpslocation: A Kafka topic that receives all messages that are emitted from vehicles.
statusCount: A state store that will maintain the “Online” and “Offline” vehicle counts. This is a key-value store, where the key is the status of the vehicle and the value is the count of vehicles with that status.
totalCount: A state store that will maintain the total count of vehicles, obtained by counting the unique vehicle IDs across all messages.
Use this command to create the gpslocation topic.
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gpslocation
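Optionally, you can verify that the topic was created by listing the topics on the broker:
kafka-topics.bat --list --zookeeper localhost:2181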
Applications (Spring Boot)
Spring Boot offers rich libraries to interact with Kafka. We will implement the following components as Spring Boot-based applications.
common-libs: contains the POJO classes that model the different objects, such as Vehicle, VehicleCount, and VehicleLocation. This module will be added as a dependency to the other two applications, eliminating concerns about model mismatch.
vehicle-simulator: simulates the GPS signals emitted by the device fitted in each vehicle (a minimal producer sketch appears after the VehicleLocation POJO below).
tracker-dashboard: our stream-processing component. We could have a separate application for querying the Kafka state-stores, but for the sake of simplicity, we add that functionality to this application.
vehicle-tracker: the parent project wrapping the other applications as modules.
Show Me Some Code!
GPS Event
Below is the POJO class to model a GPS event sent by a vehicle.
import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class VehicleLocation implements Serializable {

    private int vehicleId;      // unique id for each vehicle
    private boolean online;     // whether the vehicle is online or offline
    private boolean available;  // whether the vehicle is ready to take bookings
    private double latitude;
    private double longitude;

    // convenience constructor used when the vehicle id is set separately
    public VehicleLocation(boolean online, boolean available, double latitude, double longitude) {
        this.online = online;
        this.available = available;
        this.latitude = latitude;
        this.longitude = longitude;
    }
}
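For completeness, below is a minimal sketch of how the vehicle-simulator might publish these events. The class name VehicleEventProducer and the randomization details are illustrative assumptions; it presumes a KafkaTemplate configured with an Integer key serializer and a JSON value serializer, and the actual simulator in the repository may differ.
import java.util.Random;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// hypothetical producer; the real vehicle-simulator may look different
@Component
public class VehicleEventProducer {

    @Autowired
    private KafkaTemplate<Integer, VehicleLocation> kafkaTemplate; // assumes Integer/JSON serializers are configured

    private final Random random = new Random();

    // emit one random location event for the given vehicle to the "gpslocation" topic
    public void emit(int vehicleId) {
        VehicleLocation location = new VehicleLocation(
                random.nextBoolean(),                // online
                random.nextBoolean(),                // available
                -90 + 180 * random.nextDouble(),     // latitude
                -180 + 360 * random.nextDouble());   // longitude
        location.setVehicleId(vehicleId);
        kafkaTemplate.send("gpslocation", vehicleId, location); // key = vehicleId, value = location JSON
    }
}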
Stream Processor
Below is our stream processor, which processes the records from the “gpslocation” topic and stores the counts in the “statusCount” state store. The keys in this state-store will be:
Online: key for online vehicle count.
Offline: key for offline vehicle count.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;

@Component
public class VehicleStatusCountProcessor {

    @Autowired
    private KafkaProperties kafkaProperties; // default Kafka properties from the Spring Boot configuration

    @Bean // configure key-value serdes
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        return props;
    }

    @Bean
    public KStream<String, Long> statusCountStreamProcessor(StreamsBuilder streamsBuilder) {
        KStream<Integer, VehicleLocation> stream = streamsBuilder.stream("gpslocation", // read from topic
                Consumed.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class))); // using Integer and JSON serdes
        return stream.map((k, v) -> { // map the key to Online/Offline based on the status
                    String status = v.isOnline() ? "Online" : "Offline";
                    return new KeyValue<>(status, v);
                })
                .groupByKey(Serialized.with( // group by the key mapped in the previous step
                        Serdes.String(),
                        new JsonSerde<>(VehicleLocation.class)))
                .count(Materialized.as("statusCount")) // materialize the count to the state store
                .toStream();
    }
}
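Note that the processor above only materializes the statusCount store. The totalCount store listed under Environment would need its own materialization. One possible, untested sketch is shown below: it reduces the stream to a KTable keyed by vehicle ID, so each vehicle contributes exactly one row, and then counts those rows under a single “total” key. The actual implementation in the repository may differ.
// hypothetical addition inside statusCountStreamProcessor(): count distinct vehicle IDs
stream.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class)))
      .reduce((previous, latest) -> latest) // KTable<Integer, VehicleLocation>: latest event per vehicle
      .groupBy((vehicleId, location) -> KeyValue.pair("total", location),
               Serialized.with(Serdes.String(), new JsonSerde<>(VehicleLocation.class)))
      .count(Materialized.as("totalCount")); // single entry under the key "total"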
Query the Store
Below is the code for the REST interface to get the state store and query for a specific key (Online/Offline).
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class VehicleQueryService {

    @Autowired
    private StreamsBuilderFactoryBean kStreamBuilderFactoryBean;

    @GetMapping("/count/{status}")
    public long getVehicleCountForStatus(@PathVariable("status") String status) {
        // get the state-store
        ReadOnlyKeyValueStore<String, Long> keyValueStore = kStreamBuilderFactoryBean.getKafkaStreams()
                .store("statusCount", QueryableStoreTypes.keyValueStore());
        Long count = keyValueStore.get(status); // count for the key, i.e. Offline/Online
        return count == null ? 0 : count;       // 0 if no events have been seen for this status yet
    }
}
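If the totalCount store sketched earlier is materialized as well, a companion endpoint could expose it. This is an illustrative addition, not part of the code above:
// hypothetical endpoint for the "totalCount" store sketched earlier
@GetMapping("/count/total")
public long getTotalVehicleCount() {
    ReadOnlyKeyValueStore<String, Long> keyValueStore = kStreamBuilderFactoryBean.getKafkaStreams()
            .store("totalCount", QueryableStoreTypes.keyValueStore());
    Long count = keyValueStore.get("total");
    return count == null ? 0 : count;
}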
Response for Offline and Online Vehicle Count
Query the localhost URL exposed by the tracker-dashboard application for the total number of events received as “Online.”
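For example, assuming tracker-dashboard runs on the default port 8080:
curl http://localhost:8080/count/Online
curl http://localhost:8080/count/Offline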
You should see the Online/Offline vehicle counts change as new records are published.
In this article, we have solved a simple use case, though stream processing can be used to build full-fledged, real-time applications.
I hope this article left you with some insights about Kafka and its stream-processing capabilities.
The full code is available at this Git location. Please feel free to share your valuable feedback and questions.