-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathP24_spark.py
43 lines (37 loc) · 1.4 KB
/
P24_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/python
import sys
from pyspark import SparkContext
# Module-level one-shot flag: starts True and is set to False by
# firstLineFun() the first time that function is called in this process.
firstLine = True
def rating(avgRating):
    """Convert an average rating into a whole-star bucket label.

    Returns the string "1" for values up to and including 1, "2" for values
    in (1, 2], "3" for (2, 3], "4" for (3, 4], and "5" for anything that is
    not less than or equal to 4.
    """
    # Bounds are inclusive and tried in ascending order, so the first bound
    # the value does not exceed determines the label.
    for upperBound, label in ((1, "1"), (2, "2"), (3, "3"), (4, "4")):
        if avgRating <= upperBound:
            return label
    return "5"
def firstLineFun():
    """Report whether this is the first call made in the current process.

    Returns True exactly once, while the module-level ``firstLine`` flag is
    still set, and clears the flag as a side effect; every later call
    returns False.  The flag lives in this module's globals, so each Python
    process that loads the module has its own independent copy.
    """
    global firstLine
    if not firstLine:
        return False
    # First call: consume the flag so subsequent calls report False.
    firstLine = False
    return True
def _dropHeader(rdd):
    """Return ``rdd`` without its header line.

    The header is taken to be the first line of the input; every line equal
    to it is filtered out.  This is a pure comparison with no mutable state,
    so the result does not depend on how Spark partitions the file or on
    which worker process handles which partition.

    An empty RDD is returned unchanged.
    """
    firstLines = rdd.take(1)
    if not firstLines:
        return rdd
    header = firstLines[0]
    return rdd.filter(lambda line: line != header)


sc = SparkContext()
try:
    ratingsRDD = _dropHeader(sc.textFile("../Data/P14_ratings_data.csv"))
    moviesRDD = _dropHeader(sc.textFile("../Data/P14_movies_data.csv"))

    # Ratings become (column 1, float(column 2)) pairs; column 1 is the key
    # that is joined against column 0 of the movies file.
    ratingsRDD = ratingsRDD.map(lambda line: line.split(","))
    ratingsRDD = ratingsRDD.map(lambda fields: (fields[1], float(fields[2])))

    # Movies become explicit (column 0, column 1) pairs -- presumably
    # (movie id, title); confirm against the data file.
    # NOTE(review): a plain split(",") mis-parses quoted fields that contain
    # commas; if the movies file has such rows, switch to a real CSV parser.
    moviesRDD = moviesRDD.map(lambda line: line.split(","))
    moviesRDD = moviesRDD.map(lambda fields: (fields[0], fields[1]))

    # join yields (key, (ratingValue, movieField)); re-key by the movie field
    # so ratings can be averaged per movie.
    moviesRatingsRDD = ratingsRDD.join(moviesRDD)
    moviesRatingsRDD = moviesRatingsRDD.map(
        lambda joined: (joined[1][1], joined[1][0]))

    # Average through a running (sum, count) pair so that partial results are
    # combined before the shuffle instead of collecting every rating per key.
    moviesRatingsRDD = moviesRatingsRDD.mapValues(lambda score: (score, 1))
    moviesRatingsRDD = moviesRatingsRDD.reduceByKey(
        lambda left, right: (left[0] + right[0], left[1] + right[1]))
    moviesRatingsRDD = moviesRatingsRDD.mapValues(
        lambda total: rating(total[0] / total[1]))

    firstTen = moviesRatingsRDD.take(10)
    # Parenthesised single-argument form prints identically on Python 2 and 3.
    print(firstTen)
finally:
    # Always release the Spark context, even if the job fails.
    sc.stop()