
Developing data processing job using Apache Beam - Streaming pipeline

This time we are going to talk about one of the most in-demand topics in the modern Big Data world – processing of streaming data.

The principal difference between batch and streaming is the type of input data source. When your data set is limited (even if it’s huge in terms of size) and it is not being updated during processing, then you would likely use a batch pipeline. The input source in this case can be, for instance, files, database tables, objects in object storages, etc. I want to underline one more time that, with batching, we assume that the data is immutable during the whole processing time and that the number of input records is constant. Why should we pay attention to this? Because even with files we can have an unlimited data stream, when files are constantly added or changed. In such a case we have to apply a streaming approach to work with the data. So, if we know that our data is limited and immutable, then we need to develop a batch pipeline, as it was shown and explained in the first part of this series.

Things get more complicated when our data set is unlimited (continuously arriving) and/or mutable – in other words, when we have a stream of data. Some examples of such sources are message delivery systems (like Apache Kafka), new files in a directory (web server logs) or other systems that collect data in real time (like IoT sensors). The common thing for all of these sources is that we always have to wait for new data. Of course, we could split our data into batches (by time or by data size) and process every split in a batch way, but it would be quite difficult to apply some functions across the whole consumed data set and to create a whole pipeline for this. Luckily, there are several streaming engines that allow us to cope with this type of data processing easily – Apache Spark, Apache Flink, Apache Apex, Google Dataflow. All of them are supported by Apache Beam, and we can run the same pipeline on different engines without any code changes. Moreover, we can use the same pipeline in batch or in streaming mode with minimal changes – one just needs to properly set the input source and voilà – everything works out of the box! Isn’t this just magic? I would have dreamed of this a while ago, when I was rewriting my batch jobs into streaming ones.

So, enough theory – it’s time to take an example and write our first streaming code. We are going to read data from Kafka (an unbounded source), perform some simple processing and write the results back to Kafka as well.

Suppose the task is the following: we have an unlimited stream of geo coordinates (X and Y) of some objects on a map (for example, the objects are cars), which arrive in real time, and we want to select only those objects that are located inside a specified rectangular area. In other words, we have to consume text data from a Kafka topic, parse it, filter it by the specified limits and write it back into another Kafka topic. Let’s see how we can do this with the help of Apache Beam.

Every Kafka message contains text data in the following format:
id,x,y

where:
  id – the unique id of the object,
  x, y – its coordinates on the map (integers).

We will need to take care of records whose format is not valid and skip them.
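For illustration only (these sample values are made up and are not taken from the original example), a few incoming messages could look like this, where the last two records would be skipped because of an invalid format:

205,45,12
307,101,37
312,,20
car-1,abc,7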

Creating a pipeline

First of all, as in the previous part, where we did batch processing, we create a pipeline in the same way:

Pipeline pipeline = Pipeline.create(options);

We can define our own Options object to pass command line options into the pipeline. Please see the whole example on github for more details.
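For completeness, here is a rough sketch of what such an Options interface could look like. The getter names are inferred from the calls used in the snippets below; the interface name and the default values are my own assumptions – the real definition lives in the github example.

// A sketch of a custom options interface; @Description and @Default come
// from the org.apache.beam.sdk.options package.
public interface FilterObjectsOptions extends PipelineOptions {

    @Description("Kafka bootstrap servers")
    @Default.String("localhost:9092")
    String getBootstrap();
    void setBootstrap(String bootstrap);

    @Description("Input Kafka topic name")
    String getInputTopic();
    void setInputTopic(String inputTopic);

    @Description("Output Kafka topic name")
    String getOutputTopic();
    void setOutputTopic(String outputTopic);

    @Description("Maximum X coordinate of the filtering area")
    @Default.Integer(100)
    Integer getCoordX();
    void setCoordX(Integer coordX);

    @Description("Maximum Y coordinate of the filtering area")
    @Default.Integer(100)
    Integer getCoordY();
    void setCoordY(Integer coordY);
}

Such options are then created from the command line arguments in the standard Beam way:

FilterObjectsOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FilterObjectsOptions.class);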

Then, as the next step, we have to read data from the Kafka input topic. As was said before, Apache Beam already provides a number of different IO connectors, and KafkaIO is one of them. Therefore, we create a new unbounded PTransform which consumes arriving messages from the specified Kafka topic and propagates them further to the next step:

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

By default, KafkaIO encapsulates every consumed message into a KafkaRecord object. So, the next transform simply retrieves the payload (the string value) using a newly created DoFn object:

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)
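As a side note (this shortcut is not used in the original example), KafkaIO can also strip the Kafka metadata for us via withoutMetadata(), so the payload could be extracted with the built-in Values transform instead of a custom DoFn – roughly like this:

pipeline
    .apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // return KV<Long, String> instead of KafkaRecord
            .withoutMetadata())
    // keep only the values (message payloads)
    .apply(Values.<String>create())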

After that, it is time to filter the records (see the initial task stated above), but before doing so we have to parse our string values according to the defined format. Both steps can be encapsulated into a single functional object, which is then used by Beam's internal Filter transform (a sketch of this object is shown after the next snippet).

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)
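The FilterObjectsByCoordinates class itself is not shown in this post (it is part of the github example), but a minimal sketch of it could look like the following. Note that the parsing details and the limit semantics (a rectangle from (0, 0) to (coordX, coordY)) are my assumptions here:

// A functional object usable with Filter.by(): it has to implement
// Beam's SerializableFunction<InputT, Boolean> interface.
public class FilterObjectsByCoordinates
        implements SerializableFunction<String, Boolean> {

    private final Integer maxX;
    private final Integer maxY;

    public FilterObjectsByCoordinates(Integer maxX, Integer maxY) {
        this.maxX = maxX;
        this.maxY = maxY;
    }

    @Override
    public Boolean apply(String input) {
        // expected format: "id,x,y"
        String[] parts = input.split(",");
        if (parts.length != 3) {
            return false; // skip records with an invalid format
        }
        try {
            int x = Integer.parseInt(parts[1].trim());
            int y = Integer.parseInt(parts[2].trim());
            // keep only the objects inside the specified area
            return x >= 0 && x <= maxX && y >= 0 && y <= maxY;
        } catch (NumberFormatException e) {
            return false; // skip records with non-numeric coordinates
        }
    }
}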

Then, we have to prepare the filtered messages to be written back to Kafka by creating a new pair of key/values, using the internal Beam KV class, which can be used across different IO connectors, including KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

The final transform is needed to write messages into Kafka, so we simply use KafkaIO.write() – a sink implementation – for this purpose. As for reading, we have to configure this transform with some required options, like the Kafka bootstrap servers, the output topic name and serialisers for the key/value.

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

In the end, we just run our pipeline as usual:

pipeline.run();

This time it seems a bit more complicated than it was in the previous part but, as one can easily notice, we didn’t do anything special to make our pipeline streaming-compatible. This is the whole responsibility of the Beam data model implementation, which makes it very easy to switch between batch and streaming processing for Beam users.

Building and running a pipeline

Let’s add the required dependencies to make it possible to use Beam KafkaIO:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

Then, just build a jar and run it with DirectRunner to test how it works:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

If needed, we can pass other arguments used in the pipeline with the help of the “exec.args” option. Also, make sure that your Kafka servers are available and properly specified before running the Beam pipeline.
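For example, assuming the option names from the Options sketch above (the real names may differ in the github example), a full run command could look like this:

# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner \
    -Dexec.args="--runner=DirectRunner --bootstrap=localhost:9092 --inputTopic=objects \
                 --outputTopic=filtered-objects --coordX=100 --coordY=100"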

The exec:java command launches the pipeline and runs it forever, until it is stopped manually (optionally, it is possible to specify a maximum running time). So, it means that the data will be processed continuously, in streaming mode.

As usual, all the code of this example is published in this github repository.

Have a good streaming!

