-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEstimation2.java
129 lines (106 loc) · 4.52 KB
/
Estimation2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package finalproject_try;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class Estimation2 {
//user:movie,predicted_rating
public static class ValMapper1 extends Mapper<Object, Text, Text, Text>{
Text outkey = new Text();
Text outvalue = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException{
String[] line=value.toString().split(",");
String[] pair = line[0].split(":");
String user = pair[0];
String movie = pair[1];
String predicted_rating = line[1];
outkey.set(user+":"+movie);
outvalue.set("P"+predicted_rating);
context.write(outkey, outvalue);
}
}
//user,movie1:rating1+movie2:rating2,movie3:rating3+movie4:rating4
public static class ValMapper2 extends Mapper<Object, Text, Text, Text>{
Text outkey = new Text();
Text outvalue = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException{
String[] line=value.toString().split(",");
String user = line[0];
System.out.println("ValMapper2 is called here !!!");
System.out.println(user);
String[] predictArr = line[1].split("\\+");
for(int i = 0; i < predictArr.length; i++){
String[] pair = predictArr[i].split(":");
String movie = pair[0];
String rating = pair[1];
outkey.set(user+":"+movie);
outvalue.set("A"+rating);
context.write(outkey, outvalue);
}
}
}
public static class ValReducer extends Reducer<Text,Text,Text,Text> {
//user:movie,[predicted_rating,actual_rating]
//key: number_of_user value:sum_of_error
Text outvalue = new Text();
int num_of_correct = 0;
int num_of_wrong = 0;
public void reduce(Text key, Iterable<Text> values,Context context
) throws IOException, InterruptedException {
int predict_rating = 0;
int actual_rating = 0;
for(Text val : values){
String tmp = val.toString();
if(tmp.charAt(0) == 'P'){
float temp_pred = Float.parseFloat(tmp.substring(1,tmp.length()));
predict_rating = Math.round(temp_pred);
}else{
actual_rating = Integer.parseInt(tmp.substring(1,tmp.length()));
}
}
if(Math.abs(predict_rating-actual_rating) <= 1){
num_of_correct++;
}else{
num_of_wrong++;
}
}
public void cleanup (Context context) throws IOException,InterruptedException {
outvalue.set(String.valueOf(num_of_correct)+","+String.valueOf(num_of_wrong));
context.write(null, outvalue);
}
}
/**
 * Partitioner that routes a record by the {@link String#hashCode()} of the key's
 * string form.
 */
public static class ValPartitioner extends HashPartitioner<Text, Text> {
    /**
     * Chooses the partition for a key.
     *
     * @param key            the map output key; its {@code toString()} value is hashed
     * @param result         the map output value; not used
     * @param numReduceTasks the number of partitions to spread keys over
     * @return a partition index in {@code [0, numReduceTasks)}
     */
    @Override
    public int getPartition(Text key, Text result, int numReduceTasks) {
        // Clear the sign bit so the remainder below can never be negative.
        final int nonNegativeHash = key.toString().hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numReduceTasks;
    }
}
/**
 * Configures and runs the "estimation" job.
 *
 * <p>Arguments, in order:
 * <ol>
 *   <li>{@code args[0]}..{@code args[4]} - five input paths read with {@code ValMapper1}</li>
 *   <li>{@code args[5]} - one input path read with {@code ValMapper2}</li>
 *   <li>{@code args[6]} - the job output path</li>
 * </ol>
 * The job uses {@code ValPartitioner}, {@code ValReducer} and five reduce tasks.
 * The JVM exits with 0 if the job succeeds, 1 if it fails, and 2 if too few
 * arguments were given.
 *
 * @param args command-line arguments as described above
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
    final int predictionInputs = 5;
    final int actualInputIndex = predictionInputs;
    final int outputIndex = predictionInputs + 1;

    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length <= outputIndex) {
        System.err.println("Usage: Estimation2 <pred1> <pred2> <pred3> <pred4> <pred5>"
                + " <actual_ratings> <output>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "estimation");
    job.setJarByClass(Estimation2.class);

    for (int i = 0; i < predictionInputs; i++) {
        MultipleInputs.addInputPath(job, new Path(args[i]), TextInputFormat.class, ValMapper1.class);
    }
    MultipleInputs.addInputPath(job, new Path(args[actualInputIndex]), TextInputFormat.class,
            ValMapper2.class);

    job.setPartitionerClass(ValPartitioner.class);
    job.setNumReduceTasks(5);
    job.setReducerClass(ValReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[outputIndex]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}