/*******************************************************************************************
* BIG DATA LAB – 9
* Reg. No. : 21MIS1029
* Topic : MapReduce Programs – Join Operations & Aggregations
*******************************************************************************************/
/*******************************************************************************************
* (i) MAP SIDE JOIN
* ------------------------------------------------------------------------------------------
* Q1. Develop a Map Reduce Program to display the details of employees using two text files
* (sample code is given) using Map Side Join.
*
* Input:
* empdetails.txt → (emp_id, emp_name)
* empdata.txt → (emp_id, salary, dept_id)
*
* Output:
* emp_id, salary, dept_id, emp_name
*******************************************************************************************/
package my.org;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.filecache.*; // Deprecated in Hadoop 2.x+
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class mapsid1036 {
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
ArrayList<String> employees = new ArrayList<>();
@Override
public void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
// Read employee details from distributed cache
try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {
String line;
while ((line = reader.readLine()) != null) {
employees.add(line);
}
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] employeeData = value.toString().split(",");
if (employeeData.length < 3) return; // skip malformed records
for (String employee : employees) {
String[] empDetails = employee.split(",");
if (employeeData[0].equals(empDetails[0])) {
context.write(new Text(employeeData[0]),
new Text(employeeData[1] + "," + employeeData[2] + "," + empDetails[1]));
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MapSideJoin");
job.setJarByClass(mapsid1036.class);
// Add employee file to distributed cache
DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
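/*
 * Note (added sketch, not part of the original submission):
 * org.apache.hadoop.filecache.DistributedCache is deprecated from Hadoop 2.x onward.
 * The same map-side join can be wired through the Job / Mapper.Context cache API;
 * only two spots change, assuming the employee-details file is still passed as args[0].
 *
 * In main(), instead of DistributedCache.addCacheFile(...):
 *     job.addCacheFile(new Path(args[0]).toUri());
 *
 * In Map.setup(), instead of DistributedCache.getLocalCacheFiles(conf):
 *     java.net.URI[] cacheUris = context.getCacheFiles();
 *     // On YARN each cached file is also symlinked into the task's working directory
 *     // under its base name, so it can be opened with a plain FileReader:
 *     String localName = new Path(cacheUris[0].getPath()).getName();
 *     try (BufferedReader reader = new BufferedReader(new FileReader(localName))) { ... }
 */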
/*******************************************************************************************
* (ii) REDUCE SIDE JOIN
* ------------------------------------------------------------------------------------------
* Q2. Develop a Map Reduce program to count the number of transactions done by customers
* using cust.txt and trans.txt file using Reduce Side Join.
*
* Input:
* cust.txt → (cust_id, cust_name)
* trans.txt → (cust_id, transaction_amount)
*
* Output:
* cust_name, transaction_count, total_transaction_amount
*******************************************************************************************/
package my.org;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class redjoin1036 {
public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(",");
if (line.length >= 2)
context.write(new Text(line[0]), new Text("cust," + line[1]));
}
}
public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(",");
if (line.length >= 2)
context.write(new Text(line[0]), new Text("trans," + line[1]));
}
}
public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int transactionCount = 0;
int totalTransactionAmount = 0;
String customerName = "";
for (Text val : values) {
String[] line = val.toString().split(",");
if (line[0].equals("trans")) {
transactionCount++;
totalTransactionAmount += Integer.parseInt(line[1]);
} else if (line[0].equals("cust")) {
customerName = line[1];
}
}
context.write(new Text(customerName),
new Text(transactionCount + "," + totalTransactionAmount));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ReduceSideJoin");
job.setJarByClass(redjoin1036.class);
job.setReducerClass(JoinReducer.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
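/*
 * Note (added sketch): with MultipleInputs, the input paths and mapper classes come from
 * the addInputPath(...) calls, so no job.setMapperClass(...) or
 * FileInputFormat.addInputPath(...) is needed in this driver. The map output types here
 * happen to match the reduce output types (Text, Text), so Hadoop's defaults work; when
 * they differ, as in programs (iii) and (iv) below, the map output types must be declared
 * explicitly:
 *
 *     job.setMapOutputKeyClass(Text.class);
 *     job.setMapOutputValueClass(Text.class);
 */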
/*******************************************************************************************
* (iii) STUDENT HIGHEST MARK (CAT1 or CAT2)
* ------------------------------------------------------------------------------------------
* Q3. Given:
* Stud.txt → (std_id, std_name, mobile, address, year_of_admission)
* Cat1.txt → (std_id, mark)
* Cat2.txt → (std_id, mark)
*
* Students admitted in 2020, 2021, 2022 registered for CSE2035.
* Develop a MapReduce program to display stud_id, name, and the highest mark of
* corresponding students (either CAT1 or CAT2) for the user-given year.
*******************************************************************************************/
package my.org;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class cat1ques1036 {
public static class StudMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] tokens = value.toString().split(",");
    if (tokens.length >= 5) {
        String stdId = tokens[0];
        String yearOfAdmission = tokens[4].trim();
        // The year of interest is supplied by the user (optional args[3] in the driver);
        // with no year given, accept the three admission years registered for CSE2035.
        String requestedYear = context.getConfiguration().get("admission.year");
        boolean selected = (requestedYear != null)
                ? yearOfAdmission.equals(requestedYear)
                : (yearOfAdmission.equals("2020") || yearOfAdmission.equals("2021")
                        || yearOfAdmission.equals("2022"));
        if (selected) {
            context.write(new Text(stdId), new Text("stud," + tokens[1]));
        }
    }
}
}
public static class CatMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
if (tokens.length >= 2) {
context.write(new Text(tokens[0]), new Text("cat," + tokens[1]));
}
}
}
public static class ReducerClass extends Reducer<Text, Text, Text, IntWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String studentName = null;
        int maxMark = -1;
        for (Text value : values) {
            String[] parts = value.toString().split(",");
            if (parts.length == 2 && parts[0].equals("cat")) {
                maxMark = Math.max(maxMark, Integer.parseInt(parts[1].trim()));
            } else if (parts.length == 2 && parts[0].equals("stud")) {
                studentName = parts[1];
            }
        }
        // Emit only students that passed the admission-year filter (i.e. produced a
        // "stud" record) and have at least one CAT mark: "std_id,std_name  highest_mark".
        if (studentName != null && maxMark >= 0) {
            context.write(new Text(key.toString() + "," + studentName), new IntWritable(maxMark));
        }
    }
}
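// Added sketch (hypothetical, not part of the original program): because the "highest
// mark" is a max, a combiner can shrink the shuffle. It collapses all "cat,<mark>"
// records for a key into a single "cat,<max>" record and passes "stud,<name>" records
// through unchanged; registering it is optional: job.setCombinerClass(MaxMarkCombiner.class).
public static class MaxMarkCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        boolean sawMark = false;
        for (Text value : values) {
            String[] parts = value.toString().split(",");
            if (parts.length == 2 && parts[0].equals("cat")) {
                max = Math.max(max, Integer.parseInt(parts[1].trim()));
                sawMark = true;
            } else {
                context.write(key, value); // student records pass through untouched
            }
        }
        if (sawMark) {
            context.write(key, new Text("cat," + max));
        }
    }
}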
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// args[0] = Stud.txt, args[1] = path holding Cat1.txt and Cat2.txt (a directory works),
// args[2] = output directory, args[3] (optional) = admission year to report on.
if (args.length > 3) {
    conf.set("admission.year", args[3]);
}
Job job = Job.getInstance(conf, "StudentHighestMark");
job.setJarByClass(cat1ques1036.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, StudMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CatMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setReducerClass(ReducerClass.class);
// The mappers emit Text values while the job's final value type is IntWritable,
// so the map output types must be declared explicitly.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
/*******************************************************************************************
* (iv) EMPLOYEE NET PAY CALCULATION
* ------------------------------------------------------------------------------------------
* Q4. Given:
* Emp.txt → (emp_id, emp_name, dept_name, basic_pay)
* Pay.txt → (basic_pay, allowance_percentage, tax_percentage)
*
* Formula:
* net_pay = basic_pay + (allowance% * basic_pay / 100) - (tax% * basic_pay / 100)
*
* Display the count of employees in each department whose net pay > 50000.
* Use different reducers for each department.
*******************************************************************************************/
package my.org;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class empay1036 {
// Mapper for Emp.txt → (emp_id, emp_name, dept_name, basic_pay).
// Pay.txt is keyed by basic pay, so basic pay becomes the join key here as well.
public static class EmpMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        if (tokens.length >= 4) {
            String basicPay = tokens[3].trim();
            context.write(new Text(basicPay),
                    new Text("emp," + tokens[0] + "," + tokens[1] + "," + tokens[2]));
        }
    }
}
// Mapper for Pay.txt → (basic_pay, allowance_percentage, tax_percentage).
public static class PayMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        if (tokens.length >= 3) {
            String basicPay = tokens[0].trim();
            context.write(new Text(basicPay),
                    new Text("pay," + tokens[1] + "," + tokens[2]));
        }
    }
}
// Reducer: joins the employee records and the pay-slab record that share a basic pay,
// then computes net_pay = basic_pay + (allowance% of basic_pay) - (tax% of basic_pay).
public static class CalculateNetPayReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The values Iterable can only be traversed once, so buffer the employee
        // records while picking up the allowance/tax percentages for this basic pay.
        double allowance = 0.0;
        double tax = 0.0;
        List<String[]> employees = new ArrayList<>();
        for (Text value : values) {
            String[] parts = value.toString().split(",");
            if (parts[0].equals("pay")) {
                allowance = Double.parseDouble(parts[1]);
                tax = Double.parseDouble(parts[2]);
            } else if (parts[0].equals("emp")) {
                employees.add(parts);
            }
        }
        double basicPay = Double.parseDouble(key.toString());
        for (String[] emp : employees) {
            double netPay = basicPay + (allowance / 100) * basicPay - (tax / 100) * basicPay;
            // emp[1] = emp_id, emp[2] = emp_name, emp[3] = dept_name
            context.write(new Text(emp[1] + "," + emp[2] + "," + emp[3]), new DoubleWritable(netPay));
        }
    }
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "NetPayCalculator");
job.setJarByClass(empay1036.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmpMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PayMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setReducerClass(CalculateNetPayReducer.class);
// Map output types (Text, Text) differ from the final (Text, DoubleWritable) output,
// so they must be declared explicitly.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
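/*
 * Note (added sketch, not part of the original submission): the question also asks for the
 * count of employees in each department whose net pay exceeds 50,000, handled by a
 * different reducer for each department. Assuming the output of empay1036 above
 * ("emp_id,emp_name,dept_name<TAB>net_pay"), a hypothetical second job could filter on the
 * 50,000 threshold, partition by department, and sum the counts. The class name, argument
 * layout, and reducer count below are illustrative assumptions.
 */
package my.org;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class deptcount1036 {
    // Reads one line of the net-pay job's output: "emp_id,emp_name,dept_name<TAB>net_pay"
    // and emits (dept_name, 1) only when net_pay > 50000.
    public static class NetPayFilterMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t");
            if (parts.length < 2) return;
            String[] empFields = parts[0].split(",");
            double netPay = Double.parseDouble(parts[1].trim());
            if (empFields.length >= 3 && netPay > 50000) {
                context.write(new Text(empFields[2]), ONE);
            }
        }
    }
    // Routes each department to its own reduce task. Plain hashing is a simple stand-in
    // for an explicit department-to-reducer table, so it needs at least as many reduce
    // tasks as departments (and distinct departments may still collide on one reducer).
    public static class DeptPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    // Sums the 1s for the department(s) routed to this reducer.
    public static class DeptCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new IntWritable(count));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "DeptNetPayCount");
        job.setJarByClass(deptcount1036.class);
        job.setMapperClass(NetPayFilterMapper.class);
        job.setPartitionerClass(DeptPartitioner.class);
        job.setReducerClass(DeptCountReducer.class);
        job.setNumReduceTasks(4); // assumption: at least as many reducers as departments
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // output directory of empay1036
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory of this job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}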