package my.org;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class redjoin1036 {
// -------------------- CUSTOMER MAPPER --------------------
public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(",");
// Ensure valid input
if (line.length >= 2) {
// key = customer ID, value = "cust,customerName"
context.write(new Text(line[0]), new Text("cust," + line[1]));
}
}
}
// -------------------- TRANSACTION MAPPER --------------------
public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(",");
// Ensure valid input
if (line.length >= 2) {
// key = customer ID, value = "trans,transactionAmount"
context.write(new Text(line[0]), new Text("trans," + line[1]));
}
}
}
// -------------------- REDUCER --------------------
public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int transactionCount = 0;
int totalTransactionAmount = 0;
String customerName = "";
for (Text val : values) {
String[] parts = val.toString().split(",");
if (parts[0].equals("trans")) {
// Count transactions and accumulate total
transactionCount++;
totalTransactionAmount += Integer.parseInt(parts[1]);
} else if (parts[0].equals("cust")) {
// Store customer name
customerName = parts[1];
}
}
// Output: customerName -> transactionCount,totalTransactionAmount
context.write(new Text(customerName),
new Text(transactionCount + "," + totalTransactionAmount));
}
}
// -------------------- DRIVER CODE --------------------
/**
 * Configures and runs the reduce-side join job, then exits with the job status.
 *
 * <p>The customer input is read with {@link CustMapper}, the transaction input with
 * {@link TransMapper}, and both feed a single {@link JoinReducer} task. The process
 * exits with 0 when the job succeeds and 1 when it fails or the arguments are missing.
 *
 * @param args {@code <input-customer> <input-transaction> <output>}
 * @throws Exception if the job cannot be set up or run
 */
public static void main(String[] args) throws Exception {
    if (args.length < 3) {
        System.err.println("Usage: redjoin1036 <input-customer> <input-transaction> <output>");
        System.exit(1);
    }

    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
    Job job = Job.getInstance(conf, "Reduce Side Join - Customer Transactions");
    job.setJarByClass(redjoin1036.class);

    // Each input path gets its own mapper so records can be tagged by origin.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);

    job.setReducerClass(JoinReducer.class);
    job.setNumReduceTasks(1);

    // Stated explicitly rather than relying on the fallback to the final output classes.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// Stray web-page text ("Comments" / "Post a Comment") followed the class here; it is not part of the program.