Exercise: Spark with RDDs
In this exercise, using the MovieLens datasets, let's find the movies with the lowest average rating, in other words the bad movies.
Upload files to DBFS
Let's start by uploading our datasets to DBFS, the Databricks filesystem.
Go to the Data section, click on the default database and then click on the + (plus) sign at the top to create a new table.
For the Data source, select "Upload file". Create a directory movielens in /FileStore/tables/ by typing "movielens" in the Upload to DBFS field:
Click on "Drop file or click here to upload". Browse to the u.data file.
Do NOT create a table. Perform the same steps for the u.item file (for the "Upload to DBFS" part, you can click on Select and choose the movielens directory).
Exercise: Spark RDD
Once the files have been uploaded, check the path where each file has been saved in DBFS (e.g. "/FileStore/tables/movielens/u.item") and update the code below if needed.
# List the paths of the u.item and u.data files
#display(dbutils.fs.ls("/FileStore/tables"))
display(dbutils.fs.ls("/FileStore/tables/movielens"))
Now that our files are available in DBFS, let's create their corresponding RDDs using the textFile() method:
# Load the u.item file
#rawItem = sc.textFile("<FILL IN WITH U.ITEM PATH IN DBFS/u.item>")
rawItem = sc.textFile("/FileStore/tables/movielens/u.item")
# Load the u.data file
#rawData = sc.textFile("<FILL IN WITH U.DATA PATH IN DBFS/u.data>")
rawData = sc.textFile("/FileStore/tables/movielens/u.data")
For now, our RDDs are just the raw lines of the original text files (check with print(rawData.collect()), for example). We will use Python's split() function, which we saw earlier, to separate each line into fields; to do so, we need to pass the file's delimiter to split().
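For illustration, here is what split() does on a single tab-delimited line, outside of Spark (a sample u.data-style record; treat the values as an example only):
# Illustration only: splitting a tab-delimited line into fields
line = "196\t242\t3\t881250949"
fields = line.split("\t")
print(fields)     # ['196', '242', '3', '881250949']
print(fields[1])  # '242' -- note that split() returns strings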
Next, we need to extract the fields we will use to solve our problem:
- u.data is tab ("\t") delimited, and we need to extract movie_id (index 1) and rating (index 2). Additionally, to compute the average rating per movie, we want a tuple of the form (rating, 1.0) per movie_id.
- u.item is pipe ("|") delimited, and we need to extract movie_id (index 0) and title (index 1).
To summarize, you will need to:
- split the lines
- extract/map the fields we want into a new RDD (do NOT use flatMap()); a toy sketch of this pattern follows below.
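Here is the split-then-map pattern on a small RDD of made-up, comma-delimited records (an illustration, not the MovieLens data):
# Illustration only (made-up records): map each line to a (key, value) pair
toyLines = sc.parallelize(["1,apple", "2,banana"])
toyPairs = toyLines.map(lambda line: (line.split(",")[0], line.split(",")[1]))
print(toyPairs.collect())  # [('1', 'apple'), ('2', 'banana')]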
# From rawData, create a new RDD of the form: (movie_id, (rating, 1.0))
movieRatings = rawData.<FILL IN WITH YOUR CODE HERE>
# From rawItem, create a new RDD of the form: (movie_id, title)
movieList = rawItem.<FILL IN WITH YOUR CODE HERE>
The next step is to sum the ratings and count the number of ratings per movie_id, so that we can compute the average rating per movie. We saw a function that gathers together pairs that have the same key and applies the provided function to two values at a time; such a function can be used to sum/count values sharing the same key.
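If you need a reminder, here is a minimal sketch of such a function, reduceByKey(), applied to made-up tuple values (an illustration, not the exercise solution):
# Illustration only (toy data): reduceByKey() merges the values of each key
# two at a time using the provided function
toy = sc.parallelize([("a", (4.0, 1.0)), ("a", (2.0, 1.0)), ("b", (5.0, 1.0))])
print(toy.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect())
# e.g. [('a', (6.0, 2.0)), ('b', (5.0, 1.0))] -- ordering may vary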
# From movieRatings, create a new RDD of the form: (movie_id, (sumOfRatings, totalRatings))
ratingTotalsAndCount = movieRatings.<FILL IN WITH YOUR CODE HERE>
Once we have our sum of ratings and our total number of ratings per movie_id, we can easily compute the average rating by dividing the sum of ratings by the total number of ratings. Since we want to access elements within a tuple to compute the average rating per movie, we can use index notation, e.g. x[n] to access the element at index n of a tuple x.
As a concrete example, if we have an RDD of the form (x, y), with x being the key and y being a tuple of the form (a, b), we can access a using y[0] and b using y[1].
Hint: we can map only the VALUES of our RDD, but this is not mandatory.
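As an illustration of that hint, here is mapValues() on a toy pair RDD (made-up values, not the exercise data):
# Illustration only (toy data): mapValues() transforms the value of each
# (key, value) pair and leaves the key untouched
toy = sc.parallelize([("a", (6.0, 2.0)), ("b", (5.0, 1.0))])
print(toy.mapValues(lambda v: v[0] / v[1]).collect())
# e.g. [('a', 3.0), ('b', 5.0)] -- ordering may vary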
# From ratingTotalsAndCount, create a new RDD of the form: (movieID, AverageRating)
averageRatings = ratingTotalsAndCount.<FILL IN WITH YOUR CODE HERE>
We are almost done! We can now sort by average rating using the sortBy() function. Check how this function works in the PySpark API documentation:
https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sortBy
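As a quick sketch on toy data (not the exercise data), sortBy() sorts an RDD by the key returned by the function you pass in:
# Illustration only (toy data): sort (key, value) pairs by their value
toy = sc.parallelize([("a", 3.0), ("b", 1.0), ("c", 2.0)])
print(toy.sortBy(lambda x: x[1]).collect())
# [('b', 1.0), ('c', 2.0), ('a', 3.0)]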
# Sort by average rating
sortedMovies = averageRatings.<FILL IN WITH YOUR CODE HERE>
In our sortedMovies RDD, we are missing the title for each movie_id. This field is in the movieList RDD (of the form (movie_id, title)).
We can join two RDDs using the join() function: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.join
The resulting RDD will be of the form (k, (v1, v2)), where (k, v1) and (k, v2) are the two RDDs being joined and k is the matching key.
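Here is join() on two toy RDDs (made-up data) to show the shape of the result:
# Illustration only (toy data): join() matches the two RDDs on their keys
left = sc.parallelize([(1, "a"), (2, "b")])
right = sc.parallelize([(1, "x"), (2, "y")])
print(left.join(right).collect())
# e.g. [(1, ('a', 'x')), (2, ('b', 'y'))] -- ordering may vary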
# Join averageRatings with movieList. The resulting RDD will be of the form: (movie_id, (averageRating, title))
joinRDD = <FILL IN WITH YOUR CODE HERE>
Finally, let's sort by average rating and keep only the movie title and the average rating.
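If it helps, here is how map() can reshape (k, (v1, v2)) pairs, shown on toy data (an illustration, not the solution itself):
# Illustration only (toy data): reshape (k, (v1, v2)) into (v1, v2)
toy = sc.parallelize([(1, (3.0, "t1")), (2, (4.5, "t2"))])
print(toy.map(lambda x: (x[1][0], x[1][1])).collect())
# e.g. [(3.0, 't1'), (4.5, 't2')]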
# Keep title and average rating only and sort by average rating. The resulting RDD should be of the form: (averageRating, title)
results = joinRDD.<FILL IN WITH YOUR CODE HERE>
results.collect()
Bonus exercise: Spark RDD
Our solution for finding the lowest-rated movies is polluted with movies rated by only one or two people.
Modify our previous solution to only consider movies with at least ten ratings.
HINTS:
We saw that RDDs have a function to filter records. It takes a function as a parameter, which accepts the entire key/value pair and should return True if the record is to be kept, or False if it is to be discarded.
If you have an RDD that contains (movie_id, (sumOfRatings, totalRatings)), a lambda function that takes in x would refer to totalRatings as x[1][1]: x[1] gives us the "value" (sumOfRatings, totalRatings), and x[1][1] pulls out totalRatings.
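As an illustration of these hints on made-up data (not the exercise solution):
# Illustration only (toy data): filter() keeps the records for which the
# lambda returns True
toy = sc.parallelize([("a", (6.0, 2.0)), ("b", (50.0, 12.0))])
print(toy.filter(lambda x: x[1][1] >= 10).collect())
# [('b', (50.0, 12.0))]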