
Developing data processing job using Apache Beam - Windowing

When talking about streaming data processing, it's definitely worth mentioning "Windowing". The example shown in the previous post about streaming was quite simple and didn't incorporate any aggregation operations like GroupByKey or Combine. But what if we need to perform such transforms on an unbounded stream? The answer is not as obvious as for bounded data collections because, in this case, we don't have an exact moment on the timeline when we can conclude that all data has arrived. For this purpose, most data processing engines use the concept of windowing, which splits an unbounded data stream into time "windows" and performs such operations inside each of them. Beam is no exception, and it provides a rich API for doing that.

Problem description

Let's take another example to see how windowing works in practice. Just to recall, the previous task was the following: there is an unlimited stream of geo coordinates (X and Y) of some objects (for example, cars) on a map, which arrives in real time from a Kafka topic, and we want to select only those objects that are located inside a specified rectangular area. After that, we write the filtered pairs of coordinates back to another Kafka topic.

Now we have a more complicated requirement: we need to count the number of times every object appeared inside that specified area, and do so every 10 seconds.

To achieve this goal, we are going to virtually split our unbounded data stream into fixed 10-second windows (usually, windows are based on the event time of the records) and then apply GroupByKey and Combine transforms over them. In other words, every element of our PCollection will be assigned to a certain window based on its event time. For example, with 10-second windows, a record whose event timestamp is 12:00:04 falls into the window [12:00:00, 12:00:10).

More details about how Apache Beam handles windowing can be found in this official programming guide.

Creating a pipeline

So, in the same way as in the previous parts, let's start coding our data processing pipeline, and I'll provide more details along the way. The first part of this pipeline will be almost the same as the one developed in the previous post: we read data from Kafka using KafkaIO, retrieve the payload from every KafkaRecord and filter out the objects that are not located in the specified area. The only difference is that, after reading the entries from Kafka in the first step, we need to assign every record to a window based on its event time. To do this, we use the Beam transform called Window together with the windowing function FixedWindows, which splits our input stream of data into fixed-size 10-second windows. Beam provides several other windowing functions, like sliding windows, session windows, etc., that can be used depending on your requirements.


pipeline
    .apply(
        // Read an unbounded stream of records from the input Kafka topic
        KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class))
    .apply(
        // Assign every record to a fixed 10-second window based on its event time
        Window.<KafkaRecord<Long, String>>into(
            FixedWindows.of(Duration.standardSeconds(10))))
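
For comparison, here is a rough sketch of how this same step could look with the other windowing functions mentioned above (the window size, period and gap duration are arbitrary values chosen only for illustration; both classes live in org.apache.beam.sdk.transforms.windowing):

// Sliding windows: 30-second windows that start every 10 seconds,
// so every record belongs to three overlapping windows
.apply(
    Window.<KafkaRecord<Long, String>>into(
        SlidingWindows.of(Duration.standardSeconds(30))
            .every(Duration.standardSeconds(10))))

// Session windows: elements are grouped into sessions separated
// by at least one minute of inactivity
.apply(
    Window.<KafkaRecord<Long, String>>into(
        Sessions.withGapDuration(Duration.standardMinutes(1))))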

Once input records have been grouped into windows, we can apply transforms such as GroupByKey, Combine, etc. over the elements of every window.

So, after an input record has been assigned to a window and filtered, we extract the object id from the payload and then group all records in the window by this id:

.apply(
    ParDo.of(
        new DoFn<String, KV<Long, String>>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            String payload = processContext.element();

            // Payload is expected to contain the object id followed by its coordinates
            String[] split = payload.split(",");
            if (split.length < 3) {
              return;
            }
            // Key every record by the object id extracted from the payload
            Long id = Long.valueOf(split[0]);
            processContext.output(KV.of(id, payload));
          }
        }))
// Group records by object id within every window
.apply(GroupByKey.<Long, String>create())

A tiny extract from Beam GroupByKey documentation:
GroupByKey is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.
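
Applied to our pipeline, this means that within every 10-second window all coordinate payloads that share the same object id end up under one key. For instance (the payload values below are made up purely for illustration), the keyed records

KV.of(1L, "1,5,7"), KV.of(2L, "2,3,4"), KV.of(1L, "1,6,8")

arriving within the same window are grouped into two pairs, (1, ["1,5,7", "1,6,8"]) and (2, ["2,3,4"]), where each value is an Iterable of payloads.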

The next step will be to write the "Reduce" phase: count the number of values associated with the current id. It won't be difficult because the GroupByKey transform returns a PCollection of key-value pairs where the key is the object id and the value is an Iterable container of the object's coordinates.

.apply(
    ParDo.of(
        new DoFn<KV<Long, Iterable<String>>, KV<Long, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            Long id = processContext.element().getKey();
            // Count how many times this object appeared inside the area
            // during the current window
            long count = 0;
            for (String entry : processContext.element().getValue()) {
              count++;
            }
            processContext.output(KV.of(id, count));
          }
        }))
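
As a side note, Beam also ships a built-in Count.perKey() transform (in org.apache.beam.sdk.transforms.Count) that produces the same per-key, per-window counts. A minimal sketch of how the GroupByKey step and the manual counting above could be collapsed into one transform:

// Applied to the keyed PCollection<KV<Long, String>>, this replaces
// GroupByKey plus the counting DoFn and emits one KV<Long, Long>
// per object id and window
.apply(Count.<Long, String>perKey())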

After the main business logic has been implemented, we need to prepare the data to be written back to a Kafka topic and actually do the write using KafkaIO.write(). This part is the same as it was shown in the previous post, so we can skip it here.
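
For reference, that final step could look roughly like the sketch below; the getOutputTopic() option name and the "id,count" text format are assumptions made for illustration, not taken from the previous post:

// Format every per-window count as "id,count" text
.apply(
    ParDo.of(
        new DoFn<KV<Long, Long>, String>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            KV<Long, Long> kv = processContext.element();
            processContext.output(kv.getKey() + "," + kv.getValue());
          }
        }))
// Write the resulting strings back to the output Kafka topic
.apply(
    KafkaIO.<Void, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic()) // assumed option name
        .withValueSerializer(StringSerializer.class)
        .values());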

Building and running a pipeline

No additional dependencies are needed compared to the first blog post about streaming. So, let's just build a jar and run it with the DirectRunner to test how it works:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterAndCountObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

As usual, all the code of this example is published in this GitHub repository.
