tf-quant-finance: Bundle processing using DoFn
import apache_beam as beam
from apache_beam.io import textio

with (beam.Pipeline(options=options)) as p:
    p | \
    "MatchFiles" >> beam.Create([options.gcstradesource]) | \
    "ReadAllFromText" >> \
    textio.ReadAllFromText(desired_bundle_size=int(options.readbndlsz)) | \
    "Parse json document and emit trades" >> beam.Map(parse_json) | \
    "Price in batches" >> \
    beam.ParDo(CalculatePriceInBatchesDoFn(int(options.dofnbndlsz))) | \
    ...

class CalculatePriceInBatchesDoFn(beam.DoFn):
    def __init__(...
        ...
    def setup(self):
        ...
    def start_bundle(self):
        ...
    def process(self, element):
        k, v = element
        self.buffer.append(v)
        # Flush once the buffer holds a full batch.
        if len(self.buffer) >= self.dofnbundlesize:
            results = process_batch(self.buffer)
            for result in results:
                yield result
            self.buffer = []
            self.counter += 1

    def finish_bundle(self):
        # Flush whatever is still buffered when the bundle ends.
        if self.buffer:
            results = process_batch(self.buffer)
            for result in results:
                yield beam.utils.windowed_value.WindowedValue(
                    value=result,
                    timestamp=0,
                    windows=[self.window],
                )
            self.buffer = []
            self.counter += 1

  1. Read all files by wildcard to get the 50 million lines
  2. Batch the lines (say 20 * 1024 * 1024 per batch, so about three batches)
  3. Parse the JSON from each line in the batch
  4. Pass the PCollection of parsed JSON to CalculatePriceInBatchesDoFn in batches
  5. Process each batch of JSON: convert it to an array and price it
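
The buffer-and-flush behaviour behind these steps can be sketched without Beam at all. The following is a minimal plain-Python illustration, not the actual pipeline: `BatchBuffer`, `price_batch`, `run`, and the trade fields `notional` and `rate` are hypothetical names invented for this example, and the batch size is treated as a line count, as in step 2. It buffers parsed trades, prices each full batch, and flushes the remainder at the end, the way `process` and `finish_bundle` are meant to.

```python
import json
import math


class BatchBuffer:
    """Plain-Python stand-in for the DoFn's buffering logic (no Beam needed)."""

    def __init__(self, batch_size, process_batch):
        self.batch_size = batch_size
        self.process_batch = process_batch  # hypothetical pricing callback
        self.buffer = []
        self.batches_flushed = 0

    def process(self, element):
        # Like DoFn.process: buffer the element, flush when the batch is full.
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            yield from self._flush()

    def finish_bundle(self):
        # Like DoFn.finish_bundle: flush whatever is left over.
        if self.buffer:
            yield from self._flush()

    def _flush(self):
        batch, self.buffer = self.buffer, []
        self.batches_flushed += 1
        yield from self.process_batch(batch)


def price_batch(trades):
    # Hypothetical pricer (notional * rate) standing in for the real model.
    return [t["notional"] * t["rate"] for t in trades]


def run(lines, batch_size):
    batcher = BatchBuffer(batch_size, price_batch)
    prices = []
    for line in lines:                         # step 1: lines read from files
        trade = json.loads(line)               # step 3: parse the JSON
        prices.extend(batcher.process(trade))  # steps 2, 4, 5: batch and price
    prices.extend(batcher.finish_bundle())     # price the final partial batch
    return prices, batcher.batches_flushed


lines = [json.dumps({"notional": 100.0 * i, "rate": 0.5}) for i in range(1, 8)]
prices, n_batches = run(lines, batch_size=3)

# Step 2 arithmetic: 50 million lines in batches of 20 * 1024 * 1024 lines.
expected_batches = math.ceil(50_000_000 / (20 * 1024 * 1024))
```

With seven input lines and a batch size of three, the sketch prices two full batches in `process` and the last single trade in `finish_bundle`. The same ceiling division applied to 50 million lines gives the "about three batches" figure from step 2.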