Avro in Beam
Before, we always talked about PCollections that were not aware about the structure of their records. For example, in this post we have had to parse text files manually, extract the fields and perform GroupByKey operation based on this. This is inconvenient and error-prone way of working with structured data. It can be done sufficiently easier and in more clear and elegant way if we had schema-based records as an input data collection. For the moment, there are many different data structure formats “on the market” - JSON, Avro, Parquet, Protocol Buffers, etc. - that could be used for structuring our data in more fancy way than just plain text. Apache Beam provides a good support of Avro format, so we will see in this post how to work efficiently with it.First of all, there is an AvroIO, which allows to work with files, written in Avro format, in Beam pipelines. Under the hood, it uses FileIO, so it means that we can read/write files from/to different File Systems, specify the file patterns for input paths and use other features provided by FileIO. Also, AvroIO may return as output PCollection of Avro generated classes or GenericRecord’s, that can be used downstream (we will see this in example later).
The second thing is that Beam already provides different tools to work with Avro schema, like AvroUtils and AvroCoder, that allow to convert Avro records into Beam rows (Row) back and forward and use Avro schema for Beam coders instantiation.
What is a Beam Row? In two words, this is a type of generic row with schema (this time it’s a Beam schema, not Avro!) that can be used in Beam SQL transforms. For more details see official documentation.
With this basic information about Avro support in Beam, we can already start to modify the pipeline from this blog post and leverage Avro and Beam schemas to process our data with Beam SQL transforms.
From plain text to Avro Schema records
Let’s recall what was the initial task there. As an input data, we have text logs of sold cars in the following format:id,brand_name,model_name,sales_number
For example:
1,Renault,Scenic,3
2,Peugeot,307,2
1,Renault,Megane,4
Our task is to analyse how many cars of each brand were sold for the whole period of time of observations. So, in the first turn, let’s create an Avro schema which will reflect our log structure:
{
"namespace": "sales.avro",
"type": "record",
"name": "AvroGeneratedCarsSales",
"fields": [
{"name": "id", "type": "int"},
{"name": "brand_name", "type": "string"},
{"name": "model_name", "type": "string"},
{"name": "sales_number", "type": "int"}
]
}
Then, we suppose that all data that we read is consisted of the records created using this schema. So, to leverage this schema we need to create new Avro Schema object:
Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
After, we pass this Schema and input path, specified by CLI option where all our data is located, into AvroIO that returns PCollection of GenericRecords in its turn.
PCollection<GenericRecord> records = pipeline.apply( AvroIO.readGenericRecords(SCHEMA). withBeamSchemas(true).from(options.getInputPath()));
Also, as you can notice, we use withBeamSchemas(true) to create and assign Beam schema to output PCollection. It will allow us to have Schema-aware PCollection and leverage this information in the downstream transform. For sure, we could do this by ourself manually, but AvroIO already encapsulates this logic inside and simplify it from the user perspective.
Don’t confuse Avro Schema with Beam Schema! They are two different entities from different SDKs, though with the similar concept in general. Avro is universal structure format that can be used in different projects and languages for structured data, whenever Beam schema is used mostly as internal format in Beam for Beam pipelines.
Once we have a collection of Avro GenericRecords we can use it as an input for Beam SQL transforms which is a very convenient way to process and analyse data. Normally, Beam SQL works with PCollection of Row's but in this case we don’t need to convert GenericRecord’s into Row’s since Beam will perform it automatically knowing the schema that we set up before (recall using withBeamSchemas(true)). Internally, all Beam SQL transforms will be expanded into normal Beam transforms and profit of all runner optimisations as usually. More details about Beam SQL could be found here.
So, to obtain the number of sales aggregated by brand number we can create and execute quite simple SQL query:
records .apply( SqlTransform.query( "SELECT SUM(sales_number), brand_name FROM PCOLLECTION GROUP BY brand_name"))
And that’s all! Very easy and readable, isn’t it? As an output from this transform we will have a PCollection<Row> that could be written out into files, databases or used in the following chain of other transforms.
Building and running a pipeline
To compile and run this pipeline we need to add two dependencies:<!-- Beam SDK -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
<!-- Beam SQL -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-sql</artifactId>
</dependency>
Then just build and run it in the same way as we did in previous posts. As usual, all code of this example is published on this github repository.
Happy Beaming!
Comments
Post a Comment