In this blog, we compare the engines under the Apache open source framework. Commercial offerings also exist, such as Hazelcast Jet. Cloud-managed streaming services include IBM Streams, Amazon Kinesis and Google Dataflow. We hope to bring you more exciting blogs on some of these too, in the near future.
What this blog is not, is a comprehensive tutorial on any of the three Apache technologies. I will assume that the reader is proficient in Java, understands Maven (pom files) and has some experience in using at least one of the three.
Apache Spark has its roots in the Hadoop ecosystem. It was Hadoop on steroids, providing a clustered and distributed MapReduce framework. The Spark-Core API supports Java, Scala, Python and R. However, Spark-SQL and Spark-Streaming limit their support to Scala, Java and Python.
Apache Flink is a stream processing framework written in Java and Scala. It supports programs written in Java, Python, Scala and SQL. The Python API is however experimental and limited.
Apache Beam is a Batch and Stream processing framework (hence Beam) with API support for Java, Python, Go and SQL. Beam traces its origins to Google Dataflow.
In my opinion, all three engines can perform the task of distributed, clustered MapReduce over a Directed Acyclic Graph (DAG) based workflow well. So how does one go about making a choice? Well, that depends on your use case and on knowing the subtleties of each engine. Flumaion will be glad to help you unravel this. This blog is a "starter for ten", for free.
Before comparing the three engines, it's best to get them to implement the same problem.
The problem we are going to solve is the following equation.
However, this is quite boring. Let's make it more interesting by making each element a matrix.
[a] is a 3 x 3 matrix
[b] is a 3 x 2 matrix
[c] is a 2 x 3 matrix
[d] is a 3 x 3 matrix
Finally, to make it even more interesting, data for ten equations was created. So the family of equations was indexed by i = 0, ..., 9, and each matrix was therefore keyed by an Integer.
I used Jupyter notebooks and NumPy to generate the data.
As you can see from the Jupyter code, the array is flattened and written to one line. I struggled to get Spark to read multi-line JSON and came across a Stack Overflow post which recommended that the JSON be flattened to one JSON object per line.
Reading JSON, however, required Spark-SQL and resulted in all the data landing in a single Row inside the Dataset. I therefore abandoned JSON and went for plain CSV output.
I wanted to use Protobufs, but this Stack Overflow post explained the complexity involved. The Parquet dependency seems to echo that throwback to Hadoop. ScalaPB was not an option for me as I am allergic to Scala.
A typical format for a 3 x 3 matrix was as follows.
0,3,3:val00,val10,val20,val01,val11,val21,val02,val12,val22
0,3,3 - the 0 denoted the matrix key index, the first 3 the number of rows and the second 3 the number of columns.
val00,... - the one-dimensional array was stored in column-major order, because mllib's DenseMatrix constructs a matrix from a flat array in that format.
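A minimal sketch of parsing one of these lines in Java, assuming the mllib DenseMatrix mentioned above (the class and method names here are mine, not from the original code):

```java
import org.apache.spark.mllib.linalg.DenseMatrix;
import scala.Tuple2;

// Hypothetical helper: splits "key,rows,cols:v1,v2,..." and rebuilds the keyed matrix.
// The values are already in column-major order, which is exactly what the
// DenseMatrix(numRows, numCols, values) constructor expects.
public class MatrixLineParser {

  public static Tuple2<Integer, DenseMatrix> parse(String line) {
    String[] parts = line.split(":");
    String[] header = parts[0].split(",");      // key, rows, cols
    int key = Integer.parseInt(header[0]);
    int rows = Integer.parseInt(header[1]);
    int cols = Integer.parseInt(header[2]);

    String[] cells = parts[1].split(",");
    double[] values = new double[rows * cols];  // column-major flat array
    for (int i = 0; i < values.length; i++) {
      values[i] = Double.parseDouble(cells[i]);
    }
    return new Tuple2<>(key, new DenseMatrix(rows, cols, values));
  }
}
```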
The Spark workflow is outlined here. I ran Spark in local mode because I could not be bothered to set up YARN (yawn) etc.
The MatrixAddFunction implements a Spark interface called PairFunction.
The first type argument is a Tuple2, which is made up of a pair of Integer and Tuple2 - this is the input going in.
The input Tuple2 holds two matrices (say [A] and [D]) that belong to the same index.
Hence the signature of Tuple2<Integer,Tuple2<MatrixA,MatrixD>>.
The second and third type arguments are Integer and DenseMatrix - these are the outputs coming from the function.
The output is the index (Integer) that was used in the input and a single matrix that represents [A + D] (DenseMatrix).
You can see these arguments separated clearly in the overridden method called 'call'.
Unfortunately Spark's DenseMatrix does not provide an 'add' method (at least in Java). So I am forced to flatten the two arrays, add them element-wise and return a newly constructed DenseMatrix.
Note the slightly (IMO) wonky syntax to access elements inside the Tuple2 (using '_', yikes!!).
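Putting the above together, here is a minimal sketch of what such a PairFunction can look like (field and class names are mine; the original code may differ):

```java
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.DenseMatrix;
import scala.Tuple2;

// Sketch of the add step: input is (index, ([A], [D])), output is (index, [A + D]).
// DenseMatrix has no 'add', so the two column-major arrays are summed element-wise.
public class MatrixAddFunction implements
    PairFunction<Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>>, Integer, DenseMatrix> {

  @Override
  public Tuple2<Integer, DenseMatrix> call(
      Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>> keyed) {
    DenseMatrix a = keyed._2()._1();            // [A], via the '_' accessors
    DenseMatrix d = keyed._2()._2();            // [D]
    double[] left = a.toArray();                // column-major flat arrays
    double[] right = d.toArray();
    double[] sum = new double[left.length];
    for (int i = 0; i < left.length; i++) {
      sum[i] = left[i] + right[i];
    }
    return new Tuple2<>(keyed._1(), new DenseMatrix(a.numRows(), a.numCols(), sum));
  }
}
```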
Very similar to MatrixAdd, except that Spark mllib does provide a useful multiply function.
NOTE: When you group matrices for multiplication, the order of the join is important. If B joins C, you will get a (3x2) x (2x3) = (3x3) matrix. Do it in reverse and you will get a (2x2).
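A correspondingly minimal sketch of the multiply step, again with names of my own choosing and assuming the join produced ([B], [C]) in that order:

```java
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.DenseMatrix;
import scala.Tuple2;

// Sketch of the multiply step: (index, ([B], [C])) -> (index, [B x C]).
// mllib's DenseMatrix.multiply does the work; the join order decides B*C (3x3) vs C*B (2x2).
public class MatrixMultFunction implements
    PairFunction<Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>>, Integer, DenseMatrix> {

  @Override
  public Tuple2<Integer, DenseMatrix> call(
      Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>> keyed) {
    DenseMatrix b = keyed._2()._1();   // [B], 3 x 2
    DenseMatrix c = keyed._2()._2();   // [C], 2 x 3
    return new Tuple2<>(keyed._1(), b.multiply(c));
  }
}
```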
The Flink workflow is outlined here. Again - it's run in local mode.
A few differences here are worth taking note of.
The MatrixAddFunction and the MatrixMultFunction are quite similar to those described before for Spark. Obviously in Flink we implement a MapFunction, which is part of the Flink API. Instead of the 'call' method we override a 'map' method. In Flink there is one input argument (like in Spark) but only one output argument: the Integer/DenseMatrix output is bundled into a Tuple2 as a single output argument. Essentially, syntactic sugar.
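As a rough sketch, assuming the joined input has already been keyed as a nested Tuple2 and that the same mllib DenseMatrix class is reused for the matrix payload (the original code may well differ on both counts):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.spark.mllib.linalg.DenseMatrix;   // assumption: matrix class reused from the Spark section

// Sketch of the Flink add step: one input argument, one Tuple2 output bundling key and result.
public class MatrixAddFunction implements
    MapFunction<Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>>, Tuple2<Integer, DenseMatrix>> {

  @Override
  public Tuple2<Integer, DenseMatrix> map(
      Tuple2<Integer, Tuple2<DenseMatrix, DenseMatrix>> keyed) {
    DenseMatrix a = keyed.f1.f0;
    DenseMatrix d = keyed.f1.f1;
    double[] left = a.toArray();
    double[] right = d.toArray();
    double[] sum = new double[left.length];
    for (int i = 0; i < left.length; i++) {
      sum[i] = left[i] + right[i];
    }
    return Tuple2.of(keyed.f0, new DenseMatrix(a.numRows(), a.numCols(), sum));
  }
}
```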
The Beam workflow is outlined here. Again - it's run locally using the DirectRunner.
The MatrixAddFunction extends DoFn.
The join elements are retrieved from CoGbkResult using the TupleTag.
Once you have extracted the two DenseMatrix objects, the logic for the rest is identical to that outlined in the Flink section.
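A minimal sketch of such a DoFn, assuming the TupleTags are passed in from the pipeline that set up the CoGroupByKey (names are mine, not from the original code):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.mllib.linalg.DenseMatrix;   // assumption: same matrix class as before

// Sketch of the Beam add step: the two sides of the CoGroupByKey are pulled out of the
// CoGbkResult via their TupleTags, then added exactly as in the Flink section.
public class MatrixAddFunction extends DoFn<KV<Integer, CoGbkResult>, KV<Integer, DenseMatrix>> {

  private final TupleTag<DenseMatrix> tagA;
  private final TupleTag<DenseMatrix> tagD;

  public MatrixAddFunction(TupleTag<DenseMatrix> tagA, TupleTag<DenseMatrix> tagD) {
    this.tagA = tagA;
    this.tagD = tagD;
  }

  @ProcessElement
  public void processElement(ProcessContext ctx) {
    KV<Integer, CoGbkResult> element = ctx.element();
    DenseMatrix a = element.getValue().getOnly(tagA);
    DenseMatrix d = element.getValue().getOnly(tagD);

    double[] left = a.toArray();
    double[] right = d.toArray();
    double[] sum = new double[left.length];
    for (int i = 0; i < left.length; i++) {
      sum[i] = left[i] + right[i];
    }
    ctx.output(KV.of(element.getKey(), new DenseMatrix(a.numRows(), a.numCols(), sum)));
  }
}
```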
A few differences here are worth taking note of.
The Beam Python workflow is outlined here. The main thing to note in Python is the absence of TupleTag declarations.
A few differences here are worth taking note of.
In my opinion, Spark seems to be oriented towards Scala and has its roots firmly in the Hadoop ecosystem. The most frustrating thing I encountered when writing this blog was working out how to return bespoke data structures from my custom functions. Oh, and implementing Protobufs. Did I mention I am allergic to Scala? The antihistamines my GP prescribed had no effect.
The new generation of streaming engines incorporates the concept of windowing, including advanced features such as watermarks and late data handling.
In my opinion, it is in these areas that the newer engines such as Flink and Beam pull ahead of Spark. Streaming is required when you have unbounded data inputs. Stock market ticks, flight information and the GPS locations of your taxis are all examples of unbounded inputs. Bucketing these events and tracking "lateness" is not a trivial exercise, and engines such as Flink and Beam provide powerful APIs to handle this.
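To make that concrete, here is a minimal, self-contained sketch of the Beam windowing API (the data and numbers are made up; it simply shows the shape of fixed event-time windows, a watermark trigger and an allowed-lateness setting):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowingSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Made-up "stock tick" events with explicit event timestamps.
    PCollection<KV<String, Double>> ticks = p.apply(Create.timestamped(
        TimestampedValue.of(KV.of("ACME", 101.5), new Instant(0L)),
        TimestampedValue.of(KV.of("ACME", 102.0), new Instant(30_000L)),
        TimestampedValue.of(KV.of("ACME", 99.8), new Instant(65_000L))));

    // One-minute event-time windows; fire when the watermark passes the end of the window,
    // fire again for any element that arrives up to 30 seconds late, then drop the rest.
    PCollection<KV<String, Double>> summed = ticks
        .apply(Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(30))
            .accumulatingFiredPanes())
        .apply(Sum.doublesPerKey());

    p.run().waitUntilFinish();
  }
}
```

Flink exposes the same ideas through window assigners, watermarks and an allowedLateness setting on a windowed stream.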