
Developing data processing job using Apache Beam - Avro schema and schema-aware PCollections

Avro in Beam

So far, we have always talked about PCollections that were not aware of the structure of their records. For example, in this post we had to parse text files manually, extract the fields and perform a GroupByKey operation based on them. This is an inconvenient and error-prone way of working with structured data. It becomes much simpler, cleaner and more elegant if we have schema-based records as an input data collection. At the moment, there are many different data structure formats “on the market” - JSON, Avro, Parquet, Protocol Buffers, etc. - that can be used to structure our data in a richer way than plain text. Apache Beam provides good support for the Avro format, so in this post we will see how to work with it efficiently.

First of all, there is AvroIO, which allows us to work with files written in Avro format in Beam pipelines. Under the hood, it uses FileIO, which means that we can read/write files from/to different file systems, specify file patterns for input paths and use other features provided by FileIO. Also, AvroIO can return as output a PCollection of Avro-generated classes or of GenericRecords that can be used downstream (we will see this in an example later).
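For instance, if we generate a Java class from the Avro schema shown below (AvroGeneratedCarsSales), reading Avro files into a collection of such objects could look roughly like this sketch (the file pattern here is only an illustration):

// A sketch of reading Avro files into a PCollection of Avro-generated objects
PCollection<AvroGeneratedCarsSales> sales = pipeline.apply(
    AvroIO.read(AvroGeneratedCarsSales.class)
        .from("/path/to/logs/*.avro"));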

The second thing is that Beam already provides tools to work with Avro schemas, like AvroUtils and AvroCoder, which allow us to convert Avro records into Beam rows (Row) and back, and to use an Avro schema to instantiate Beam coders.
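As a rough sketch (the exact package of AvroUtils and AvroCoder can differ between Beam versions, and avroSchema/genericRecord here are just placeholders), such conversions could look like this:

// Derive a Beam schema from an Avro schema and convert one record into a Beam Row
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
Row row = AvroUtils.toBeamRowStrict(genericRecord, beamSchema);

// An Avro schema can also be used to instantiate a coder for GenericRecords
AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);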

What is a Beam Row? In short, it is a generic row type with a schema (this time a Beam schema, not an Avro one!) that can be used in Beam SQL transforms. For more details, see the official documentation.
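Just to give an idea, here is a minimal sketch (not part of our pipeline) of a Beam schema and a Row built against it, using the car sales fields we will work with below:

// A Beam schema (org.apache.beam.sdk.schemas.Schema, not an Avro one!)
Schema beamSchema = Schema.builder()
    .addInt32Field("id")
    .addStringField("brand_name")
    .addStringField("model_name")
    .addInt32Field("sales_number")
    .build();

// A Row that conforms to this schema
Row row = Row.withSchema(beamSchema)
    .addValues(1, "Renault", "Scenic", 3)
    .build();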

With this basic information about Avro support in Beam, we can start modifying the pipeline from this blog post and leverage Avro and Beam schemas to process our data with Beam SQL transforms.

From plain text to Avro Schema records

Let’s recall what the initial task was. As input data, we have text logs of sold cars in the following format:
id,brand_name,model_name,sales_number

For example:
1,Renault,Scenic,3
2,Peugeot,307,2
1,Renault,Megane,4

Our task is to analyse how many cars of each brand were sold over the whole observation period. So, first of all, let’s create an Avro schema that reflects our log structure:
{
    "namespace": "sales.avro",
    "type": "record",
    "name": "AvroGeneratedCarsSales",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "brand_name", "type": "string"},
        {"name": "model_name", "type": "string"},
        {"name": "sales_number", "type": "int"}
    ]
}

Then, we assume that all the data we read consists of records created using this schema. To leverage it, we need to create a new Avro Schema object:

Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
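Here, SCHEMA_STRING is simply the JSON definition shown above. For example (just one possible way, with a hypothetical file name), it could be kept in an .avsc resource file and loaded before parsing:

// Load the schema JSON from a resource file (hypothetical path) and parse it
String SCHEMA_STRING = new String(
    Files.readAllBytes(Paths.get("src/main/resources/cars_sales_log.avsc")),
    StandardCharsets.UTF_8);
Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);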

After that, we pass this Schema, together with the input path (specified by a CLI option) where all our data is located, into AvroIO, which in turn returns a PCollection of GenericRecords.

PCollection<GenericRecord> records = pipeline.apply(
    AvroIO.readGenericRecords(SCHEMA)
        .withBeamSchemas(true)
        .from(options.getInputPath()));

Also, as you may notice, we use withBeamSchemas(true) to create and assign a Beam schema to the output PCollection. This gives us a schema-aware PCollection and lets us leverage this information in downstream transforms. Of course, we could do this manually ourselves, but AvroIO already encapsulates this logic and simplifies it from the user’s perspective.

Don’t confuse an Avro Schema with a Beam Schema! They are two different entities from different SDKs, though similar in concept. Avro is a universal format that can be used across different projects and languages for structured data, whereas a Beam schema is used mostly as an internal format for Beam pipelines.

Once we have a collection of Avro GenericRecords, we can use it as input for Beam SQL transforms, which is a very convenient way to process and analyse data. Normally, Beam SQL works with a PCollection of Rows, but in this case we don’t need to convert GenericRecords into Rows, since Beam will do it automatically using the schema we set up before (recall withBeamSchemas(true)). Internally, all Beam SQL transforms are expanded into normal Beam transforms and benefit from all the usual runner optimisations. More details about Beam SQL can be found here.

So, to obtain the number of sales aggregated by brand name, we can create and execute quite a simple SQL query:

records
   .apply(
       SqlTransform.query(
           "SELECT SUM(sales_number), brand_name FROM PCOLLECTION GROUP BY brand_name"))

And that’s all! Very easy and readable, isn’t it? As output from this transform, we will have a PCollection<Row> that can be written out to files or databases, or used in a following chain of other transforms.
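For example, assuming the PCollection<Row> returned by the SqlTransform above is assigned to a variable resultRows, a minimal sketch of writing the results out as text (the output path option here is hypothetical) could be:

// Format every result Row as "brand_name: total_sales" and write it out with TextIO
resultRows
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via((Row row) -> row.getString("brand_name") + ": " + row.getValue(0)))
    .apply(TextIO.write().to(options.getOutputPath()));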

Building and running a pipeline

To compile and run this pipeline we need to add two dependencies:
<!-- Beam SDK -->
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-core</artifactId>
</dependency>

<!-- Beam SQL -->
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-extensions-sql</artifactId>
</dependency>

Then just build and run it in the same way as we did in previous posts. As usual, all the code of this example is published in this GitHub repository.

Happy Beaming!

