Have you heard of Apache Beam? No? Well, I’m not surprised - this project is quite new in the data processing world. Actually, I was in the same boat as you until recently, when I started working closely with it.
In short, Apache Beam is a unified programming model that provides an easy way to implement batch and streaming data processing jobs and run them on any execution engine using a set of different IOs. Sounds promising but not very clear, right? Ok, let’s take a closer look at what it actually means.
With this post, I’m starting a series where I’ll show some examples and highlight several use cases of data processing jobs using Apache Beam.
Our topic for today is batch processing. Let’s take the following example - you work in analytics and you want to know how many cars of each brand were sold over the whole observation period. It means that our data set is bounded (a finite amount of data) and it won’t be updated. So, in this case, we can rely on a batch process to analyze our data.
As input data, we have text logs of sold cars in the following format:
id,brand_name,model_name,sales_number
For example:
1,Renault,Scenic,3
2,Peugeot,307,2
1,Renault,Megane,4
Before starting the implementation of our first Beam application, we need to become familiar with some core concepts that will be used all the time later on. There are three main concepts in Beam: Pipeline, PCollection and PTransform. Pipeline encapsulates the workflow of your entire data processing task from start to finish. PCollection is a distributed dataset abstraction that Beam uses to transfer data between PTransforms. PTransform, in turn, is a process that operates on input data (an input PCollection) and produces output data (an output PCollection). Usually, the first and the last PTransforms represent a way to input/output data, which can be bounded (batch processing) or unbounded (streaming processing).
To simplify things, we can consider a Pipeline as a DAG (directed acyclic graph) which represents your whole workflow, PTransforms as nodes (that transform the data) and PCollections as edges of this graph. More information can be found in the Beam Programming Guide.
Now, let’s get back to our example and try to implement the first pipeline, which will process the provided data set.
Creating a pipeline
Firstly, just create a new pipeline:
Pipeline pipeline = Pipeline.create();
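Pipeline.create() with no arguments uses default options. If you want to configure the pipeline from command-line arguments (for example, the --runner flag used later in this post), you can create it from PipelineOptions instead. A minimal sketch, assuming the arguments come from your main() method:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Parse options like --runner=DirectRunner from the command line and validate them
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);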
Then, let’s create a new PTransform using the pipeline.apply() method, which will read data from the text file and create a new PCollection of strings. To do this, we are going to use one of the IOs already implemented in Beam - TextIO.
TextIO allows us to read from and write to text file(s) line by line. It also has many other features, like working with different file systems, supporting file patterns, streaming of files, and so on. For more information, please see the Apache Beam documentation.
apply(TextIO.read().from("/path/to/input/file"))
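Since TextIO also supports file patterns, the same read could target several files at once. A small sketch (the path and extension here are just placeholders):
apply(TextIO.read().from("/path/to/input/*.csv"))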
The output of this PTransform is a new instance of PCollection<String>, where every entry of the collection is a text line of the input file.
Since we want the total number of sales per brand as the final result, we have to group them accordingly. Therefore, the next step is to parse every line and create a key/value pair where the key is a brand name and the value is a number of sales. It’s worth mentioning that the output PCollection from the previous PTransform will be the input PCollection for this one.
.apply("ParseAndConvertToKV", MapElements.via( new SimpleFunction<String, KV<String, Integer>>() { @Override public KV<String, Integer> apply(String input) { String[] split = input.split(","); if (split.length < 4) { return null; } String key = split[1]; Integer value = Integer.valueOf(split[3]); return KV.of(key, value); } } ))
In this step, we use a Beam internal PTransform called MapElements to create a new key/value pair for every input entry, using the provided implementation of the SimpleFunction interface.
Afterwards, we group the sales numbers by brand using another Beam transform - GroupByKey. As an output, we have a PCollection of key/value pairs where the key is a brand name and the value is an iterable collection of sales for that brand.
.apply(GroupByKey.<String, Integer>create())
Now we are ready to sum up all the car sales per brand using our own implementation of a ParDo transform:
.apply("SumUpValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() { @ProcessElement public void processElement(ProcessContext context) { Integer totalSales = 0; String brand = context.element().getKey(); Iterable<Integer> sales = context.element().getValue(); for (Integer amount : sales) { totalSales += amount; } context.output(brand + ": " + totalSales); } }))
As the last step of creating our pipeline, we apply another IO transform to take the PCollection of strings and write them to a text file:
.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());
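The withoutSharding() call forces Beam to write a single output file instead of splitting the result into several shards, which is why the report can be read back later as one file.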
The last thing we need to do is run the created pipeline:
pipeline.run();
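For reference, here is one way the whole job could be assembled from the fragments above. This is my own sketch rather than a verbatim copy of the repository code: the package and class name are taken from the run command shown below, the file paths match the example input/output files, and error handling for malformed lines is omitted. Note that run() is asynchronous, so the sketch also calls waitUntilFinish() to block until the job completes.
package org.apache.beam.tutorial.analytic;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class SalesPerCarsBrand {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        // Read the sales log line by line
        .apply(TextIO.read().from("/tmp/beam/cars_sales_log"))
        // Parse "id,brand,model,sales" into (brand, sales) pairs
        .apply("ParseAndConvertToKV", MapElements.via(
            new SimpleFunction<String, KV<String, Integer>>() {
              @Override
              public KV<String, Integer> apply(String input) {
                String[] split = input.split(",");
                return KV.of(split[1], Integer.valueOf(split[3]));
              }
            }))
        // Group all sales numbers by brand
        .apply(GroupByKey.<String, Integer>create())
        // Sum the sales per brand and format the result as text
        .apply("SumUpValuesByKey", ParDo.of(
            new DoFn<KV<String, Iterable<Integer>>, String>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                Integer totalSales = 0;
                for (Integer amount : context.element().getValue()) {
                  totalSales += amount;
                }
                context.output(context.element().getKey() + ": " + totalSales);
              }
            }))
        // Write the report as a single text file
        .apply(TextIO.write().to("/tmp/beam/cars_sales_report").withoutSharding());

    // run() starts the pipeline; waitUntilFinish() blocks until it completes
    pipeline.run().waitUntilFinish();
  }
}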
Looks quite easy, doesn’t it? This is the power of Apache Beam: it allows you to create complicated data processing pipelines with a minimal amount of code.
Some of you who are familiar with Hadoop have probably noticed that this pipeline resembles something:
- It reads and parses text data line by line creating new key/value pairs (Map)
- Then groups these key/values by key (GroupBy)
- Finally, iterates over all values of one key applying some user function (Reduce)
Yes, that’s true - this simple pipeline could be performed by a classical MapReduce job! But just compare how much simpler and clearer it looks in Beam (even though it’s still in Java!), and if we decide to extend our pipeline by adding more transforms, it won’t become much more complicated.
Building and running a pipeline
As I mentioned before, a Beam pipeline can be run on different runners (processing engines):
- Direct Runner
- Apache Apex
- Apache Flink
- Apache Gearpump
- Apache Spark
- Google Cloud Dataflow
The Direct Runner is a local runner which is usually used to test your pipeline. When using Java, you must specify your dependency on the Direct Runner in your pom.xml:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.4.0</version>
<scope>runtime</scope>
</dependency>
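The run command below also uses a Maven profile called direct-runner. If your project was generated from the Beam quickstart archetype, such a profile already exists; otherwise, here is a rough sketch of what it might look like (this is an assumption about your pom.xml, not part of the original post):
<profile>
  <id>direct-runner</id>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>2.4.0</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
</profile>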
After that, you have to compile your project:
# mvn clean package
And run your pipeline on the Direct Runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"
For example, if our input file contains the following data:
# cat /tmp/beam/cars_sales_log
1,Renault,Scenic,3
2,Peugeot,307,2
1,Renault,Megane,4
3,Citroen,c3,5
3,Citroen,c5,3
Then the final result will look like this:
# cat /tmp/beam/cars_sales_report
Renault: 7
Peugeot: 2
Citroen: 8
The list of all supported runners, and instructions on how to use them, can be found on this page.
Finally, all the code of this example is published in this GitHub repository: https://github.com/aromanenko-dev/beam-tutorial
In the next part, I’m going to talk briefly about streaming data processing in Beam. I’ll take another example of a data analytics task with an unbounded data source, and we will see what Beam provides us in that case.