
Developing data processing job using Apache Beam - Windowing

When talking about streaming data processing, it is definitely worth mentioning "Windowing". The example shown in the previous post about streaming was quite simple and didn't incorporate any aggregation operations like GroupByKey or Combine. But what if we need to perform such data transforms on an unbounded stream? The answer is not as obvious as for bounded data collections because, in this case, we don't have an exact moment on the time line when we can conclude that all data has arrived. So, for this purpose, most data processing engines use the concept of Windowing, which allows splitting an unbounded data stream into time "windows" and performing such operations inside them. Beam is no exception and provides a rich API for doing that.

Problem description

Let's take another example to see how windowing works in practice. Just to recall, the previous task was the following: there is an unlimited stream of geo coordinates (X and Y) of some objects on a map (for example, cars), which arrives in real time from a Kafka topic, and we want to select only those that are located inside a specified rectangular area. After that, we write the filtered pairs of coordinates back to another Kafka topic.

Now we have a more complicated requirement: we need to count how many times every object appeared inside that specified area, and do this every 10 seconds.

To achieve this goal, we are going to virtually split our unbounded data stream into 10-second fixed windows (usually, windows are based on the event time of the records) and then apply the GroupByKey and Combine transforms over them. In other words, every element of our PCollection will be assigned to a certain window based on its event time; for instance, a record with event time 00:00:07 falls into the window [00:00:00, 00:00:10), while one with event time 00:00:12 falls into [00:00:10, 00:00:20).

More details about how Apache Beam works with windowing can be found in the official programming guide.

Creating a pipeline

So, in the same way as in the previous parts, let's start coding our data processing pipeline, and I'll provide more details along the way. The first part of this pipeline is almost the same as the one developed in the previous post: we read data from Kafka using KafkaIO, retrieve the payload from every KafkaRecord and filter out the objects that are not located in the specified area. The only difference is that, after reading the entries from Kafka in the first step, we need to assign every record to a window based on its event time. To do this, we use the Beam transform called Window together with the windowing function FixedWindows, which splits our input stream of data into 10-second fixed-size windows. Beam provides several other windowing functions, like sliding windows and session windows, that can be used depending on your requirements (a short sketch of those follows the snippet below).


pipeline
    .apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class))
    .apply(
        Window.<KafkaRecord<Long, String>>into(
            FixedWindows.of(Duration.standardSeconds(10))))
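
For comparison, here is a minimal sketch of how this same step could look with the other windowing functions mentioned above; the durations are arbitrary and chosen only for illustration:

// Overlapping 30-second windows that start every 10 seconds
.apply(
    Window.<KafkaRecord<Long, String>>into(
        SlidingWindows.of(Duration.standardSeconds(30))
            .every(Duration.standardSeconds(10))))

// Session windows that close after 1 minute of inactivity per key
.apply(
    Window.<KafkaRecord<Long, String>>into(
        Sessions.withGapDuration(Duration.standardMinutes(1))))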

Once input records have been grouped into windows, we can apply transforms such as GroupByKey or Combine over the elements of every window.
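
Before grouping, though, the records still need to be unwrapped from KafkaRecord and filtered by the specified area, exactly as in the previous post, so that step is only sketched here. The sketch assumes a comma-separated "id,x,y" payload and uses hypothetical rectangle bounds:

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            // Unwrap the String payload from the Kafka record
            String payload = processContext.element().getKV().getValue();
            String[] split = payload.split(",");
            if (split.length < 3) {
              return;
            }
            // Hypothetical rectangle bounds, for illustration only;
            // the real filtering logic is shown in the previous post
            double x = Double.parseDouble(split[1]);
            double y = Double.parseDouble(split[2]);
            if (x >= 100 && x <= 200 && y >= 100 && y <= 200) {
              processContext.output(payload);
            }
          }
        }))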

So, after an input record has been assigned to a window and filtered, we extract the object id from the payload and then group all records in the window by this id:

.apply(
    ParDo.of(
        new DoFn<String, KV<Long, String>>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            String payload = processContext.element();

            // The object id is the first field of the comma-separated payload
            String[] split = payload.split(",");
            if (split.length < 3) {
              // Skip malformed records
              return;
            }
            Long id = Long.valueOf(split[0]);
            processContext.output(KV.of(id, payload));
          }
        }))
.apply(GroupByKey.<Long, String>create())

A tiny extract from the Beam GroupByKey documentation:
GroupByKey is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.

The next step is to write the "Reduce" phase: count the number of values associated with each id. It won't be difficult because the GroupByKey transform returns a PCollection of key-value pairs where the key is the object id and the value is an Iterable container of that object's coordinates.

.apply(
    ParDo.of(
        new DoFn<KV<Long, Iterable<String>>, KV<Long, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            Long id = processContext.element().getKey();
            // Count how many coordinate records this object has in the current window
            long count = 0;
            for (String entry : processContext.element().getValue()) {
              count++;
            }
            processContext.output(KV.of(id, count));
          }
        }))
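
As a side note, the same result could be obtained more concisely with Beam's built-in Count transform, which would replace both the GroupByKey and the counting ParDo above (just a sketch, not the variant used in this post):

// PCollection<KV<Long, String>> -> PCollection<KV<Long, Long>>, counted per key and per window
.apply(Count.<Long, String>perKey())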

After the main business logic has been implemented, we need to prepare the data to be written back to a Kafka topic and actually perform the write using KafkaIO.write(). This part is again almost the same as shown in the previous post, so it is only sketched briefly below.
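
Here is a minimal sketch of that final step, assuming the counts are serialized to String values before being written out; getOutputTopic() is a hypothetical pipeline option used by analogy with getInputTopic():

.apply(
    ParDo.of(
        new DoFn<KV<Long, Long>, KV<Long, String>>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            // Serialize the count as a String value for the output topic
            processContext.output(
                KV.of(
                    processContext.element().getKey(),
                    processContext.element().getValue().toString()));
          }
        }))
.apply(
    KafkaIO.<Long, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class));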

Building and running a pipeline

No additional dependencies are needed compared to the first blog post about streaming. So, let's just build a jar and run it with the DirectRunner to test how it works:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterAndCountObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

As usual, all the code of this example is published in this GitHub repository.
