This time we are going to talk about one of the most in-demand topics in the modern Big Data world: processing of streaming data.
The principal difference between batching and streaming is the type of input data source. When your data set is limited (even if it is huge in terms of size) and it is not updated while it is being processed, then you would most likely use a batching pipeline. The input source in this case can be, for instance, files, database tables, objects in object storage, and so on. I want to underline one more time that with batching we assume that the data is immutable during the whole processing time and that the number of input records is constant. Why should we pay attention to this? Because even with files we can end up with an unlimited data stream, when files are constantly being added or changed. In that case we have to apply a streaming approach to work with the data. So, if we know that our data is limited and immutable, then we need to develop a batching pipeline, as was shown and explained in the first part of this series.
Things get more complicated when our data set is unlimited (continuously arriving) and/or mutable; in that case we can say that we have a stream of data. Examples of such sources are message delivery systems (like Apache Kafka), new files in a directory (web server logs) or other systems that collect data in real time (like IoT sensors). The common trait of all these sources is that we always have to wait for new data. Of course, we could split the data into batches (by time or by data size) and process every split in a batching way, but it would be quite difficult to apply some functions across the whole consumed data set and to build a complete pipeline this way. Luckily, there are several streaming engines that make this kind of data processing easy: Apache Spark, Apache Flink, Apache Apex, Google Dataflow. All of them are supported by Apache Beam, and we can run the same pipeline on different engines without any code changes. Moreover, we can use the same pipeline in batching or in streaming mode with minimal changes: one just needs to set the input source properly and voilà, everything works out of the box. Isn't that just magic? I dreamed of this a while ago when I was rewriting my batch jobs into streaming ones.
So, enough theory; it's time to take an example and write our first streaming code. We are going to read data from Kafka (an unbounded source), perform some simple processing and write the results back to Kafka as well.
Suppose the task is the following: we have an unlimited stream of geo coordinates (X and Y) of some objects on a map (for example, the objects are cars) which arrive in real time, and we want to select only those that are located inside a specified rectangular area. In other words, we have to consume text data from a Kafka topic, parse it, filter it by the specified limits and write it back into another Kafka topic. Let's see how we can do this with the help of Apache Beam.
Every Kafka message contains text data in the following format:
id,x,y
where:
id – unique id of object,
x, y - coordinates on map (integers).
We will also need to take care of records whose format is not valid and skip them.
Creating a pipeline
First of all, as in the previous part where we did batch processing, we create a pipeline in the same way:
Pipeline pipeline = Pipeline.create(options);
We can elaborate on the Options object to pass command line options into the pipeline; please see the whole example on GitHub for more details.
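The full Options interface lives in the GitHub example; the following is only a rough sketch of what it could look like, assuming the getters used in the snippets that follow (getBootstrap(), getInputTopic(), getOutputTopic(), getCoordX(), getCoordY()), integer coordinate limits and placeholder default values:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// A rough sketch of the pipeline options; the real interface is in the
// GitHub example, and the default values here are just placeholders.
public interface Options extends PipelineOptions {

  @Description("Kafka bootstrap servers")
  @Default.String("localhost:9092")
  String getBootstrap();
  void setBootstrap(String value);

  @Description("Kafka input topic name")
  @Default.String("input")
  String getInputTopic();
  void setInputTopic(String value);

  @Description("Kafka output topic name")
  @Default.String("output")
  String getOutputTopic();
  void setOutputTopic(String value);

  @Description("Limit of the X coordinate (assumed to be an integer)")
  @Default.Integer(100)
  Integer getCoordX();
  void setCoordX(Integer value);

  @Description("Limit of the Y coordinate (assumed to be an integer)")
  @Default.Integer(100)
  Integer getCoordY();
  void setCoordY(Integer value);
}

Such options are typically created from the command line arguments with PipelineOptionsFactory, for example: Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);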
Then, in the next step, we have to read data from the Kafka input topic. As was said before, Apache Beam already provides a number of different IO connectors, and KafkaIO is one of them. Therefore, we create a new unbounded PTransform which consumes arriving messages from the specified Kafka topic and propagates them further to the next step:
pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
By default, KafkaIO encapsulates every consumed message into a KafkaRecord object, so the next transform simply retrieves the payload (the string value) using a newly created DoFn object:
.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            KafkaRecord<Long, String> record = processContext.element();
            processContext.output(record.getKV().getValue());
          }
        }
    )
)
After that, it is time to filter the records (see the initial task stated above), but first we have to parse the string value according to the defined format. Both steps can be encapsulated into one functional object, which is then used by the Beam internal transform Filter (a sketch of such a function is shown right after the snippet below).
.apply( "FilterValidCoords", Filter.by(new FilterObjectsByCoordinates( options.getCoordX(), options.getCoordY())) )
Then, we have to prepare the filtered messages to be written back to Kafka by creating a new key/value pair using the internal Beam KV class, which can be used across different IO connectors, including KafkaIO:
.apply( "ExtractPayload", ParDo.of( new DoFn<String, KV<String, String>>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(KV.of("filtered", c.element())); } } ) )
The final transform is needed to write the messages into Kafka, so we simply use KafkaIO.write(), a sink implementation, for this purpose. As for reading, we have to configure this transform with some required options, like the Kafka bootstrap servers, the output topic name and the serializers for the key and value.
.apply( "WriteToKafka", KafkaIO.<String, String>write() .withBootstrapServers(options.getBootstrap()) .withTopic(options.getOutputTopic()) .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class) .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class) );
In the end, we just run our pipeline as usual:
pipeline.run();
This time it may seem a bit more complicated than in the previous part, but, as one can easily notice, we didn't do anything specific to make our pipeline streaming-compatible. This is the responsibility of the Beam data model implementation, which makes it very easy for Beam users to switch between batch and streaming processing.
Building and running a pipeline
Let's add the required dependencies to make it possible to use Beam KafkaIO:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Then, just build a jar and run it with the DirectRunner to test how it works:
# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"
If needed, we can pass other pipeline arguments with the help of the "exec.args" option. Also, make sure that your Kafka servers are available and properly specified before running the Beam pipeline.
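For example, assuming option names derived from the getters sketched above (the topic names and coordinate values here are just placeholders), the command could look like this:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner --bootstrap=localhost:9092 --inputTopic=input --outputTopic=output --coordX=100 --coordY=100"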
The mvn exec command launches the pipeline and runs it forever until it is stopped manually (optionally, it is possible to specify a maximum running time). So, the data will be processed continuously, in streaming mode.
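As a sketch of limiting the running time (an assumption of this sketch, not necessarily what the GitHub example does), we could keep the pipeline result, wait at most a fixed amount of time with waitUntilFinish and a Joda-Time Duration, and then cancel the pipeline if it is still running:

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

// Wait at most 10 minutes for the pipeline, then cancel it if it is still
// running (the main example simply calls pipeline.run()).
PipelineResult result = pipeline.run();
result.waitUntilFinish(Duration.standardMinutes(10));
try {
  if (!result.getState().isTerminal()) {
    result.cancel();
  }
} catch (IOException e) {
  // cancellation failed; nothing else to do in this sketch
}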
As usual, the whole code of this example is published in this GitHub repository.
Have a good streaming!