Map Reduce

 /*******************************************************************************************

 * BIG DATA LAB – 9

 * Name  : 21MIS1029

 * Topic : MapReduce Programs – Join Operations & Aggregations

 *******************************************************************************************/


/*******************************************************************************************

 * (i) MAP SIDE JOIN

 * ------------------------------------------------------------------------------------------

 * Q1. Develop a MapReduce program to display the details of employees from two text files

 *     (sample code is given) using a Map Side Join.

 *

 * Input:

 *   empdetails.txt → (emp_id, emp_name)

 *   empdata.txt    → (emp_id, salary, dept_id)

 *

 * Output:

 *   emp_id, salary, dept_id, emp_name

 *******************************************************************************************/
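For orientation, here is a hypothetical pair of matching rows (invented for this note, not taken from the lab files) and the record the map-only join below emits for them:

/* Hypothetical sample (illustration only):
 *   empdetails.txt (distributed cache) : 101,Anita
 *   empdata.txt    (map input)         : 101,55000,D05
 *   emitted record (key TAB value)     : 101    55000,D05,Anita
 */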

package my.org;


import java.io.*;

import java.util.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.filecache.*; // Deprecated in Hadoop 2.x+

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class mapsid1036 {


    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        ArrayList<String> employees = new ArrayList<>();


        @Override

        public void setup(Context context) throws IOException, InterruptedException {

            Configuration conf = context.getConfiguration();

            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);


            // Read employee details from distributed cache

            try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {

                String line;

                while ((line = reader.readLine()) != null) {

                    employees.add(line);

                }

            }

        }


        @Override

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] employeeData = value.toString().split(",");

            for (String employee : employees) {

                String[] empDetails = employee.split(",");

                if (employeeData[0].equals(empDetails[0])) {

                    context.write(new Text(employeeData[0]),

                            new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1]));

                }

            }

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "MapSideJoin");

        job.setJarByClass(mapsid1036.class);


        // Add employee file to distributed cache

        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());


        job.setMapperClass(Map.class);

        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);


        FileInputFormat.addInputPath(job, new Path(args[1]));

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
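The DistributedCache class used above is deprecated in Hadoop 2.x+, as the import comment notes. Below is a minimal sketch, assuming a Hadoop 2.x cluster, of the equivalent wiring with the newer Job/Context cache API; cached files are normally symlinked into the task's working directory, so the file can be opened by its base name. Only these two pieces would change, the rest of the program stays as written.

// Driver side: replace the DistributedCache call with (java.net.URI is needed)
//   job.addCacheFile(new Path(args[0]).toUri());

// Mapper side: read the cached file in setup() through the non-deprecated API
@Override
public void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles();
    // Open the localized copy by its base name (symlinked into the working directory)
    String fileName = new Path(cacheFiles[0].getPath()).getName();
    try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = reader.readLine()) != null) {
            employees.add(line);
        }
    }
}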


/*******************************************************************************************

 * (ii) REDUCE SIDE JOIN

 * ------------------------------------------------------------------------------------------

 * Q2. Develop a MapReduce program to count the number of transactions done by customers

 *     from the cust.txt and trans.txt files using a Reduce Side Join.

 *

 * Input:

 *   cust.txt  → (cust_id, cust_name)

 *   trans.txt → (cust_id, transaction_amount)

 *

 * Output:

 *   cust_name, transaction_count, total_transaction_amount

 *******************************************************************************************/
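As a quick illustration with made-up rows (not from the lab files), the reducer below receives all tagged values for one cust_id and emits that customer's transaction count and total:

/* Hypothetical sample (illustration only):
 *   cust.txt  : 4001,Kristina
 *   trans.txt : 4001,55   and   4001,198
 *   reducer input for key 4001    : [cust,Kristina] [trans,55] [trans,198]
 *   emitted record (key TAB value): Kristina    2,253
 */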

package my.org;


import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class redjoin1036 {


    public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            if (line.length >= 2)

                context.write(new Text(line[0]), new Text("cust," + line[1]));

        }

    }


    public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");

            if (line.length >= 2)

                context.write(new Text(line[0]), new Text("trans," + line[1]));

        }

    }


    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

            int transactionCount = 0;

            int totalTransactionAmount = 0;

            String customerName = "";


            for (Text val : values) {

                String[] line = val.toString().split(",");

                if (line[0].equals("trans")) {

                    transactionCount++;

                    totalTransactionAmount += Integer.parseInt(line[1]);

                } else if (line[0].equals("cust")) {

                    customerName = line[1];

                }

            }

            context.write(new Text(customerName),

                    new Text(transactionCount + "," + totalTransactionAmount));

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "ReduceSideJoin");

        job.setJarByClass(redjoin1036.class);


        job.setReducerClass(JoinReducer.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}


/*******************************************************************************************

 * (iii) STUDENT HIGHEST MARK (CAT1 or CAT2)

 * ------------------------------------------------------------------------------------------

 * Q3. Given:

 *   Stud.txt → (std_id, std_name, mobile, address, year_of_admission)

 *   Cat1.txt → (std_id, mark)

 *   Cat2.txt → (std_id, mark)

 *

 *   Students admitted in 2020, 2021, 2022 registered for CSE2035.

 *   Develop a MapReduce program to display the stud_id, name, and highest mark

 *   (from either CAT1 or CAT2) of the corresponding students for the user-given year.

 *******************************************************************************************/

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class cat1ques1036 {


    public static class StudMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",");

            if (tokens.length >= 5) {

                String stdId = tokens[0];

                String yearOfAdmission = tokens[4];

                if (yearOfAdmission.equals("2020") || yearOfAdmission.equals("2021") ||

                        yearOfAdmission.equals("2022")) {

                    context.write(new Text(stdId), new Text("stud," + tokens[1]));

                }

            }

        }

    }


    public static class CatMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",");

            if (tokens.length >= 2) {

                context.write(new Text(tokens[0]), new Text("cat," + tokens[1]));

            }

        }

    }


    public static class ReducerClass extends Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

            // Track the student's name (from Stud.txt) and the highest CAT mark seen

            String studentName = null;

            Integer maxMark = null;

            for (Text value : values) {

                String[] parts = value.toString().split(",");

                if (parts.length == 2 && parts[0].equals("cat")) {

                    int mark = Integer.parseInt(parts[1]);

                    if (maxMark == null || mark > maxMark)

                        maxMark = mark;

                } else if (parts.length == 2 && parts[0].equals("stud")) {

                    studentName = parts[1];

                }

            }

            // Emit only students with a matching Stud.txt record (i.e. the admitted years)

            if (studentName != null)

                context.write(new Text(key.toString() + "," + studentName),

                        new IntWritable(maxMark != null ? maxMark : 0));

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "StudentHighestMark");

        job.setJarByClass(cat1ques1036.class);


        // args[0] = Stud.txt, args[1] = Cat1.txt, args[2] = Cat2.txt, args[3] = output directory
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, StudMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CatMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[2]), TextInputFormat.class, CatMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[3]));


        job.setReducerClass(ReducerClass.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
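StudMapper above hardcodes the 2020-2022 admission years, while the question asks for a user-given year. A minimal sketch of one way to pass the year in through the standard Configuration mechanism; the property name "admission.year" and the extra argument position are assumptions for illustration, not part of the lab code:

// Driver: before creating the Job, stash the user-given year in the Configuration
//   conf.set("admission.year", args[4]);   // hypothetical extra command-line argument

// Mapper: read the year back in setup() and filter on it instead of the literals
public static class StudMapper extends Mapper<Object, Text, Text, Text> {
    private String year;

    @Override
    public void setup(Context context) {
        year = context.getConfiguration().get("admission.year", "2022");
    }

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        if (tokens.length >= 5 && tokens[4].equals(year)) {
            context.write(new Text(tokens[0]), new Text("stud," + tokens[1]));
        }
    }
}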


/*******************************************************************************************

 * (iv) EMPLOYEE NET PAY CALCULATION

 * ------------------------------------------------------------------------------------------

 * Q4. Given:

 *   Emp.txt → (emp_id, emp_name, dept_name, basic_pay)

 *   Pay.txt → (basic_pay, allowance_percentage, tax_percentage)

 *

 *   Formula: 

 *      net_pay = basic_pay + (allowance% * basic_pay / 100) - (tax% * basic_pay / 100)

 *

 *   Display the count of employees in each department whose net pay > 50000.

 *   Use different reducers for each department.

 *******************************************************************************************/
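As a quick sanity check of the formula with made-up numbers (invented for illustration):

/* Hypothetical worked example:
 *   basic_pay = 48000, allowance = 20%, tax = 10%
 *   net_pay   = 48000 + (20 * 48000 / 100) - (10 * 48000 / 100)
 *             = 48000 + 9600 - 4800
 *             = 52800   → above the 50000 threshold, so this employee would be counted
 */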

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;


public class empay1036 {


    // Mapper for Emp.txt

    public static class EmpMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",");

            if (tokens.length >= 4) {

                // Join key is basic_pay so each emp record meets its matching Pay.txt record
                String basicPay = tokens[3];

                context.write(new Text(basicPay),

                        new Text("emp," + tokens[0] + "," + tokens[1] + "," + tokens[2]));

            }

        }

    }


    // Mapper for Pay.txt

    public static class PayMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",");

            if (tokens.length >= 3) {

                // Join key is basic_pay (first column of Pay.txt)
                String basicPay = tokens[0];

                context.write(new Text(basicPay),

                        new Text("pay," + tokens[1] + "," + tokens[2]));

            }

        }

    }


    // Reducer: key = basic_pay, values = one "pay" record plus the matching "emp" records
    public static class CalculateNetPayReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

            // The values iterable can only be consumed once, so buffer the employee

            // records while picking up the allowance/tax percentages for this basic pay.

            double allowance = 0.0;

            double tax = 0.0;

            List<String[]> employees = new ArrayList<>();

            for (Text value : values) {

                String[] parts = value.toString().split(",");

                if (parts[0].equals("pay")) {

                    allowance = Double.parseDouble(parts[1]);

                    tax = Double.parseDouble(parts[2]);

                } else if (parts[0].equals("emp")) {

                    employees.add(parts);   // ["emp", emp_id, emp_name, dept_name]

                }

            }

            double basicPay = Double.parseDouble(key.toString());

            for (String[] parts : employees) {

                double netPay = basicPay + (allowance / 100) * basicPay - (tax / 100) * basicPay;

                context.write(new Text(parts[1] + "," + parts[2] + "," + parts[3]),

                        new DoubleWritable(netPay));

            }

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "NetPayCalculator");

        job.setJarByClass(empay1036.class);


        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmpMapper.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PayMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));


        job.setReducerClass(CalculateNetPayReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(DoubleWritable.class);


        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
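The job above emits one net-pay record per employee; it does not yet produce the per-department count of employees with net pay above 50000, nor does it send each department to its own reducer. Below is a minimal sketch of a second, department-keyed pass that could do both with a custom Partitioner; the department names, the reducer count, and the idea of reading the first job's output are assumptions for illustration, not part of the lab handout:

// Second pass (sketch): a mapper (not shown) parses the first job's output and emits
// (dept_name, net_pay); the Partitioner then routes each department to its own reducer.
public static class DeptPartitioner extends Partitioner<Text, DoubleWritable> {
    @Override
    public int getPartition(Text dept, DoubleWritable netPay, int numReduceTasks) {
        // Hypothetical fixed mapping of departments to reducer slots
        if (dept.toString().equals("CSE")) return 0;
        if (dept.toString().equals("ECE")) return 1 % numReduceTasks;
        return 2 % numReduceTasks;
    }
}

public static class DeptCountReducer extends Reducer<Text, DoubleWritable, Text, IntWritable> {
    public void reduce(Text dept, Iterable<DoubleWritable> netPays, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (DoubleWritable pay : netPays) {
            if (pay.get() > 50000) count++;   // keep only employees above the threshold
        }
        context.write(dept, new IntWritable(count));
    }
}

// Driver additions for the second pass:
//   job.setPartitionerClass(DeptPartitioner.class);
//   job.setNumReduceTasks(3);   // one reducer per department (3 departments assumed)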

