QuantLib methodology

The QuantLib methodology was much simpler, because it involves no GroupBy steps and no creation of NumPy arrays. The whole pipeline is a single chain of transforms:
# Build and run the QuantLib pricing pipeline. Leaving the `with` block runs
# the pipeline and waits for it to finish.
with beam.Pipeline(options=self.__options) as p:
    # One element per input line. min_bundle_size (bytes, here 1 MiB) is the
    # minimum bundle size used when the source is split into bundles.
    lines = p | "Read lines" >> beam.io.ReadFromText(
        self.__options.gcstradesource, min_bundle_size=1024 * 1024)
    # Each line is priced independently; no GroupBy or NumPy batching needed.
    priced = lines | "Price trade and emit result" >> beam.ParDo(
        QLBlackScholesLinePrice())
    # `str` is used directly as the mapping function; a lambda wrapper
    # around it adds nothing.
    as_text = priced | "Convert results to string for output" >> beam.Map(str)
    as_text | "Write results to file" >> beam.io.WriteToText(
        self.__options.output)


class QLBlackScholesLinePrice(beam.DoFn):
    """Beam DoFn that prices one trade per input element with QuantLib.

    Each element is parsed with ``orjson.loads`` (presumably one JSON trade
    record per text line -- confirm against the input files). The trade is
    priced with QuantLib's ``AnalyticEuropeanEngine`` over a
    ``BlackScholesProcess`` and emitted as a ``(trade_id, npv)`` tuple.

    NOTE(review): parts of this method are elided in this excerpt (``....``
    and truncated calls); ``u`` and ``option`` are defined in the elided part.
    """
    def process(self, element):
        """Price a single trade and yield ``(trade_id, option.NPV())``.

        Args:
            element: one input element. It must be valid JSON (``orjson.loads``
                raises otherwise) and contain a ``'trade_id'`` key (a missing
                key raises ``KeyError``).

        Yields:
            A ``(trade_id, npv)`` tuple for the trade.
        """
        json_trade = orjson.loads(element)
        # NOTE(review): recent QuantLib releases require a market argument,
        # e.g. ql.UnitedStates(ql.UnitedStates.NYSE) -- confirm against the
        # installed version. Invariant objects such as this calendar are
        # rebuilt for every element; consider creating them once in
        # DoFn.setup().
        calendar = ql.UnitedStates()
        trade_id = json_trade['trade_id']
        ....
        # Flat forward-rate yield curve; remaining arguments elided here.
        riskFreeCurve = ql.FlatForward(0, calendar ...
        # Constant Black volatility; remaining arguments elided here.
        volatility = ql.BlackConstantVol(0, calendar, ...
        # `u` is presumably the underlying/spot quote defined in the elided
        # section -- confirm. This local `process` shadows the method name
        # only inside this function body.
        process = ql.BlackScholesProcess(ql.QuoteHandle(u), ...
        # Closed-form (analytic) pricing engine for European options.
        engine = ql.AnalyticEuropeanEngine(process)
        # `option` is constructed in the elided section.
        option.setPricingEngine(engine)
        yield trade_id, option.NPV()

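  1. Read all the trade data, one text line per element.
  2. Distribute the resulting PCollection of strings to a DoFn, QLBlackScholesLinePrice, which prices each trade and emits a (trade_id, NPV) pair.
  3. Convert each result to a string and write the results to the output.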