Map Reduce

 /*******************************************************************************************

 * BIG DATA LAB – 9

 * Name  : 21MIS1029

 * Topic : MapReduce Programs – Join Operations & Aggregations

 *******************************************************************************************/


/*******************************************************************************************

 * (i) MAP SIDE JOIN

 * ------------------------------------------------------------------------------------------

 * Q1. Develop a Map Reduce Program to display the details of employees using two text files

 *     (sample code is given) using Map Side Join.

 *

 * Input:

 *   empdetails.txt → (emp_id, emp_name)

 *   empdata.txt    → (emp_id, salary, dept_id)

 *

 * Output:

 *   emp_id, salary, dept_id, emp_name

 *******************************************************************************************/
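/* For concreteness, a hypothetical pair of matching records and the joined line the program
 * below would emit (all IDs and values here are made up for illustration):
 *   empdetails.txt : 101,Anita
 *   empdata.txt    : 101,50000,D01
 *   Joined output  : 101    50000,D01,Anita   (key and value are tab-separated by TextOutputFormat)
 */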

package my.org;


import java.io.*;

import java.util.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.filecache.*; // Deprecated in Hadoop 2.x+

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class mapsid1036 {


    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        ArrayList<String> employees = new ArrayList<>();


        @Override

        public void setup(Context context) throws IOException, InterruptedException {

            Configuration conf = context.getConfiguration();

            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);


            // Read employee details from distributed cache

            try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {

                String line;

                while ((line = reader.readLine()) != null) {

                    employees.add(line);

                }

            }

        }


        @Override

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] employeeData = value.toString().split(",");
            // Skip malformed lines; empdata.txt is expected to hold (emp_id, salary, dept_id).
            if (employeeData.length < 3) {
                return;
            }

            for (String employee : employees) {

                String[] empDetails = employee.split(",");

                if (employeeData[0].equals(empDetails[0])) {

                    context.write(new Text(employeeData[0]),

                            new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1]));

                }

            }

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, "MapSideJoin");

        job.setJarByClass(mapsid1036.class);


        // Add employee file to distributed cache

        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());


        job.setMapperClass(Map.class);

        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);


        FileInputFormat.addInputPath(job, new Path(args[1]));

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
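The DistributedCache API used above is deprecated in Hadoop 2.x+. A minimal sketch of the newer
equivalent, assuming it is dropped into the same mapsid1036 driver and Map.setup(), and that the
cached file is symlinked into the task working directory (the default behaviour on YARN):

// Driver side: replaces DistributedCache.addCacheFile(...)
Job job = Job.getInstance(conf, "MapSideJoin");
job.addCacheFile(new Path(args[0]).toUri());

// Mapper setup(): replaces DistributedCache.getLocalCacheFiles(conf)
// (requires import java.net.URI)
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
    String localName = new Path(cacheFiles[0].getPath()).getName();
    try (BufferedReader reader = new BufferedReader(new FileReader(localName))) {
        String line;
        while ((line = reader.readLine()) != null) {
            employees.add(line);
        }
    }
}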


/*******************************************************************************************

 * (ii) REDUCE SIDE JOIN

 * ------------------------------------------------------------------------------------------

 * Q2. Develop a Map Reduce program to count the number of transactions done by customers

 *     using cust.txt and trans.txt file using Reduce Side Join.

 *

 * Input:

 *   cust.txt  → (cust_id, cust_name)

 *   trans.txt → (cust_id, transaction_amount)

 *

 * Output:

 *   cust_name, transaction_count, total_transaction_amount

 *******************************************************************************************/

package my.org;


import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class redjoin1036 {


    public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            if (line.length >= 2)

                context.write(new Text(line[0]), new Text("cust," + line[1]));

        }

    }


    public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            if (line.length >= 2)

                context.write(new Text(line[0]), new Text("trans," + line[1]));

        }

    }


    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

            int transactionCount = 0;

            int totalTransactionAmount = 0;

            String customerName = "";


            for (Text val : values) {

                String[] line = val.toString().split(",");

                if (line[0].equals("trans")) {

                    transactionCount++;

                    totalTransactionAmount += Integer.parseInt(line[1]);

                } else if (line[0].equals("cust")) {

                    customerName = line[1];

                }

            }

            context.write(new Text(customerName),

                    new Text(transactionCount + "," + totalTransactionAmount));

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "ReduceSideJoin");

        job.setJarByClass(redjoin1036.class);


        job.setReducerClass(JoinReducer.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
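For concreteness, a tiny hypothetical run of this reduce-side join (customer and amounts are made up):

/* cust.txt  : 1,Ravi
 * trans.txt : 1,500
 *             1,300
 * Mapper output  : (1, "cust,Ravi"), (1, "trans,500"), (1, "trans,300")
 * Reducer output : Ravi    2,800   (key and value are tab-separated by TextOutputFormat)
 */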


/*******************************************************************************************

 * (iii) STUDENT HIGHEST MARK (CAT1 or CAT2)

 * ------------------------------------------------------------------------------------------

 * Q3. Given:

 *   Stud.txt → (std_id, std_name, mobile, address, year_of_admission)

 *   Cat1.txt → (std_id, mark)

 *   Cat2.txt → (std_id, mark)

 *

 *   Students admitted in 2020, 2021, 2022 registered for CSE2035.

 *   Develop a MapReduce program to display stud_id, name, and the highest mark of

 *   corresponding students (either CAT1 or CAT2) for the user-given year.

 *******************************************************************************************/

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class cat1ques1036 {


    public static class StudMapper extends Mapper<Object, Text, Text, Text> {
        private String requestedYear;

        @Override
        public void setup(Context context) {
            // The user-given year is passed through the job configuration (set in main from args[3]).
            requestedYear = context.getConfiguration().get("admission.year");
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length >= 5) {
                String stdId = tokens[0];
                String yearOfAdmission = tokens[4];
                // Only 2020-2022 admissions registered for CSE2035; additionally filter by the user-given year.
                boolean registered = yearOfAdmission.equals("2020") || yearOfAdmission.equals("2021")
                        || yearOfAdmission.equals("2022");
                if (registered && (requestedYear == null || yearOfAdmission.equals(requestedYear))) {
                    context.write(new Text(stdId), new Text("stud," + tokens[1]));
                }
            }
        }
    }


    public static class CatMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",");

            if (tokens.length >= 2) {

                context.write(new Text(tokens[0]), new Text("cat," + tokens[1]));

            }

        }

    }


    public static class ReducerClass extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String studentName = null;
            int maxMark = -1;
            for (Text value : values) {
                String[] parts = value.toString().split(",");
                if (parts.length == 2 && parts[0].equals("cat")) {
                    // Keep the higher of the CAT1 / CAT2 marks.
                    int mark = Integer.parseInt(parts[1]);
                    if (mark > maxMark) {
                        maxMark = mark;
                    }
                } else if (parts.length == 2 && parts[0].equals("stud")) {
                    studentName = parts[1];
                }
            }
            // Emit only students that passed the StudMapper filter and have at least one mark.
            if (studentName != null && maxMark >= 0) {
                context.write(key, new Text(studentName + "," + maxMark));
            }
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Optional 4th argument carries the user-given admission year (e.g. 2021).
        if (args.length > 3) {
            conf.set("admission.year", args[3]);
        }
        Job job = Job.getInstance(conf, "StudentHighestMark");
        job.setJarByClass(cat1ques1036.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, StudMapper.class);
        // args[1] may be a directory holding both Cat1.txt and Cat2.txt.
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CatMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(ReducerClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
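An illustrative run with made-up records, assuming the year 2021 is passed as the fourth argument:

/* Stud.txt : 1001,Priya,9876543210,Chennai,2021
 * Cat1.txt : 1001,78
 * Cat2.txt : 1001,85
 * Reducer output : 1001    Priya,85   (key and value are tab-separated by TextOutputFormat)
 */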


/*******************************************************************************************

 * (iv) EMPLOYEE NET PAY CALCULATION

 * ------------------------------------------------------------------------------------------

 * Q4. Given:

 *   Emp.txt → (emp_id, emp_name, dept_name, basic_pay)

 *   Pay.txt → (basic_pay, allowance_percentage, tax_percentage)

 *

 *   Formula: 

 *      net_pay = basic_pay + (allowance% * basic_pay / 100) - (tax% * basic_pay / 100)

 *

 *   Display the count of employees in each department whose net pay > 50000.

 *   Use different reducers for each department.

 *******************************************************************************************/
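/* Worked instance of the net_pay formula (hypothetical figures):
 *   basic_pay = 60000, allowance = 20%, tax = 10%
 *   net_pay   = 60000 + (20 * 60000 / 100) - (10 * 60000 / 100)
 *             = 60000 + 12000 - 6000
 *             = 66000
 */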

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class empay1036 {


    // Mapper for Emp.txt

    public static class EmpMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length >= 4) {
                // Emp.txt → (emp_id, emp_name, dept_name, basic_pay); key on basic_pay so the
                // record meets its matching Pay.txt line in the reducer.
                String basicPay = tokens[3];
                context.write(new Text(basicPay),
                        new Text("emp," + tokens[0] + "," + tokens[1]));
            }
        }
    }


    // Mapper for Pay.txt

    public static class PayMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length >= 3) {
                // Pay.txt → (basic_pay, allowance_percentage, tax_percentage); keyed on basic_pay.
                String basicPay = tokens[0];
                context.write(new Text(basicPay),
                        new Text("pay," + tokens[1] + "," + tokens[2]));
            }
        }
    }


    // Reducer

    public static class CalculateNetPayReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // The values Iterable can only be traversed once, so buffer it before making two passes.
            List<String> buffered = new ArrayList<>();
            for (Text value : values) {
                buffered.add(value.toString());
            }

            // First pass: pick up the allowance and tax percentages for this basic_pay.
            double allowance = 0.0;
            double tax = 0.0;
            for (String record : buffered) {
                String[] parts = record.split(",");
                if (parts[0].equals("pay")) {
                    allowance = Double.parseDouble(parts[1]);
                    tax = Double.parseDouble(parts[2]);
                }
            }

            // Second pass: compute net pay for every employee sharing this basic_pay.
            double basicPay = Double.parseDouble(key.toString());
            for (String record : buffered) {
                String[] parts = record.split(",");
                if (parts[0].equals("emp")) {
                    double netPay = basicPay + (allowance / 100) * basicPay - (tax / 100) * basicPay;
                    context.write(new Text(parts[1] + "," + parts[2]), new DoubleWritable(netPay));
                }
            }
        }
    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "NetPayCalculator");

        job.setJarByClass(empay1036.class);


        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmpMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PayMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        job.setReducerClass(CalculateNetPayReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(DoubleWritable.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
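An illustrative run with made-up figures, showing the join on basic_pay and the computed net pay:

/* Emp.txt : E01,Kumar,CSE,60000
 * Pay.txt : 60000,20,10
 * Reducer output : E01,Kumar    66000.0   (key and value are tab-separated by TextOutputFormat)
 */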

