
Map Side Join

package my.org;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.filecache.DistributedCache;   // Deprecated in newer versions, but fine for Hadoop 1.x
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class mapsid1036 {

    // ---------------- Mapper Class ----------------
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        private ArrayList<String> employees = new ArrayList<String>();

        // Setup method loads the employee details file from the Distributed Cache
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

            // Fail fast with a clear message if the cache file was not distributed
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException("Employee details file not found in the Distributed Cache");
            }

            // Read the employee details file from the Distributed Cache into memory
            try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    employees.add(line);
                }
            }
        }
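
        // setup() runs once per map task before any map() calls, so the lookup
        // list is built a single time. Only one file is added to the cache in
        // main(), so cacheFiles[0] above is the employee-details file (args[0]).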


        // Map method performs join operation
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] employeeData = value.toString().split(",");

            // Iterate through each employee in cache
            for (String employee : employees) {
                String[] empDetails = employee.split(",");

                // Compare employee IDs
                if (employeeData[0].equals(empDetails[0])) {
                    // Emit joined record: ID, salary, dept, name
                    context.write(
                        new Text(employeeData[0]),
                        new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1])
                    );
                }
            }
        }
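
        // Illustration (hypothetical records): if the cached details line is
        // "1001,Anil" and the incoming data line is "1001,50000,D01", the IDs
        // match and the mapper emits:  1001 -> 50000,D01,Anil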

    }


    // ---------------- Main Method ----------------
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Map Side Join using Distributed Cache");
        job.setJarByClass(mapsid1036.class);

        // Add employee details file to Distributed Cache
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
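        // Note: on Hadoop 2.x+ the non-deprecated equivalent is job.addCacheFile(...);
        // a sketch of the changed calls follows this listing.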


        // Set Mapper and output classes
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0); // No reducer needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output format
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // Run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
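
Since DistributedCache and the new Job(conf, name) constructor belong to the old Hadoop 1.x API (as the import comment above notes), the same join needs slightly different calls on Hadoop 2.x and later. The fragment below is only a sketch of the parts that change, assuming the same three command-line arguments (args[0] = employee details file for the cache, args[1] = input path, args[2] = output path) and an extra import of java.net.URI.

// Driver side (Hadoop 2.x+): replaces new Job(...) and DistributedCache.addCacheFile(...)
Job job = Job.getInstance(conf, "Map Side Join using Distributed Cache");
job.setJarByClass(mapsid1036.class);
job.addCacheFile(new Path(args[0]).toUri());

// Mapper.setup() side: replaces DistributedCache.getLocalCacheFiles(conf)
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
    // Cached files are localized into the task's working directory,
    // so they can usually be opened by their base file name.
    String cachedName = new Path(cacheFiles[0].getPath()).getName();
    try (BufferedReader reader = new BufferedReader(new FileReader(cachedName))) {
        String line;
        while ((line = reader.readLine()) != null) {
            employees.add(line);
        }
    }
}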

