When talking about streaming data processing, it's definitely worth mentioning "windowing". The example shown in the previous post about streaming was quite simple and didn't include any aggregation operations like GroupByKey or Combine. But what if we need to perform such transforms on an unbounded stream? The answer is not as obvious as it is for bounded data collections because, in this case, we don't have an exact moment on the timeline when we can conclude that all data has arrived. For this purpose, most data processing engines use the concept of windowing, which splits an unbounded data stream into time "windows" and performs such operations inside each of them. Beam is no exception and provides a rich API for doing that.
Problem description
Let's take another example to see how windowing works in practice. To recall, the previous task was the following: there is an unbounded stream of geo coordinates (X and Y) of some objects (for example, cars) on a map, which arrives in real time from a Kafka topic, and we want to select only those objects that are located inside a specified rectangular area. After that, we just write the filtered pairs of coordinates back to another Kafka topic.
Now we have a more complicated requirement: we need to count the number of times each object appeared inside that specified area, and do this every 10 seconds.
To achieve this goal, we are going to virtually split our unbounded data stream into fixed 10-second windows (usually, windows are based on the event time of the records) and then apply GroupByKey and Combine transforms over them. In other words, every element of our PCollection will be assigned to a certain window based on its event time.
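To make the idea of assigning an element to a fixed window more concrete, here is a minimal plain-Java sketch. It does not use Beam's classes; the class name FixedWindowSketch and the method windowStart are hypothetical, and the sketch assumes windows aligned to the epoch with no offset, each covering the half-open interval [start, start + size).

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; these are not Beam classes.
final class FixedWindowSketch {

    // Returns the start of the fixed window that contains eventTime.
    // Assumption: windows are aligned to the epoch and cover [start, start + size).
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = eventTime.toEpochMilli();
        // floorMod keeps the arithmetic correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        Instant a = Instant.parse("2019-01-01T00:00:03Z");
        Instant b = Instant.parse("2019-01-01T00:00:09.999Z");
        Instant c = Instant.parse("2019-01-01T00:00:10Z");

        // a and b fall into the same 10-second window; c opens the next one.
        System.out.println(windowStart(a, size).equals(windowStart(b, size)));
        System.out.println(windowStart(a, size).equals(windowStart(c, size)));
    }
}
```

Two records whose event times are less than 10 seconds apart can still land in different windows if a window boundary lies between them, which is why the window start, not the distance between records, decides the grouping.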
More details about how Apache Beam works with windowing can be found in the official programming guide.
Creating a pipeline
As in the previous parts, let's start coding our data processing pipeline, and I'll provide more details along the way. The first part of this pipeline is almost the same as the one developed in the previous post: we read data from Kafka using KafkaIO, retrieve the payload from KafkaRecord and filter out the objects that are not located in the specified area. The only difference is that, after reading the entries from Kafka in the first step, we need to assign every record to a window based on its event time. To do this, we use a Beam transform called Window and the windowing function FixedWindows, which splits our input stream of data into fixed-size windows of 10 seconds. Beam provides several other windowing functions, such as sliding windows and session windows, that can be used depending on your requirements.

    pipeline
        // Read <Long, String> records from the input Kafka topic.
        .apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        // Assign every record to a fixed 10-second window.
        // Duration here is org.joda.time.Duration, which the Beam Java SDK uses.
        .apply(
            Window.<KafkaRecord<Long, String>>into(
                FixedWindows.of(Duration.standardSeconds(10))))
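For contrast with fixed windows, where each element belongs to exactly one window, the following plain-Java sketch shows how a sliding window could assign one element to several overlapping windows. It is an illustration only and does not use Beam's classes; the names SlidingWindowSketch and windowStarts are hypothetical, and windows are assumed to start every periodMillis, aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; these are not Beam classes.
final class SlidingWindowSketch {

    // Returns the start times (in ms) of every window [start, start + size)
    // that contains ts, newest window first.
    // Assumption: a new window starts every periodMillis, aligned to the epoch.
    static List<Long> windowStarts(long ts, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after ts.
        long lastStart = ts - Math.floorMod(ts, periodMillis);
        // Walk backwards while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 30-second windows that start every 10 seconds: three windows cover t = 12 s.
        System.out.println(windowStarts(12_000L, 30_000L, 10_000L));
        // When size equals period, this degenerates to a single fixed window.
        System.out.println(windowStarts(12_000L, 10_000L, 10_000L));
    }
}
```

In Beam itself, the corresponding windowing functions are SlidingWindows and Sessions, which are passed to Window.into() in the same way as FixedWindows.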
Once the input records have been grouped into windows, we can apply transforms such as GroupByKey and Combine over the elements of every window.
So, after an input record has been assigned to a window and filtered, we extract the object id from the payload and then group all records in the window by this id:
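
        .apply(
            ParDo.of(
                new DoFn<String, KV<Long, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext processContext) {
                    String payload = processContext.element();
                    String[] split = payload.split(",");
                    // Skip malformed records with fewer than three comma-separated fields.
                    if (split.length < 3) {
                      return;
                    }
                    // The first field is the object id; it becomes the grouping key.
                    Long id = Long.valueOf(split[0]);
                    processContext.output(KV.of(id, payload));
                  }
                }))
        // Collect all payloads that share the same id within each window.
        .apply(GroupByKey.<Long, String>create())
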
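The key extraction step can be tried out in isolation. The sketch below is plain Java without Beam; the class PayloadParserSketch and the method toKeyValue are hypothetical names, and the payload layout (id first, then at least two more comma-separated fields) is an assumption based on the length check in the pipeline code. Unlike the DoFn in the pipeline, this sketch also skips records whose id is not a number instead of letting Long.valueOf throw.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration only; not part of the original pipeline.
final class PayloadParserSketch {

    // Turns a payload such as "7,10,20" into an (id, payload) pair.
    // Returns an empty Optional for records that cannot be parsed.
    static Optional<Map.Entry<Long, String>> toKeyValue(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        String[] split = payload.split(",");
        if (split.length < 3) {
            return Optional.empty();
        }
        try {
            Long id = Long.valueOf(split[0].trim());
            Map.Entry<Long, String> kv = new SimpleImmutableEntry<>(id, payload);
            return Optional.of(kv);
        } catch (NumberFormatException e) {
            // A non-numeric id is treated as a malformed record.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(toKeyValue("7,10,20").isPresent());   // well-formed record
        System.out.println(toKeyValue("7,10").isPresent());      // too few fields
        System.out.println(toKeyValue("abc,10,20").isPresent()); // non-numeric id
    }
}
```

Whether a malformed id should be dropped silently, logged, or routed to a separate output depends on the requirements; dropping it is just the simplest choice for a sketch.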
A short extract from the Beam GroupByKey documentation:
GroupByKey is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.
The next step is to write the "Reduce" phase: count the number of values associated with the current id. It won't be difficult, because the GroupByKey transform returns a PCollection of key-value pairs where the key is the object id and the value is an Iterable of the payloads carrying the object's coordinates.

        .apply(
            ParDo.of(
                new DoFn<KV<Long, Iterable<String>>, KV<Long, Long>>() {
                  @ProcessElement
                  public void processElement(ProcessContext processContext) {
                    Long id = processContext.element().getKey();
                    // Count how many payloads were grouped under this id in the window.
                    long count = 0;
                    for (String entry : processContext.element().getValue()) {
                      count++;
                    }
                    processContext.output(KV.of(id, count));
                  }
                }))

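To see what the windowing, grouping and counting steps produce together, here is a small plain-Java sketch that works on an in-memory list instead of a stream. It does not use Beam; the names WindowedCountSketch, Event and countPerWindow are hypothetical, and the sample timestamps are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; these are not Beam classes.
final class WindowedCountSketch {

    // A simplified record: object id plus event time in milliseconds.
    static final class Event {
        final long id;
        final long timestampMillis;

        Event(long id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns: window start (ms) -> (object id -> number of appearances).
    static Map<Long, Map<Long, Long>> countPerWindow(List<Event> events, long windowSizeMillis) {
        Map<Long, Map<Long, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            // Fixed-window assignment, aligned to the epoch.
            long windowStart =
                e.timestampMillis - Math.floorMod(e.timestampMillis, windowSizeMillis);
            // "Group by key" and "reduce" in one step: increment the per-id counter.
            counts.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(e.id, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(1L, 1_000L),
            new Event(1L, 4_000L),
            new Event(2L, 9_000L),
            new Event(1L, 12_000L));
        // Object 1 appears twice in the first window and once in the second.
        System.out.println(countPerWindow(events, 10_000L));
    }
}
```

In a real pipeline, Beam's built-in Count.perKey() transform could probably replace the GroupByKey step and the manual counting loop; the original code does not use it, so treat that as an option to evaluate rather than as the author's approach.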
After all the main business logic has been implemented, we need to prepare the data to be written back to a Kafka topic and actually write it using KafkaIO.write(). This part is again the same as shown in the previous post, so we can skip it here.
Building and running a pipeline
No additional dependencies are needed compared to the first blog post about streaming. So, let's just build a jar and run it with DirectRunner to test how it works:

    # mvn clean package
    # mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterAndCountObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

As usual, all the code of this example is published in this GitHub repository.