Map Side Join

package my.org;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.filecache.DistributedCache;   // Deprecated in newer versions, but fine for Hadoop 1.x
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class mapsid1036 {

    // ---------------- Mapper Class ----------------
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        private ArrayList<String> employees = new ArrayList<String>();

        // Setup method loads data from Distributed Cache
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

            // Read employee details file from Distributed Cache
            try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    employees.add(line);
                }
            }
        }

        // Map method performs join operation
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] employeeData = value.toString().split(",");

            // Iterate through each employee in cache
            for (String employee : employees) {
                String[] empDetails = employee.split(",");

                // Compare employee IDs
                if (employeeData[0].equals(empDetails[0])) {
                    // Emit joined record: ID, salary, dept, name
                    context.write(
                        new Text(employeeData[0]),
                        new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1])
                    );
                }
            }
        }
    }

    // ---------------- Main Method ----------------
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Map Side Join using Distributed Cache");
        job.setJarByClass(mapsid1036.class);

        // Add employee details file to Distributed Cache
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());

        // Set Mapper and output classes
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0); // No reducer needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output format
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // Run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
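
The import comment above notes that org.apache.hadoop.filecache.DistributedCache is deprecated from Hadoop 2.x onward. The sketch below shows one way the same join could be wired up on the newer API, where the driver registers the small file with job.addCacheFile() and the mapper reads it back through context.getCacheFiles(). The class name MapSideJoinV2 and the use of the HDFS FileSystem to open the cached file are illustrative choices, not part of the original program; the argument order (cache file, input, output) is kept the same. Treat it as a sketch rather than a drop-in replacement.

package my.org;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapSideJoinV2 {   // illustrative name for the Hadoop 2.x+ variant

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final ArrayList<String> employees = new ArrayList<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // getCacheFiles() returns the URIs registered via job.addCacheFile()
            URI[] cacheFiles = context.getCacheFiles();
            Path cachePath = new Path(cacheFiles[0]);
            FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());

            // Read the cached employee-details file straight from the file system
            try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(fs.open(cachePath)))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    employees.add(line);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] employeeData = value.toString().split(",");

            // Same join logic as the original mapper
            for (String employee : employees) {
                String[] empDetails = employee.split(",");
                if (employeeData[0].equals(empDetails[0])) {
                    context.write(new Text(employeeData[0]),
                            new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1]));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Map Side Join using Distributed Cache (Hadoop 2.x+)");
        job.setJarByClass(MapSideJoinV2.class);

        // Register the small file with the job instead of DistributedCache
        job.addCacheFile(new Path(args[0]).toUri());

        job.setMapperClass(JoinMapper.class);
        job.setNumReduceTasks(0); // No reducer needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}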


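A separate design note on the Map class above: it keeps the cached records in an ArrayList and rescans the whole list for every input line, so the join does one pass over the cache per record. A common refinement is to index the cached file in a HashMap keyed on the employee ID so each record needs only a single lookup. The sketch below keeps the Hadoop 1.x DistributedCache reading code unchanged and only swaps the data structure; the class name EmployeeLookupMapper is illustrative, not part of the original program.

package my.org;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Alternative to the Map class above: same inputs, same output,
// but the cached file is indexed by employee ID for single-lookup joins.
public class EmployeeLookupMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final HashMap<String, String> nameById = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

        // Index the cached employee-details file: emp_id -> emp_name
        try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] empDetails = line.split(",");
                nameById.put(empDetails[0], empDetails[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] employeeData = value.toString().split(",");

        // Single hash lookup instead of scanning the whole cached list
        String name = nameById.get(employeeData[0]);
        if (name != null) {
            context.write(new Text(employeeData[0]),
                    new Text(employeeData[1] + "," + employeeData[2] + "," + name));
        }
    }
}

Plugging it in would only require pointing the driver at it with job.setMapperClass(EmployeeLookupMapper.class).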