How to Group Apache Beam Records into Batches

Overview

  • Apache Beam stores the data it reads in an immutable PCollection. If we read an input file as text, each line ends up as an element of a PCollection<String>.
  • It is common to interact with an external API over a REST interface, where we send a set of records to the service and expect some output in return.
  • If we read n records in parallel and call the external API once per record, we can overwhelm the service and hit its API rate limit. This is exactly the kind of situation where we need to batch input records.
  • In this article, we will see how to batch input records so that we send groups of records to the REST endpoint and respect the API rate limit.

Use Case

  • A typical use case is reading sensitive data from input files and sending it to a DLP (Data Loss Prevention) service to be masked before writing it to our data warehouse.

Solution

  • If we go through the Apache Beam documentation, we will come across a transform called GroupIntoBatches.
  • This transform takes a PCollection of KV pairs and groups records by key.
  • But when we read files, we get a PCollection<String>, not a keyed PCollection<KV<String, String>> or PCollection<KV<Integer, String>> that we could pass to GroupIntoBatches.
  • We therefore need a way to assign keys to our input records. We cannot simply assign a UUID to each record, because every record would then have a unique key and GroupIntoBatches would never group more than one record together.
@ProcessElement
public void processElement(ProcessContext ctx) {
  String element = ctx.element();
  // key each record with a random shard id in [0, shardNumber) so that records share keys
  KV<Integer, String> keyedInput = KV.of(random.nextInt(shardNumber), element);
  ctx.output(keyedInput);
}
  • We can use the Random class to generate an integer key, but we must bound it with a shard number so that keys stay within a small range and records end up sharing keys.
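  • For context, below is a minimal sketch of what the whole keying DoFn could look like. The class name AssignRandomKeyFn and the shardNumber parameter are illustrative names, not from the original post.
import java.util.Random;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Assigns each record a random key in [0, shardNumber) so that
// GroupIntoBatches can later group records sharing a key into batches.
public class AssignRandomKeyFn extends DoFn<String, KV<Integer, String>> {
  private final int shardNumber;
  private transient Random random;

  public AssignRandomKeyFn(int shardNumber) {
    this.shardNumber = shardNumber;
  }

  @Setup
  public void setup() {
    random = new Random();
  }

  @ProcessElement
  public void processElement(ProcessContext ctx) {
    // bound the key space so many records land on the same key
    ctx.output(KV.of(random.nextInt(shardNumber), ctx.element()));
  }
}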
  • Once our input records have keys, we can pass them to the GroupIntoBatches transform and it will group the records by the keys we assigned.
PCollection<KV<Integer, Iterable<String>>> batchedRecords =
    keyedInput.apply(GroupIntoBatches.<Integer, String>ofSize(batchSize));
// each element now holds up to batchSize records, which we can send to the external API in one call
  • Once our records are batched, we can pass each batch to an external service such as a DLP API in a single call, get the masked data back, and write it to our data warehouse.
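  • To make that last step concrete, here is a minimal sketch of a DoFn that sends one batch per API call. DlpClient and its maskBatch method are hypothetical stand-ins for whatever REST client you actually use.
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sends each batch of records to an external masking service in one call.
// DlpClient / maskBatch are hypothetical; substitute your real REST client.
public class MaskBatchFn extends DoFn<KV<Integer, Iterable<String>>, String> {
  private transient DlpClient client;

  @Setup
  public void setup() {
    client = new DlpClient(); // create the client once per worker
  }

  @ProcessElement
  public void processElement(ProcessContext ctx) {
    List<String> batch = new ArrayList<>();
    ctx.element().getValue().forEach(batch::add);

    // one API call per batch instead of one per record
    List<String> masked = client.maskBatch(batch);
    for (String record : masked) {
      ctx.output(record);
    }
  }
}
  • Wired together with the transforms above, the pipeline could look roughly like this (the file path, shard count, and batch size are placeholders):
PCollection<String> masked = pipeline
    .apply(TextIO.read().from("gs://my-bucket/input.txt"))      // read raw records
    .apply(ParDo.of(new AssignRandomKeyFn(10)))                 // bounded random keys
    .apply(GroupIntoBatches.<Integer, String>ofSize(100))       // batches of up to 100
    .apply(ParDo.of(new MaskBatchFn()));                        // one API call per batch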

Conclusion

  • When making external API calls, it is a good idea to batch input records and send them together rather than making a separate API call for each record.
  • By doing this we make far fewer API calls and are much less likely to hit API rate limits.
