
Cross-language pipelines in Beam

Java is famous for its paradigm – Write once, run anywhere – coined in 1995 by Sun Microsystems to illustrate the cross-platform benefits of the Java language. Apache Beam follows a similar principle, but with regard to data processing engines – write a pipeline once and run it on any data processing engine. Beam achieves this by leveraging the concept of a Beam Runner – a framework responsible for translating a pipeline, written with the Beam model, into code that can run on the required processing engine, such as Apache Spark, Apache Flink, Google Dataflow, etc. All translations happen at runtime – users don’t even need to recompile their code to change a runner, as long as all the needed dependencies were provided at compile time. As a result, Beam already supports a number of runners that can easily be used to run a user’s pipeline on different platforms.
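For instance, here is a minimal sketch of such a runner-agnostic Java pipeline (the file paths are hypothetical). The runner is picked at runtime from the command-line options, so the very same code can be submitted to the Direct runner, Spark or Flink without recompilation, provided the chosen runner's dependency is on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerAgnosticPipeline {
  public static void main(String[] args) {
    // The runner is chosen at runtime, e.g. --runner=DirectRunner,
    // --runner=SparkRunner or --runner=FlinkRunner, as long as the
    // corresponding runner dependency was provided at compile time.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("ReadLines", TextIO.read().from("/tmp/input.txt"))   // hypothetical input path
        .apply("WriteLines", TextIO.write().to("/tmp/output"));     // hypothetical output prefix

    pipeline.run().waitUntilFinish();
  }
}
```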


However, a classical runner translates user code only within the same SDK – from Java to Java, from Python to Python, from Go to Go. That is why, if we want, for example, to use the Spark Runner (which allows running Beam pipelines on Spark) and it is written in Java, we have to use the Beam Java SDK. If we wanted to run a Beam Python pipeline on Spark, we would have had to implement a Spark Runner written in Python - an interesting but non-trivial task - or…


That was the case until the Beam community started working on Portability – a fairly new Beam feature intended to eliminate such restrictions and allow already written code to be reused across different SDKs. That being said, users will be able to run a pipeline written in Python or Go with the Spark Runner written in Java. And even more! It will be possible to run cross-language pipelines – for example, a Java pipeline with one or several Beam transforms written in Python or Go! The goal of this post is to give a brief explanation of how Beam Portability works and to show how to run cross-language pipelines like these. As an example, we will run a Java/Python pipeline on Apache Spark.


Beam Portability and cross-language pipelines

Let’s talk about Beam Portability in a few words. If you need more details, I’d recommend watching these great talks:


As was said above, a “classical” Beam runner translates user pipeline code into the code of the processing engine that will later run the pipeline, and the user has to use the same language SDK for the pipeline, the runner and the engine.


The main goal of Beam Portability and the Portable Runner is to eliminate this restriction and make it possible to use components based on different SDKs in the same pipeline. In this way, Beam transforms from different SDKs (e.g. IO connectors or Beam SQL) can be reused, and cross-language pipelines can be created.


Let’s take a brief look at how it works. With the Portable Runner, the SDK translates the pipeline into a Protobuf representation via the Runner API. Then it uploads all required dependencies, taken from the classpath (in the case of the Java SDK) or from user-specified locations, to the runner side (for example, the Spark Runner). After that, the pipeline is submitted to the Job Server via the Job API, and finally the runner translates and runs the pipeline on the data processing engine (Spark in our case).
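To make the first step more concrete: the portable representation is a language-agnostic protobuf message defined by the Runner API. The sketch below uses Beam's internal PipelineTranslation helper (from the runners-core-construction-java module), which users normally never call directly and whose exact package may differ between Beam versions; it is shown only to illustrate the idea.

```java
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;

public class PortableRepresentationSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.apply("CreateWords", Create.of("hello", "portable", "beam"));

    // Roughly what the Portable Runner does under the hood: the SDK-specific
    // object graph is converted into a language-agnostic RunnerApi.Pipeline
    // protobuf message defined by the Runner API.
    RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);

    // The proto describes transforms, PCollections, coders and environments by id;
    // this is what gets shipped to the Job Server and consumed by the runner.
    System.out.println(proto.getComponents().getTransformsMap().keySet());
  }
}
```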


The Job Server is actually a new Beam component (a daemon that has to be run separately), which is responsible for receiving a pipeline in its portable representation and running it on the specified runner.
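Assuming a Spark Job Server is already up and listening on localhost:8099 (the address and settings below are illustrative), submitting a Java pipeline to it could look roughly like this sketch, which relies on the PortableRunner class from the beam-runners-portability-java module:

```java
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.Create;

public class SubmitToJobServerSketch {
  public static void main(String[] args) {
    // Equivalent to passing --runner=PortableRunner --jobEndpoint=localhost:8099
    // on the command line; the Job Server address here is illustrative.
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
    options.setJobEndpoint("localhost:8099");
    options.setRunner(PortableRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("CreateNumbers", Create.of(1, 2, 3));

    // The pipeline is converted to its portable (protobuf) form and submitted
    // to the Job Server over the Job API; the Job Server hands it to Spark.
    pipeline.run().waitUntilFinish();
  }
}
```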


Still, it’s not yet clear how to execute code of different SDKs with a runner that is written in only one SDK – like the Spark Runner, which is written in Java. For this purpose, Beam uses the concept of an SDK Harness – a separate component of Beam Portability which is responsible for executing a small part of the pipeline (a PTransform) with the dedicated SDK. Beam supports two main types of SDK Harness – Docker based and process based. This means that either a Docker container or a process with the required Beam SDK will be launched to execute the code of a transform.
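In practice, the harness type is usually selected through pipeline options. Here is a minimal sketch with the Java SDK, assuming the options exposed by PortablePipelineOptions; the process command path is purely illustrative:

```java
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class SdkHarnessEnvironmentSketch {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);

    // Docker-based SDK Harness: the runner starts a container with the matching
    // Beam SDK image to execute the transforms of that SDK.
    options.setDefaultEnvironmentType("DOCKER");

    // Alternatively, a process-based SDK Harness: the runner launches a local
    // process with the required SDK (the boot command path below is illustrative).
    // options.setDefaultEnvironmentType("PROCESS");
    // options.setDefaultEnvironmentConfig("{\"command\": \"/opt/apache/beam/boot\"}");
  }
}
```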


And finally, for a cross-language pipeline, we need to run and specify an Expansion Service for every external transform. By default, Beam will use the Expansion Service that runs along with the Job Server, so we don’t need to specify it for transforms of the same SDK as the Job Server. For other SDKs, however, we need to run a standalone Expansion Service and specify it for every external transform. So, the final architecture of a cross-language pipeline will look like this:



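As a rough illustration of the pipeline side, the sketch below applies a hypothetical external transform served by a standalone Expansion Service on localhost:8097. The URN, the (empty) payload and both service addresses are placeholders that depend on how the external transform is registered, and the low-level External helper used here belongs to Beam's construction internals, so its package and exact signature may differ between Beam versions.

```java
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.Create;

public class CrossLanguagePipelineSketch {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
    options.setJobEndpoint("localhost:8099"); // Job Server (illustrative address)

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply("CreateWords", Create.of("a", "b", "c"))
        // Hypothetical transform registered in a standalone Python Expansion Service
        // on port 8097. The URN and payload are placeholders: their values are defined
        // entirely by how the transform was registered on the Python side.
        .apply("ExternalPythonTransform",
            External.of("my:hypothetical:python:transform:v1", new byte[0], "localhost:8097"));

    pipeline.run().waitUntilFinish();
  }
}
```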
In the next part, we will talk about how to run such portable pipelines with the Portable Spark Runner.

