Apache Spark vs Apache Flink vs Apache Beam

Big Data Analytics Engines

In this blog, we compare the engines under the Apache open source framework. Commercial offerings also exist, such as Hazelcast Jet. Cloud-managed streaming services include IBM Streams, Amazon Kinesis and Google Dataflow. We hope to bring you more exciting blogs on some of these too, in the near future.

What this blog is not is a comprehensive tutorial on any of the three Apache technologies. I will assume that the reader is proficient in Java, understands Maven (pom files) and has some experience with at least one of the three.

Apache Spark has its roots in the Hadoop ecosystem. It was Hadoop on steroids, as it provided a clustered and distributed MapReduce framework. The Spark-Core API supports Java, Scala, Python and R. However, Spark-SQL and Spark-Streaming limit their support to Scala, Java and Python.

Apache Flink is a stream processing framework written in Java and Scala. It supports programs written in Java, Python, Scala and SQL. The Python API is however experimental and limited.

Apache Beam is a batch and stream processing framework (hence Beam) with API support for Java, Python, Go and SQL. Beam traces its origins to Google Dataflow.

In my opinion, all three engines can perform the task of distributed, clustered MapReduce over a Directed Acyclic Graph (DAG) based workflow well. So how does one go about making a choice? Well, this depends on your use case and on knowing the subtleties of each engine. Flumaion will be glad to help you unravel this. This blog is a "starter for ten", for free.

Problem Definition

Before comparing the three engines, it's best to get them to implement the same problem.

The problem we are going to solve is the following equation:

r = (a + d) + (b x c)

However, this is quite boring. Let's make it more interesting by making each element a matrix.

[a] is a 3 x 3 matrix
[b] is a 3 x 2 matrix
[c] is a 2 x 3 matrix
[d] is a 3 x 3 matrix

Finally, to make it even more interesting, data for ten equations was created. So the family of equations was:

[R]i = ([A]i + [D]i) + ([B]i x [C]i), for i = 0,...,9

Each matrix was therefore keyed by an Integer.

Data Generation

I used Jupyter notebooks and NumPy to generate the data.

As you can see from the Jupyter code, the array is flattened and written to one line. I struggled to get Spark to read multi-line JSON and came across a Stack Overflow post which recommended that the JSON be flattened to one JSON object per line.

Reading JSON, however, required Spark-SQL and resulted in all the data landing in one Row inside the Dataset. I therefore abandoned JSON and went for plain CSV output.

I wanted to use Protobufs but this Stack Overflow post explained the complexity involved. The Parquet dependency seems to echo that throwback to Hadoop. ScalaPB was not an option for me as I am allergic to Scala.

A typical format for a 3 x 3 matrix was as follows.

0,3,3:val00,val10,val20,val01,val11,val21,val02,val12,val22

0,3,3 - the 0 denoted the matrix key index, the first 3 was the number of rows and the second 3 the number of columns.

val00,... - the one-dimensional array was stored in column-major order, because mllib's DenseMatrix constructs a matrix from a flat array in that format.
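
To make the format concrete, this is roughly what the parsing inside a StringToTupleFunction-style helper might look like. It is a sketch based on the description above (header and payload split on ':', column-major payload), not the actual source.

    import org.apache.spark.mllib.linalg.DenseMatrix;

    // Parse "key,rows,cols:v00,v10,..." into a key and a column-major DenseMatrix.
    static int parseKey(String line) {
        return Integer.parseInt(line.split(":")[0].split(",")[0]);
    }

    static DenseMatrix parseMatrix(String line) {
        String[] parts = line.split(":");
        String[] header = parts[0].split(",");            // key, rows, cols
        int rows = Integer.parseInt(header[1]);
        int cols = Integer.parseInt(header[2]);

        String[] cells = parts[1].split(",");
        double[] values = new double[cells.length];
        for (int i = 0; i < cells.length; i++) {
            values[i] = Double.parseDouble(cells[i]);
        }
        // DenseMatrix expects the flat array in column-major order.
        return new DenseMatrix(rows, cols, values);
    }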

Spark

The Spark workflow is outlined here; a rough sketch of how the sections hang together follows the list below. I ran Spark in local mode because I could not be bothered to set up Yarn (yawn) etc.

  • Section 1 - set up the local run context.
  • Section 2 - set the file path.
  • Section 3 - read in the data from each file as a JavaRDD of type String.
  • Section 4 - Use the StringToTupleFunction to convert each string to an element of a JavaPairRDD. Each element has an Integer key and a DenseMatrix value.
  • Section 5 - Join [A]'s and [D]'s together - the join therefore returns a Tuple2 of DenseMatrix objects. Since we need to compute [A] + [D], this is done by the bespoke function MatrixAddFunction. Hold the resulting [A+D] matrices in another JavaPairRDD.
  • Section 6 - Same as section 5, but now for [B] and [C]. Here we use the MatrixMultFunction to execute [B] x [C] and hold the results in another JavaPairRDD.
  • Section 7 - Finally, add the [A+D] matrices to the [BxC] matrices.
  • Section 8 - display the results.
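
To make the list above concrete, here is a rough sketch of how the sections might hang together. The file names, the data directory and the helper classes (StringToTupleFunction, MatrixAddFunction, MatrixMultFunction) are assumptions based on the description above, not the actual source.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.linalg.DenseMatrix;

    // Sections 1 and 2: local run context and the (assumed) data location.
    SparkConf conf = new SparkConf().setAppName("MatrixEquations").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    String dataDir = "/path/to/data";   // assumed location

    // Sections 3 and 4: read each CSV file and convert every line to a (key, matrix) pair.
    JavaPairRDD<Integer, DenseMatrix> a = sc.textFile(dataDir + "/a.csv").mapToPair(new StringToTupleFunction());
    JavaPairRDD<Integer, DenseMatrix> b = sc.textFile(dataDir + "/b.csv").mapToPair(new StringToTupleFunction());
    JavaPairRDD<Integer, DenseMatrix> c = sc.textFile(dataDir + "/c.csv").mapToPair(new StringToTupleFunction());
    JavaPairRDD<Integer, DenseMatrix> d = sc.textFile(dataDir + "/d.csv").mapToPair(new StringToTupleFunction());

    // Sections 5 and 6: join by key, then add [A]+[D] and multiply [B]x[C].
    JavaPairRDD<Integer, DenseMatrix> aPlusD = a.join(d).mapToPair(new MatrixAddFunction());
    JavaPairRDD<Integer, DenseMatrix> bTimesC = b.join(c).mapToPair(new MatrixMultFunction());

    // Sections 7 and 8: add the two intermediate results and print them.
    JavaPairRDD<Integer, DenseMatrix> result = aPlusD.join(bTimesC).mapToPair(new MatrixAddFunction());
    result.collect().forEach(pair -> System.out.println(pair._1() + " ->\n" + pair._2()));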

MatrixAddFunction

The MatrixAddFunction implements a Spark interface called PairFunction.
The first type argument is a Tuple2, made up of an Integer and another Tuple2 - this is the input going in.
The inner Tuple2 holds two matrices (say [A] and [D]) that belong to the same index. Hence the input signature Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>>.
The second and third type arguments are Integer and DenseMatrix - these are the outputs coming from the function.
The output is the index (Integer) that was used in the input and a single matrix that represents [A + D] (DenseMatrix).

You can see these arguments separated clearly in the overridden method called 'call'.

Unfortunately Spark's DenseMatrix does not provide an 'add' method (at least in Java). So I am forced to flatten both matrices, add the arrays element by element and return a newly constructed DenseMatrix.

Note the slightly (IMO) wonky syntax to access elements inside the Tuple2 (using '_', yikes!!).
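
A minimal sketch of what such a PairFunction could look like, assuming mllib's DenseMatrix as described above (the real class may differ in detail):

    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.mllib.linalg.DenseMatrix;
    import scala.Tuple2;

    public class MatrixAddFunction implements
            PairFunction<Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>>, Integer, DenseMatrix> {

        @Override
        public Tuple2<Integer, DenseMatrix> call(Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>> input) {
            DenseMatrix left = input._2()._1();
            DenseMatrix right = input._2()._2();

            // DenseMatrix exposes no 'add', so add the flattened (column-major) arrays element by element.
            double[] leftValues = left.toArray();
            double[] rightValues = right.toArray();
            double[] sum = new double[leftValues.length];
            for (int i = 0; i < sum.length; i++) {
                sum[i] = leftValues[i] + rightValues[i];
            }
            return new Tuple2<>(input._1(), new DenseMatrix(left.numRows(), left.numCols(), sum));
        }
    }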

MatrixMultFunction

Very similar to MatrixAdd, except Spark mllib does provide a useful multiply function.

NOTE: When you group matrices for multiplication, the order of the join is important. If B joins C, you will get a (3x2) x (2x3) = (3x3) matrix. Do it in reverse and you will get a (2x2).
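
With the same input shape as the add function, the body of 'call' reduces to something like this (with [B] as the left side of the join, so the result is 3x3):

    // [B] x [C]: mllib's DenseMatrix does provide multiply.
    DenseMatrix product = input._2()._1().multiply(input._2()._2());
    return new Tuple2<>(input._1(), product);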

Flink

The Flink workflow is outlined here; again, a rough sketch follows the list below. As with Spark, it's run in local mode.

  • Section 1 - set up the local run context.
  • Section 2 - set the file path.
  • Section 3 - read in the data from each file as a DataSet of type String.
  • Section 4 - Use the StringToTupleFunction to convert each string to an element of a DataSet. Each element is a Tuple with an Integer key and a DenseMatrix value.
  • Section 5 - Join [A]'s and [D]'s together - the join therefore returns a Tuple2 (of Tuple2's).
    Note the slight difference here from Spark. In Spark, it was a (DenseMatrix, DenseMatrix) pair. In Flink, each side of the join is an (Integer, DenseMatrix) tuple.
    Also notice the use of the 'where' clause in the join syntax. Remember, Flink's original strengths lie in streaming. It therefore has some really powerful syntax for joining across 'windows' or 'buckets' of events.
  • Section 6 - Same as section 5, but now for [B] and [C]. Here we use the MatrixMultFunction to execute [B] x [C] and hold the results in another DataSet.
  • Section 7 - Finally, add the [A+D] matrices to the [BxC] matrices.
  • Section 8 - display the results.
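
A rough sketch of the Flink wiring. As before, the helper class names, file names and data directory are assumptions, and the DenseMatrix is assumed to be the same mllib class reused from the Spark example.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.spark.mllib.linalg.DenseMatrix;

    // Sections 1 and 2: local environment and the (assumed) data location.
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    String dataDir = "/path/to/data";   // assumed location

    // Sections 3 and 4: read each file and map every line to a Tuple2<Integer, DenseMatrix>.
    DataSet<Tuple2<Integer, DenseMatrix>> a = env.readTextFile(dataDir + "/a.csv").map(new StringToTupleFunction());
    DataSet<Tuple2<Integer, DenseMatrix>> b = env.readTextFile(dataDir + "/b.csv").map(new StringToTupleFunction());
    DataSet<Tuple2<Integer, DenseMatrix>> c = env.readTextFile(dataDir + "/c.csv").map(new StringToTupleFunction());
    DataSet<Tuple2<Integer, DenseMatrix>> d = env.readTextFile(dataDir + "/d.csv").map(new StringToTupleFunction());

    // Sections 5 and 6: join on the Integer key (field 0), then add [A]+[D] and multiply [B]x[C].
    DataSet<Tuple2<Integer, DenseMatrix>> aPlusD =
            a.join(d).where(0).equalTo(0).map(new MatrixAddFunction());
    DataSet<Tuple2<Integer, DenseMatrix>> bTimesC =
            b.join(c).where(0).equalTo(0).map(new MatrixMultFunction());

    // Sections 7 and 8: final addition and print.
    aPlusD.join(bTimesC).where(0).equalTo(0).map(new MatrixAddFunction()).print();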

A few differences here are worth taking note of.

  • Instead of a Tuple2 of Integer and DenseMatrix in Section 4, we could have returned our own bespoke Java class which contains (internally) the Integer and the DenseMatrix. I found it much more difficult to achieve this in Spark. It's very straightforward to implement this in Flink and then use it in the join/where/equals clauses. However, I chose not to implement this to keep it consistent.
  • The join function uses a rich SQL-like syntax. In fact, if you search there are examples of attaching bespoke functions that provide very powerful join syntax.
  • Instead of RDD's, we get DataSets in Flink, and the mapping is done using 'map' rather than the 'mapToPair' used by Spark.
  • Note the change to the access parameters for the Tuple internals in MatrixAddFunction. Field parameters are designated as f0, f1 and so on (a bit more readable, IMO).

The MatrixAddFunction and the MatrixMultFunction are quite similar to those described before for Spark. Obviously, in Flink we implement a MapFunction, which is part of the Flink API. Instead of the 'call' method we override a 'map' method. In Flink there is one input argument (like in Spark) and only one output argument. The Integer/DenseMatrix output is bundled into a Tuple2 as the single output argument. Essentially, syntactic sugar.
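
A sketch of the Flink flavour, under the same assumptions as the Spark version (mllib DenseMatrix, default join output of two (Integer, DenseMatrix) tuples):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.spark.mllib.linalg.DenseMatrix;

    public class MatrixAddFunction implements MapFunction<
            Tuple2<Tuple2<Integer, DenseMatrix>, Tuple2<Integer, DenseMatrix>>,
            Tuple2<Integer, DenseMatrix>> {

        @Override
        public Tuple2<Integer, DenseMatrix> map(
                Tuple2<Tuple2<Integer, DenseMatrix>, Tuple2<Integer, DenseMatrix>> joined) {
            DenseMatrix left = joined.f0.f1;    // note the f0/f1 field accessors
            DenseMatrix right = joined.f1.f1;

            // Same flatten-and-add logic as in Spark.
            double[] leftValues = left.toArray();
            double[] rightValues = right.toArray();
            double[] sum = new double[leftValues.length];
            for (int i = 0; i < sum.length; i++) {
                sum[i] = leftValues[i] + rightValues[i];
            }
            return new Tuple2<>(joined.f0.f0, new DenseMatrix(left.numRows(), left.numCols(), sum));
        }
    }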

Beam - Java

The Beam workflow is outlined here; a rough sketch follows the list below. Again, it's run locally using the DirectRunner.

  • Section 0 - Something peculiar to Beam Java. When joining two collections (section 4), each element needs to be tagged using a full-blown Java class (TupleTag). I find this cumbersome!
  • Section 1 - set up the pipeline using pipeline options. Here I have defined a bespoke class BeamOptions. I define the path to the data files in this option class.
  • Section 2 - use TextIO to read in the data, a PCollection of String's.
  • Section 3 - Convert each String to a Key/Value pair of Integer, DenseMatrix using the StringToKVFunction. The DenseMatrix is held against the Integer key in a KV object that is native to the Beam API (think of it as a Beam API aware Map).
  • Section 4 - Join the collection of [A] and [D] matrices using CoGroupByKey. Using the CoGroupByKey results, add [A] and [D] together. The reason for not using 'join' is explained below.
  • Section 5 - Repeat this for [B] and [C]. But apply matrix multiplication instead.
  • Section 6 - Join the [AD] group of matrices with the [BC] group. Add them.
  • Section 7 - display the results.
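
A rough sketch of the Beam Java wiring for sections 0 to 4. The option getters, tag names and helper classes (BeamOptions, StringToKVFunction, MatrixAddFunction) are assumptions based on the description above; a real pipeline would also need a coder registered for DenseMatrix.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.spark.mllib.linalg.DenseMatrix;

    // Section 0: tags that identify each side of the co-group (assumed names).
    final TupleTag<DenseMatrix> aTag = new TupleTag<>();
    final TupleTag<DenseMatrix> dTag = new TupleTag<>();

    // Section 1: pipeline options via the bespoke BeamOptions class.
    BeamOptions options = PipelineOptionsFactory.fromArgs(args).as(BeamOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Sections 2 and 3: read each file and convert every line to KV<Integer, DenseMatrix>.
    PCollection<KV<Integer, DenseMatrix>> a = pipeline
            .apply("Read A", TextIO.read().from(options.getAPath()))
            .apply("Parse A", ParDo.of(new StringToKVFunction()));
    PCollection<KV<Integer, DenseMatrix>> d = pipeline
            .apply("Read D", TextIO.read().from(options.getDPath()))
            .apply("Parse D", ParDo.of(new StringToKVFunction()));

    // Section 4: co-group [A] and [D] by key, then add them.
    PCollection<KV<Integer, CoGbkResult>> groupedAD = KeyedPCollectionTuple
            .of(aTag, a).and(dTag, d)
            .apply(CoGroupByKey.create());
    PCollection<KV<Integer, DenseMatrix>> aPlusD =
            groupedAD.apply(ParDo.of(new MatrixAddFunction(aTag, dTag)));

    // Sections 5 to 7 repeat the same pattern for [B] x [C], the final addition and display.
    pipeline.run().waitUntilFinish();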

MatrixAddFunction

The MatrixAddFunction extends DoFn.
The join elements are retrieved from CoGbkResult using the TupleTag.
Once you have extracted the two DenseMatrix objects, the logic for the rest is identical to that outlined in the Flink section.
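
A sketch of the DoFn under the assumptions above (one matrix per key on each side, mllib DenseMatrix, tags passed in through the constructor):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.spark.mllib.linalg.DenseMatrix;

    public class MatrixAddFunction extends DoFn<KV<Integer, CoGbkResult>, KV<Integer, DenseMatrix>> {

        private final TupleTag<DenseMatrix> firstTag;
        private final TupleTag<DenseMatrix> secondTag;

        public MatrixAddFunction(TupleTag<DenseMatrix> firstTag, TupleTag<DenseMatrix> secondTag) {
            this.firstTag = firstTag;
            this.secondTag = secondTag;
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
            KV<Integer, CoGbkResult> element = context.element();
            // Each key is expected to carry exactly one matrix per tag in this data set.
            DenseMatrix left = element.getValue().getOnly(firstTag);
            DenseMatrix right = element.getValue().getOnly(secondTag);

            // Same flatten-and-add logic as before.
            double[] leftValues = left.toArray();
            double[] rightValues = right.toArray();
            double[] sum = new double[leftValues.length];
            for (int i = 0; i < sum.length; i++) {
                sum[i] = leftValues[i] + rightValues[i];
            }
            context.output(KV.of(element.getKey(), new DenseMatrix(left.numRows(), left.numCols(), sum)));
        }
    }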

A few differences here are worth taking note of.

  • Beam Java has join extensions but I chose not to use them because I wanted to keep the Java and Python implementations consistent with the use of CoGroupByKey. These extensions have not yet been extended to Python (which will change when the Portability Framework is in place, extending support for the Euphoria Java 8 DSL).
  • Like Flink, we can return custom Java classes from the Functions. I have stuck to KV to be consistent with the other implementations.
  • The use of TupleTag in Beam Java (which frankly is a Java problem, not a Beam problem!).

Beam - Python

The Beam Python workflow is outlined here. The main thing to note in Python is the absence of TupleTag declarations.

  • Section 1 - set up the pipeline using pipeline options. Here I have defined a bespoke class BeamMainOptions. I define the path to the data files in this option class.
  • Section 2 - use TextIO to read in the data, a PCollection of String's.
    Note the change in syntax. In Java, the '.apply()' was called on the PCollection or the Pipeline. In Python, the unix '|' symbol is used instead.
  • Section 3 - Convert each String to a Key/Value pair of Integer, numpy array using the StringToKVFunction. The numpy array is held against the Integer key in a Python dictionary. Note that the text 'String to A matrices' is important (before the >>). Since we will invoke this ParDo four times (to read four files), the text helps distinguish one ParDo from another. Omitting this string (or using identical strings) would result in an exception.
  • Section 4 - Join the collection of [A] and [D] matrices using CoGroupByKey. Using the CoGroupByKey results, add [A] and [D] together. The reason for not using 'join' is explained below.

MatrixAddFunction
In Beam Python, as in Beam Java, the MatrixAddFunction extends DoFn. I am getting my numpy array directly from the zeroth index of the CoGroupByKey result (cogroupby['firstmatrix'][0]). In my use case I know the CoGroupByKey result has only one element for that key and tag. Your use case might not, so you may have to iterate through the collection.

A few differences here are worth taking note of.

  • In Python, the tag supplied to the CoGroupByKey is a string and the same string needs to be used in the DoFn. This is important: since there is no strongly typed tag class as in Java, typos will be your downfall (but I still prefer this).
  • As in Beam-Java and Flink, the DoFn could return its own bespoke Python class. But I stuck to Python dictionaries to remain consistent.
  • Instead of DenseMatrix, in Python these were numpy arrays, so I could leverage numpy's add and dot (matrix multiply) functions.

Conclusions

In my opinion, Spark seems to be oriented towards Scala and has its roots firmly in the Hadoop ecosystem. The most frustrating thing I encountered when writing this blog was working out how to return bespoke data structures from my custom functions. Oh, also implementing Protobufs. Did I mention I am allergic to Scala? The antihistamines my GP prescribed had no effect.

The new generation of streaming engines incorporates the concept of windowing, including advanced concepts such as watermarks and late data.

In my opinion, it is in these areas that the newer engines such as Flink and Beam pull ahead of Spark. Streaming is required when you have unbounded data inputs. Stock market ticks, flight information and the GPS locations of your taxis are all examples of unbounded inputs. Bucketing these events and tracking "lateness" is not a trivial exercise, and engines such as Flink and Beam provide powerful APIs to handle this.
