/**
* Section 0: tuple tags for the CoGroupByKey join.
* Each tag identifies one of the two DenseMatrix inputs that Section 4
* registers in a KeyedPCollectionTuple before grouping them by key.
**/
// Tags the aMatrices input of the join.
public static final TupleTag<DenseMatrix> firstmatTag = new TupleTag<>();
// Tags the dMatrices input of the join.
public static final TupleTag<DenseMatrix> secondmatTag = new TupleTag<>();
/**
* Section 1: create the pipeline options and the pipeline itself.
**/
// NOTE(review): PipelineOptionsFactory.as(...) is used without fromArgs(...), so no
// command-line arguments are parsed on this line. Unless the elided code populates the
// options, getters such as getAFile() would return their defaults -- confirm this is intended.
BeamOptions options = PipelineOptionsFactory.as(BeamOptions.class);
Pipeline pipeline = Pipeline.create(options);
...
/**
* Section 2: read the text input for the A matrices.
* The input location comes from options.getAFile(); TextIO yields one String element per line.
**/
PCollection<String> alines = pipeline.apply("Read A matrices", TextIO.read().from(options.getAFile()));
...
/**
* Section 3: convert the text lines into keyed matrices.
* StringToKVFunction (defined elsewhere) consumes String elements and emits
* KV<Integer, DenseMatrix> elements; the line format it expects is not visible here.
**/
PCollection<KV<Integer, DenseMatrix>> aMatrices = alines.apply(ParDo.of(new StringToKVFunction()));
...
/**
* Section 4
**/
PCollection<KV<Integer, CoGbkResult>> groupingADMatrices = KeyedPCollectionTuple
       .of(firstmatTag,aMatrices)
       .and(secondmatTag,dMatrices)
       .apply(CoGroupByKey.create());
PCollection<KV<Integer,DenseMatrix>> adMatrices = groupingADMatrices
       .apply(ParDo.of(new MatrixAddFunction()));
...
/**
* Section 5: apply MatrixMultFunction (defined elsewhere) as a ParDo.
* The input collection and the variable receiving the result are elided in this listing.
**/
... apply(ParDo.of(new MatrixMultFunction()));
...
/**
* Section 6: apply MatrixAddFunction a second time; this is the same DoFn class
* that Section 4 applies to the grouped A/D matrices.
* NOTE(review): presumably preceded by another CoGroupByKey in the elided lines -- confirm.
**/
...
... apply(ParDo.of(new MatrixAddFunction()));
...
/**
* Section 7: attach the output step and execute the pipeline.
**/
// resultMatrices is not defined in the visible lines; PrintResultsFunction is defined elsewhere.
resultMatrices.apply(ParDo.of(new PrintResultsFunction()));
// The apply(...) calls above only assemble the pipeline; run() starts execution and
// waitUntilFinish() blocks until the pipeline has finished.
pipeline.run().waitUntilFinish();