tf-quant-finance: Group by Keys
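import random
import apache_beam as beam
import orjson
from apache_beam.io import textio

def trade_by_key(line):
    tradedict = orjson.loads(line)
    # randrange(16*20) yields 0..319 (320 keys); randint(0, 16*20) would give 321
    batch_key = random.randrange(16*20)
    ...  # rest elided; must return a (batch_key, tradedict) pair for GroupByKey

# `options` is assumed to be a PipelineOptions object with a custom `source` option
with beam.Pipeline(options=options) as p:
    trades_by_batch = (
        p
        | "Read lines" >> textio.ReadFromText(options.source, min_bundle_size=1024*1024)
        | "Parse json document and emit trades" >> beam.Map(trade_by_key)
        | "Group by batch" >> beam.GroupByKey()
    )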
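What the Map and GroupByKey steps do to the data can be mimicked locally without Beam. This is a minimal sketch under stated assumptions: it uses the standard-library json module instead of orjson, a dict of lists as a stand-in for GroupByKey, and hypothetical trade fields (`id`, `strike`). It shows the data flow only, not how Beam distributes the work.

```python
import json
import random
from collections import defaultdict

NUM_KEYS = 16 * 20  # 320 batch keys

def trade_by_key(line, rng=random):
    """Parse one JSON line and pair it with a random batch key in [0, NUM_KEYS)."""
    trade = json.loads(line)
    return rng.randrange(NUM_KEYS), trade

def group_by_key(pairs):
    """Local stand-in for beam.GroupByKey: collect the values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

if __name__ == "__main__":
    rng = random.Random(0)
    # Hypothetical input: one JSON trade document per line.
    lines = [json.dumps({"id": i, "strike": 100.0 + i % 10}) for i in range(100_000)]
    groups = group_by_key(trade_by_key(line, rng) for line in lines)
    sizes = [len(v) for v in groups.values()]
    print(len(groups), min(sizes), max(sizes), sum(sizes) / len(groups))
```

With 100,000 lines the mean group size is about 100,000 / 320 = 312.5, and individual groups scatter around that mean because the keys are drawn at random. The same holds at scale: 156,250 trades per group for 50m trades is an average, not a fixed batch size.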

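  1. Read the source as text lines; min_bundle_size=1024*1024 is in bytes, so it asks Beam for initial splits of at least 1 MiB of input (not a million lines)
  2. Pass each line to trade_by_key, which parses the JSON document with orjson
  3. Assign each trade a random integer key, one of 16*20 = 320 batch keys
  4. Group trades that share a key (50m trades / 320 keys = 156,250 trades per group on average; the keys are random, so actual group sizes vary slightly)
  5. Convert each group of JSON trades to NumPy arrays (about 156,250 entries each here)
  6. Price each batch using vanilla_prices.py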
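Steps 5 and 6 can be sketched in plain Python. vanilla_prices.py is not shown here, so this is only a stand-in under stated assumptions: the trade fields (`spot`, `strike`, `expiry`, `vol`, `rate`, `is_call`) are hypothetical, plain lists stand in for the NumPy arrays, and the closed-form Black-Scholes formula for European options without dividends stands in for whatever pricer vanilla_prices.py actually uses.

```python
import math

# Hypothetical trade schema; the real JSON fields are not given in the text.
FIELDS = ("spot", "strike", "expiry", "vol", "rate", "is_call")

def to_columns(trades):
    """Turn a group of trade dicts into one list per field (struct of arrays)."""
    return {field: [t[field] for t in trades] for field in FIELDS}

def norm_cdf(x):
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

def black_scholes(spot, strike, expiry, vol, rate, is_call):
    """Black-Scholes price of a European option with no dividends."""
    sd = vol * math.sqrt(expiry)
    d1 = (math.log(spot / strike) + (rate + 0.5 * vol * vol) * expiry) / sd
    d2 = d1 - sd
    df = math.exp(-rate * expiry)  # discount factor applied to the strike
    if is_call:
        return spot * norm_cdf(d1) - strike * df * norm_cdf(d2)
    return strike * df * norm_cdf(-d2) - spot * norm_cdf(-d1)

def price_batch(trades):
    """Price one grouped batch, reading the inputs column-wise."""
    cols = to_columns(trades)
    # zip over the columns yields one (spot, strike, ..., is_call) row per trade
    return [black_scholes(*row) for row in zip(*(cols[f] for f in FIELDS))]

if __name__ == "__main__":
    batch = [
        {"spot": 100.0, "strike": 100.0, "expiry": 1.0, "vol": 0.2, "rate": 0.05, "is_call": True},
        {"spot": 100.0, "strike": 100.0, "expiry": 1.0, "vol": 0.2, "rate": 0.05, "is_call": False},
    ]
    print(price_batch(batch))  # roughly [10.45, 5.57]
```

The column-wise layout is what makes the NumPy conversion worthwhile: with real arrays, the per-row loop in price_batch could become a single vectorised call over the whole batch.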