Reduce-Side Join
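This program joins two comma-separated inputs on customer ID in the reducer: a customer file of (customerID, customerName) records and a transaction file of (customerID, transactionAmount) records. Each mapper tags its output values with a "cust" or "trans" prefix so the reducer can tell the two record types apart; the reducer then emits each customer's name together with their transaction count and total amount.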

package my.org;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class redjoin1036 {


    // -------------------- CUSTOMER MAPPER --------------------
    // Input: customer records of the form "customerID,customerName"
    public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            // Skip malformed records
            if (line.length >= 2) {
                // key = customer ID, value = "cust,customerName"
                context.write(new Text(line[0]), new Text("cust," + line[1]));
            }
        }
    }


    // -------------------- TRANSACTION MAPPER --------------------
    // Input: transaction records of the form "customerID,transactionAmount"
    public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            // Skip malformed records
            if (line.length >= 2) {
                // key = customer ID, value = "trans,transactionAmount"
                context.write(new Text(line[0]), new Text("trans," + line[1]));
            }
        }
    }


    // -------------------- REDUCER --------------------
    // Receives all tagged values for one customer ID; the "cust"/"trans"
    // prefixes distinguish the two record types, since values arrive in
    // arbitrary order.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            int transactionCount = 0;
            int totalTransactionAmount = 0;
            String customerName = "";

            for (Text val : values) {
                String[] parts = val.toString().split(",");

                if (parts[0].equals("trans")) {
                    // Count transactions and accumulate the total amount
                    transactionCount++;
                    totalTransactionAmount += Integer.parseInt(parts[1]);
                } else if (parts[0].equals("cust")) {
                    // Remember the customer name for the final output
                    customerName = parts[1];
                }
            }

            // Output: customerName -> transactionCount,totalTransactionAmount
            context.write(new Text(customerName),
                    new Text(transactionCount + "," + totalTransactionAmount));
        }
    }
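Two edge cases are worth noting: a customer with no transactions is still emitted as "name 0,0", while transactions whose customer ID has no matching customer record come out with an empty name. The parseInt call also assumes transaction amounts are well-formed integers.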


    // -------------------- DRIVER CODE --------------------
    public static void main(String[] args) throws Exception {

        if (args.length < 3) {
            System.err.println("Usage: redjoin1036 <input-customer> <input-transaction> <output>");
            System.exit(1);
        }

        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor
        Job job = Job.getInstance(conf, "Reduce Side Join - Customer Transactions");

        job.setJarByClass(redjoin1036.class);
        job.setReducerClass(JoinReducer.class);

        // Each input path gets its own mapper; no setMapperClass call is needed
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
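As a quick sanity check, here is a hypothetical run; the jar name, file names, and sample records below are illustrative assumptions, not part of the original post. Assuming both input files have been copied to HDFS, and with the default TextOutputFormat, key and value come out tab-separated:

$ cat customers.txt
1,Alice
2,Bob

$ cat transactions.txt
1,100
1,250
2,75

$ hadoop jar redjoin.jar my.org.redjoin1036 customers.txt transactions.txt output
$ hdfs dfs -cat output/part-r-00000
Alice	2,350
Bob	1,75

Alice has two transactions totalling 350, Bob has one of 75, matching the reducer's "count,total" value format.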

 
