package my.org;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache; // Deprecated in newer versions, but fine for Hadoop 1.x
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class mapsid1036 {
// ---------------- Mapper Class ----------------
/**
 * Map-side join mapper.
 *
 * <p>Joins each input record against an employee lookup file shipped to every
 * mapper via the Distributed Cache, so the join completes without a reducer.
 *
 * <p>Expected formats (comma-separated):
 * <ul>
 *   <li>cache file line:  {@code empId,empName[,...]}</li>
 *   <li>input record:     {@code empId,salary,dept[,...]}</li>
 *   <li>output:           key = {@code empId}, value = {@code salary,dept,empName}</li>
 * </ul>
 * (Field meanings inferred from the original "ID, salary, dept, name" comment —
 * confirm against the actual data files.)
 */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {

    // Lookup table: empId -> all cache lines with that id (pre-split).
    // A list per key preserves the original behavior of emitting one joined
    // record per matching cache line, even when the cache has duplicate IDs.
    // Fully qualified because this nested class shadows java.util.Map.
    private final java.util.Map<String, List<String[]>> employeesById =
            new HashMap<String, List<String[]>>();

    /**
     * Loads the employee details file from the Distributed Cache into an
     * in-memory hash table for O(1) lookups in {@link #map}.
     *
     * @throws IOException if no cache file was distributed or it cannot be read
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
        // Fail fast with a clear message instead of an NPE on cacheFiles[0].
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException(
                    "Employee details file not found in the Distributed Cache");
        }
        // Explicit charset: FileReader would silently use the platform default.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new FileInputStream(cacheFiles[0].toString()), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] empDetails = line.split(",");
                if (empDetails.length < 2) {
                    continue; // malformed cache line: need at least id,name
                }
                List<String[]> bucket = employeesById.get(empDetails[0]);
                if (bucket == null) {
                    bucket = new ArrayList<String[]>();
                    employeesById.put(empDetails[0], bucket);
                }
                bucket.add(empDetails);
            }
        }
    }

    /**
     * Joins one input record with the cached employee details by employee ID
     * and emits {@code (id, "salary,dept,name")} for every match.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] employeeData = value.toString().split(",");
        // Guard against short records: fields [1] and [2] are read below.
        if (employeeData.length < 3) {
            return;
        }
        // Hash lookup replaces the original O(cache size) scan per record.
        List<String[]> matches = employeesById.get(employeeData[0]);
        if (matches == null) {
            return; // no employee with this ID in the cache file
        }
        for (String[] empDetails : matches) {
            // Emit joined record: ID -> salary, dept, name
            context.write(
                    new Text(employeeData[0]),
                    new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1])
            );
        }
    }
}
// ---------------- Main Method ----------------
/**
 * Configures and submits the map-only join job.
 *
 * <p>Command-line arguments:
 * <ol start="0">
 *   <li>{@code args[0]} — path of the employee details file (added to the Distributed Cache)</li>
 *   <li>{@code args[1]} — job input path</li>
 *   <li>{@code args[2]} — job output path (must not already exist)</li>
 * </ol>
 *
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
    // Validate usage up front instead of failing later with an
    // ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        System.err.println("Usage: mapsid1036 <cache file> <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    // new Job(conf, name) is deprecated in Hadoop 2.x (Job.getInstance), but
    // retained here for Hadoop 1.x compatibility alongside DistributedCache.
    Job job = new Job(conf, "Map Side Join using Distributed Cache");
    job.setJarByClass(mapsid1036.class);
    // Ship the employee details file to every mapper via the Distributed Cache.
    DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
    // Map-only job: the join happens entirely in the mapper.
    job.setMapperClass(Map.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    // Exit with 0 on success, 1 on job failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}