How to Develop & Execute Apache Beam Data Pipeline on Local Machine

Download Template Project

  • Let's download the template code for Apache Beam. The command below creates a directory for the word-count-beam example, on top of which we can write our own pipeline logic.
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DarchetypeVersion=2.39.0 \
    -DgroupId=org.example \
    -DartifactId=word-count-beam \
    -Dversion="0.1" \
    -Dpackage=org.apache.beam.examples \
    -DinteractiveMode=false
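
  • Once the archetype finishes, change into the generated word-count-beam directory; that is where the LocalFileReader class built in the next sections will live, alongside the Beam example code.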

Coding Pipeline

  • Create a LocalFileReader.java file and add a main method that will be the entry point for your code.
  • Add an Options interface that takes runtime parameters for your pipeline.
  • Finally, code your pipeline as per your business logic (a class skeleton putting these pieces together is sketched below).
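
  • The overall class might look something like this sketch. The imports and the OpenCSV dependency are assumptions based on what the rest of this post uses; the method bodies are filled in section by section.

// package declaration omitted; use whatever package matches where you place the file
import com.opencsv.CSVParser;            // assumed OpenCSV import, used by the DoFn later
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalFileReader {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileReader.class);

    // runtime parameters for the pipeline (see "Runtime Options" below)
    public interface TextToTextOptions extends PipelineOptions {
    }

    // entry point: parse options, build the pipeline, run it (see "Main Method" below)
    public static void main(String[] args) {
    }

    // business logic: read -> validate -> write (see "Execution Logic" below)
    private static void execute(TextToTextOptions options, Pipeline pipeline) {
    }
}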

Main Method

  • Our main method consists of initializing the options and then creating the Pipeline object.
  • Once the options and pipeline are ready, we pass them to the execute function, which takes care of the business logic for the data pipeline.
  • And finally we run our pipeline.
    public static void main(String[] args) {

        // configure options from args
        TextToTextOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TextToTextOptions.class);

        // create pipeline object
        Pipeline pipeline = Pipeline.create(options);
        
        // write execution logic
        execute(options,pipeline);
        
        // run the pipeline. Without this call the pipeline will not start, as Beam builds it lazily.
        pipeline.run();
    }

Runtime Options

  • Our pipeline can take runtime parameters. All we have to do is define them in an Options interface, like the TextToTextOptions interface defined here.
    // define options for your pipeline. kind of runtime parameters that you want to pass
    public interface TextToTextOptions extends PipelineOptions {

        @Description("InputFile Path Required")
        ValueProvider<String> getInputFilePath();
        void setInputFilePath(ValueProvider<String> value);

        @Description("InputFile Path Required")
        ValueProvider<String> getOutputFilePath();
        void setOutputFilePath(ValueProvider<String> value);

    }
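
  • Beam's PipelineOptionsFactory maps each getter/setter pair to a command-line flag of the same name, so these two properties surface as --inputFilePath and --outputFilePath when we run the pipeline (see Executing Pipeline below).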

Execution Logic

Reading input text file

  • We can use the Apache Beam TextIO connector to read the file, pointing it at a path on the local filesystem. It reads all the content in parallel as a PCollection<String>. A PCollection<String> is an immutable container for data that can be split across any number of machines when parallel processing is required; it is roughly the equivalent of an RDD in the Apache Spark world. A minimal read sketch is shown below.
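
  • As a minimal sketch (assuming the TextToTextOptions interface above), the read step looks like this:

    // read every line of the input file, in parallel, into a PCollection<String>
    PCollection<String> inputRecords =
            pipeline.apply("ReadInputFile", TextIO.read().from(options.getInputFilePath()));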

Processing each record

  • Now, as the next step, we can write our logic for handling each record. The boilerplate for an Apache Beam processing step is sketched below; it has lifecycle methods such as @Setup and @ProcessElement.
  • I am using OpenCSV to parse each string as a CSV record. I initialize the CSVParser object in @Setup so that it is created once per DoFn instance rather than once per element.
  • Then, inside @ProcessElement, I write my logic for handling each record: here I parse the input as a CSV record and validate its length. I also check that the restaurant name is present, which is important for allowing only quality data to pass to the next step.
  • If validation fails, I log the problem as an error. As a good practice, I should also set up alert notifications on this log so an operator can investigate the cause and fix the issue or take the necessary steps.
  • Finally, for the records that pass the validation gates, I output them to the result. This entire logic yields a PCollection<String> that contains only the records that passed validation.
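
  • A bare-bones version of that DoFn boilerplate, with the business logic left as comments, might look like this (the transform and variable names are just placeholders):

    // @Setup runs once per DoFn instance, @ProcessElement once per record
    PCollection<String> validatedRecords = inputRecords.apply("ValidateRecords",
            ParDo.of(new DoFn<String, String>() {

                @Setup
                public void setup() {
                    // one-time initialization, e.g. create the OpenCSV CSVParser
                }

                @ProcessElement
                public void process(ProcessContext ctx) {
                    String element = ctx.element();
                    // parse, validate, and emit only the records that pass the checks
                    ctx.output(element);
                }
            }));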

Writing to Output

  • Now we can write the output to the destination, in our case a text file. We should also specify the number of shards so that we get that many output files; this helps when we have a large amount of data to write to n outputs in parallel. We can also specify a file extension as a suffix. A minimal write sketch is shown below.
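
  • A minimal write sketch, again assuming the options interface above and the validatedRecords collection from the previous step:

    // write the validated records to the configured output path as a single .csv shard
    validatedRecords.apply("WriteOutputFile",
            TextIO.write()
                    .to(options.getOutputFilePath())
                    .withSuffix(".csv")
                    .withNumShards(1));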

Final Execution Logic

  • Let's put the entire execute logic together.
 private static void execute(TextToTextOptions options, Pipeline pipeline){
        // read file as PCollection<String>
        PCollection<String> inputRecords = pipeline.apply(TextIO.read().from(options.getInputFilePath()));
        // process each record in parallel, applying whatever logic is needed
        inputRecords.apply(ParDo.of(new DoFn<String, String>() {
            private int totalExpectedFields = 11;
            private CSVParser csvParser;

            // use OpenCSV to read String as csv
            @Setup
            public void initOpenCsv() throws IOException {
                csvParser = new CSVParser();
            }
            @ProcessElement
            public void process(ProcessContext ctx){
                try {
                    String element = ctx.element();
                    String[] fields = csvParser.parseLine(element); // parse string as csv record
                    // LOG.info("len : "+fields.length);
                    // length doesn't match
                    if (fields.length != totalExpectedFields) {
                        throw new Exception("Error processing record, length doesn't match." + fields[0]);
                    }
                    // validate that the restaurant id is present and is an integer (parseInt throws if not)
                    int restaurantId = Integer.parseInt(fields[0]);

                    // check if restaurant name is present
                    String restaurantName = fields[2];
                    if (restaurantName == null || restaurantName.isEmpty()) {
                        throw new Exception("Error processing record, restaurant_name is not present");
                    }

                    // collect validated record as output
                    ctx.output(element);
                }catch(Exception ex){
                    LOG.error(ex.getMessage());
                }

            }
        })) // write validated records to the output file
                .apply(TextIO.write().to(options.getOutputFilePath()).withSuffix(".csv").withNumShards(1));

    }

Executing Pipeline

  • Let's use the Maven exec command to run our data pipeline code on the local machine. Notice that we are using the direct (local) runner for execution, not the Dataflow or Spark runner. We are also passing the input file and output file as runtime parameters for our pipeline.
mvn compile exec:java \
     -Dexec.mainClass=LocalFileReader \
     -Pdirect-runner \
     -Dexec.args="--inputFilePath={dir}/restaurants.csv \
                 --outputFilePath={dir}/datasets/output/restaurants_validated.csv"
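
  • The generated project also defines a dataflow-runner profile; swapping -Pdirect-runner for -Pdataflow-runner (plus the extra Dataflow arguments such as project and region) would run the same pipeline on Google Cloud Dataflow instead of locally.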
