
Developing data processing job using Apache Beam - Batch pipeline

Have you heard something about Apache Beam? No? Well, I’m not surprised - this project is still quite new in the data processing world. Actually, I was in the same boat as you until recently, when I started to work closely with it.

In short, Apache Beam is a unified programming model that provides an easy way to implement batch and streaming data processing jobs and run them on any execution engine using a set of different IOs. Sounds promising but not very clear, right? Ok, let’s take a closer look at what this actually means.

Starting with this post, I’m going to launch a series of articles where I’ll show some examples and highlight several use cases of data processing jobs using Apache Beam.

Our topic for today is batch processing. Let’s take the following example - you work on analytics and you want to know how many cars of each brand were sold over the whole period of observation. This means that our data set is bounded (a finite amount of data) and it won’t be updated. So, in this case, we can rely on a batch process to analyze our data.

As input data, we have text logs of sold cars in the following format:
id,brand_name,model_name,sales_number

For example:
1,Renault,Scenic,3
2,Peugeot,307,2
1,Renault,Megane,4

Before starting to implement our first Beam application, we need to become familiar with some core concepts that will be used all the time later on. There are three main concepts in Beam - Pipeline, PCollection and PTransform. A Pipeline encapsulates the workflow of your entire data processing task from start to finish. A PCollection is a distributed dataset abstraction that Beam uses to transfer data between PTransforms. A PTransform, in its turn, is a process that operates on input data (an input PCollection) and produces output data (an output PCollection). Usually, the first and the last PTransforms represent a way to input/output data, which can be bounded (batch processing) or unbounded (streaming processing).

To simplify things, we can consider a Pipeline as a DAG (directed acyclic graph) which represents your whole workflow, PTransforms as nodes (that transform the data) and PCollections as edges of this graph. More information can be found in the Beam Programming Guide.
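
To make these concepts a bit more concrete, here is a minimal, self-contained sketch (not part of our sales example - the class name and the toy data are just assumptions for illustration) that wires a Pipeline, a PCollection and a couple of PTransforms together:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BeamConceptsSketch {
    public static void main(String[] args) {
        // The Pipeline is the DAG that holds the whole workflow
        Pipeline pipeline = Pipeline.create();

        // Each apply() adds a PTransform (a node) and returns a new PCollection (an edge)
        PCollection<String> input = pipeline.apply(Create.of("a", "b", "c"));
        PCollection<String> upperCased = input.apply(
            MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));

        // Running the pipeline executes the whole DAG on the chosen runner
        pipeline.run();
    }
}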

Now, let’s get back to our example and try to implement the first pipeline which will process provided data set.

Creating a pipeline

Firstly, just create a new pipeline:

Pipeline pipeline = Pipeline.create();
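
As a side note (this is just a common variant, not required for our example), the pipeline can also be created from PipelineOptions parsed from the command-line arguments, which is handy if you want to choose a runner at launch time:

// PipelineOptions and PipelineOptionsFactory come from org.apache.beam.sdk.options
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);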

Then, let’s create a new PTransform using the pipeline.apply() method, which will read data from the text file and create a new PCollection of strings. To do this, we are going to use one of the IOs already implemented in Beam - TextIO.

TextIO allows us to read from and write to text file(s) line by line. It also has many other features, like working with different file systems, supporting file patterns and streaming of files. For more information, please see the Apache Beam documentation.

apply(TextIO.read().from("/path/to/input/file"))

The output of this PTransform is a new instance of PCollection<String>, where every entry of the collection is a text line of the input file.
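
Just to illustrate the file-pattern and file-system support mentioned above, here are two hypothetical read calls (the paths are placeholders, not part of our example; reading from gs:// additionally requires the Google Cloud Storage filesystem dependency):

// Read all CSV files matching a glob pattern from the local file system
pipeline.apply(TextIO.read().from("/tmp/beam/input/*.csv"));

// Read from a different file system, e.g. Google Cloud Storage
pipeline.apply(TextIO.read().from("gs://my-bucket/logs/part-*"));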

Since we want to have the total number of sales per brand as a final result, we have to group them accordingly. Therefore, the next step will be to parse every line and create a key/value pair where the key is a brand name and the value is a number of sales. It’s worth mentioning that the output PCollection from the previous PTransform will be the input PCollection for this one.

.apply("ParseAndConvertToKV", MapElements.via(
    new SimpleFunction<String, KV<String, Integer>>() {
        @Override
        public KV<String, Integer> apply(String input) {
            // Expected line format: id,brand_name,model_name,sales_number
            String[] split = input.split(",");
            if (split.length < 4) {
                return null;
            }
            String key = split[1];
            Integer value = Integer.valueOf(split[3]);
            return KV.of(key, value);
        }
    }
))

In this step, we use a built-in Beam PTransform called MapElements to create a new key/value pair for every input entry, using the provided implementation of SimpleFunction.
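
For reference, assuming Java 8+ lambdas are available, the same mapping could also be written more compactly with MapElements.into()/via() (the length check is omitted here for brevity):

.apply("ParseAndConvertToKV", MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
    .via((String line) -> {
        // Expected line format: id,brand_name,model_name,sales_number
        String[] split = line.split(",");
        return KV.of(split[1], Integer.valueOf(split[3]));
    }))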

After that, we group the numbers of sales by brand using another Beam transform - GroupByKey. As an output we have a PCollection of key/values where the key is a brand name and the value is an iterable collection of sales numbers for that brand.

.apply(GroupByKey.<String, Integer>create())

Now we are ready to sum up all the car sales numbers per brand using our own implementation of a DoFn inside a ParDo transform:

.apply("SumUpValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {

    @ProcessElement
    public void processElement(ProcessContext context) {
        Integer totalSales = 0;
        String brand = context.element().getKey();
        Iterable<Integer> sales = context.element().getValue();
        for (Integer amount : sales) {
            totalSales += amount;
        }
        context.output(brand + ": " + totalSales);
    }
}))
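
As a side note, Beam also ships with built-in Combine transforms that could replace the GroupByKey + ParDo pair above. This is only a sketch: its output is a PCollection of KV<String, Integer>, so it would still need to be formatted into "brand: total" strings before writing:

// Sum (from org.apache.beam.sdk.transforms) adds up the Integer values per key
// directly on a PCollection<KV<String, Integer>>
.apply(Sum.integersPerKey())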

As the last step of our pipeline creation, we apply another IO transform to take the PCollection of strings and write them into a text file:


.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());

The last thing we need to do is run the pipeline we created:


pipeline.run();
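
Putting all the pieces together, the whole job might look roughly like this - a sketch assembled from the fragments above, where the file paths come from the sample run below, the package and class name from the run command, and the malformed-line check is omitted for brevity:

package org.apache.beam.tutorial.analytic;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class SalesPerCarsBrand {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        pipeline
            // Read the sales log line by line
            .apply(TextIO.read().from("/tmp/beam/cars_sales_log"))
            // Parse every line into a (brand, sales_number) pair
            .apply("ParseAndConvertToKV", MapElements.via(
                new SimpleFunction<String, KV<String, Integer>>() {
                    @Override
                    public KV<String, Integer> apply(String input) {
                        String[] split = input.split(",");
                        return KV.of(split[1], Integer.valueOf(split[3]));
                    }
                }))
            // Group all sales numbers by brand
            .apply(GroupByKey.<String, Integer>create())
            // Sum the sales numbers per brand and format the result
            .apply("SumUpValuesByKey", ParDo.of(
                new DoFn<KV<String, Iterable<Integer>>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        Integer totalSales = 0;
                        for (Integer amount : context.element().getValue()) {
                            totalSales += amount;
                        }
                        context.output(context.element().getKey() + ": " + totalSales);
                    }
                }))
            // Write the report into a single output file
            .apply(TextIO.write().to("/tmp/beam/cars_sales_report").withoutSharding());

        pipeline.run();
    }
}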

Looks quite easy, doesn’t it? This is the power of Apache Beam, which allows us to create complicated data processing pipelines with a minimal amount of code.

Probably, some of you who are familiar with Hadoop have noticed that this pipeline resembles something familiar:
  • It reads and parses text data line by line creating new key/value pairs (Map)
  • Then groups these key/values by key (GroupBy)
  • Finally, iterates over all values of one key applying some user function (Reduce)

Yes, that’s true - this simple pipeline can be performed by a classical MapReduce job! But just compare how much simpler and clearer it looks in Beam (even though it’s still Java!), and if we decide to extend our pipeline by adding more transforms, it won’t become much more complicated.

Building and running a pipeline

As I mentioned before, a Beam pipeline can be run on different runners (processing engines):
  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow
To do this, we just need to add the corresponding dependency to our Maven or Gradle project configuration. The good thing is that we don’t have to adjust or rewrite the pipeline code to run it on each runner. Even better, we don’t have to recompile our jars if all required runner dependencies were included beforehand - we just need to choose which runner to use and that’s it!

The Direct Runner is a local runner which is usually used to test your pipeline. When using Java, you must specify your dependency on the Direct Runner in your pom.xml.

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.4.0</version>
   <scope>runtime</scope>
</dependency>

After that, you have to compile your project:
# mvn clean package

And run your pipeline on direct runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"
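
For instance, assuming the project's pom.xml also defines a corresponding profile (as the Beam quickstart archetype does - the exact profile name here is an assumption), running the very same pipeline on Apache Flink would only change the runner arguments:

# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pflink-runner -Dexec.args="--runner=FlinkRunner"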

For example, if our input file contains the following data:
# cat /tmp/beam/cars_sales_log
 1,Renault,Scenic,3
 2,Peugeot,307,2
 1,Renault,Megane,4
 3,Citroen,c3,5
 3,Citroen,c5,3

Then the final result would be like this:
# cat /tmp/beam/cars_sales_report
Renault: 7
Peugeot: 2
Citroen: 8

The list of all supported runners and the instructions on how to use them can be found on this page.

Finally, all the code of this example is published in this GitHub repository: https://github.com/aromanenko-dev/beam-tutorial

In the next part, I’m going to talk briefly about streaming data processing in Beam. I’ll take another example of a data analytics task with an unbounded data source and we will see what Beam provides for us in that case.
