Apache Beam stores input data in immutable PCollections. If we read an input file as text, each line ends up in a PCollection<String>.
It’s not uncommon to interact with an external API over a REST interface, where we send a set of records to a service and expect some output back.
If we read n records in parallel and call the external API once per record, we will overwhelm the service and may hit its rate limit. This is a very common situation where we need to batch input records.
In this article, we will see how to batch input records so that we send groups of records to the REST endpoint and respect the API rate limit.
Use Case
A very common use case is reading sensitive data from input files and sending it to a DLP (data loss prevention) service to mask it before writing it to our data warehouse.
Solution
If we go through the Apache Beam documentation, we come across a transform called GroupIntoBatches.
This transform takes a PCollection of KV pairs and groups the records into batches by key.
But when we read files, we just get a plain PCollection<String>, not a keyed collection such as PCollection<KV<String, String>> or PCollection<KV<Integer, String>> that we could pass to GroupIntoBatches.
So we need a way to assign keys to our input records. We cannot simply assign a UUID to each record, because then every record would carry its own unique key and GroupIntoBatches would never group more than one record under the same key.
@ProcessElement
public void processElement(ProcessContext ctx) {
    String element = ctx.element();
    // Assign a random shard key in [0, shardNumber) so that records share keys
    KV<Integer, String> keyedInput = KV.of(random.nextInt(shardNumber), element);
    ctx.output(keyedInput);
}
We can use the Random class to generate integer keys, but we pass a shard number as the upper bound so the generated keys fall into a limited range and many records end up sharing the same key.
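Putting this together, a minimal keying DoFn could look like the sketch below. The class name AssignShardKeyFn, the input PCollection lines, and the shard count of 10 are illustrative choices, not part of the original pipeline:

static class AssignShardKeyFn extends DoFn<String, KV<Integer, String>> {
    private final int numShards;
    private transient java.util.Random random;

    AssignShardKeyFn(int numShards) {
        this.numShards = numShards;
    }

    @Setup
    public void setup() {
        random = new java.util.Random();
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        // Keys fall in [0, numShards), so many records share each key
        ctx.output(KV.of(random.nextInt(numShards), ctx.element()));
    }
}

// Applying it to the raw input lines
PCollection<KV<Integer, String>> keyedInput =
    lines.apply("AssignShardKey", ParDo.of(new AssignShardKeyFn(10)));

The shard count is a trade-off: more shards means more parallelism when the batches are processed downstream, while fewer shards means fuller batches.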
Once the input records have keys, we can supply them to the GroupIntoBatches transform, and it will group the records into batches based on the keys we provided.
PCollection<KV<Integer, Iterable<String>>> batchedRecords =
    keyedInput.apply(GroupIntoBatches.<Integer, String>ofSize(batchSize));
// now we can make the external API call, passing each Iterable of records as one batch
Once our records are batched, we can send each batch in a single API call to an external service such as the DLP API, which masks the sensitive data and returns masked records that we can write back to our data warehouse.
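As a rough sketch of that last step, the DoFn below consumes the batched records and makes one call per batch. MaskingServiceClient and its maskAll(...) method are hypothetical placeholders for whatever REST/DLP client is actually used:

static class MaskBatchFn extends DoFn<KV<Integer, Iterable<String>>, String> {
    private transient MaskingServiceClient client;

    @Setup
    public void setup() {
        // Hypothetical client for the external masking service
        client = new MaskingServiceClient();
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        java.util.List<String> batch = new java.util.ArrayList<>();
        ctx.element().getValue().forEach(batch::add);
        // One API call for the whole batch instead of one call per record
        for (String masked : client.maskAll(batch)) {
            ctx.output(masked);
        }
    }
}

PCollection<String> maskedRecords =
    batchedRecords.apply("MaskBatches", ParDo.of(new MaskBatchFn()));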
Conclusion
When making external API calls, it’s a good idea to batch input records and send them together instead of making one API call per record.
By doing this we save a large number of API calls and are far less likely to hit API rate limits.