Skip to main content

Reduce-Side Join

package my.org;


import java.io.IOException;


import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class redjoin1036 {


    // -------------------- CUSTOMER MAPPER --------------------

    public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {


            String[] line = value.toString().split(",");


            // Ensure valid input

            if (line.length >= 2) {

                // key = customer ID, value = "cust,customerName"

                context.write(new Text(line[0]), new Text("cust," + line[1]));

            }

        }

    }


    // -------------------- TRANSACTION MAPPER --------------------

    public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {


            String[] line = value.toString().split(",");


            // Ensure valid input

            if (line.length >= 2) {

                // key = customer ID, value = "trans,transactionAmount"

                context.write(new Text(line[0]), new Text("trans," + line[1]));

            }

        }

    }


    // -------------------- REDUCER --------------------

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {


            int transactionCount = 0;

            int totalTransactionAmount = 0;

            String customerName = "";


            for (Text val : values) {

                String[] parts = val.toString().split(",");


                if (parts[0].equals("trans")) {

                    // Count transactions and accumulate total

                    transactionCount++;

                    totalTransactionAmount += Integer.parseInt(parts[1]);

                } else if (parts[0].equals("cust")) {

                    // Store customer name

                    customerName = parts[1];

                }

            }


            // Output: customerName -> transactionCount,totalTransactionAmount

            context.write(new Text(customerName),

                    new Text(transactionCount + "," + totalTransactionAmount));

        }

    }


    // -------------------- DRIVER CODE --------------------

    public static void main(String[] args) throws Exception {


        if (args.length < 3) {

            System.err.println("Usage: redjoin1036 <input-customer> <input-transaction> <output>");

            System.exit(1);

        }


        Configuration conf = new Configuration();

        Job job = new Job(conf, "Reduce Side Join - Customer Transactions");


        job.setJarByClass(redjoin1036.class);

        job.setReducerClass(JoinReducer.class);


        // Multiple input paths for customer and transaction files

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);


        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

Comments

Popular posts from this blog

Map Reduce

 /*******************************************************************************************  * BIG DATA LAB – 9  * Name  : 21MIS1029  * Topic : MapReduce Programs – Join Operations & Aggregations  *******************************************************************************************/ /*******************************************************************************************  * (i) MAP SIDE JOIN  * ------------------------------------------------------------------------------------------  * Q1. Develop a Map Reduce Program to display the details of employees using two text files  *     (sample code is given) using Map Side Join.  *  * Input:  *   empdetails.txt → (emp_id, emp_name)  *   empdata.txt    → (emp_id, salary, dept_id)  *  * Output:  *   emp_id, salary, dept_id, emp_name  ****************************************************...

RDD commands

  Category Command Description / What It Does Example Creation sc.parallelize(data) Create RDD from a Python list rdd = sc.parallelize([1,2,3]) sc.textFile(path) Read text file (each line = 1 element) rdd = sc.textFile("data.txt") sc.wholeTextFiles(path) Read files as (filename, content) pairs rdd = sc.wholeTextFiles("folder/") Basic Transformations map(func) Apply function to each element rdd.map(lambda x: x*2) flatMap(func) Apply function & flatten result rdd.flatMap(lambda x: x.split(" ")) filter(func) Keep elements matching condition rdd.filter(lambda x: x>10) distinct() Remove duplicate elements rdd.distinct() union(rdd2) Combine two RDDs rdd1.union(rdd2) intersection(rdd2) Keep common elements rdd1.intersection(rdd2) subtract(rdd2) Elements in first RDD but not second rdd1.subtract(rdd2) cartesian(rdd2) All pairs between two RDDs rdd1.cartesian(rdd2) sample(withRepla...