Spark算子实战应用
数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase
相关数据文件 :
users.dat ---UserID::Gender::Age::Occupation::Zip-code
movies.dat --- MovieID::Title::Genres
ratings.dat ---UserID::MovieID::Rating::Timestamp
SogouQ.mini
完成以下业务需求:
1. 年龄段在“18-24”的男性年轻人,最喜欢看哪10部
2.得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多 的10部电影
3.利用数据集SogouQ2012.mini.tar.gz 将数据按照访问次数进行排序,求访问量前10的网站
scala实现代码如下:
package hw3import org.apache.spark._import scala.collection.immutable.HashSetimport org.apache.spark.rdd.RDD/** * @author BIGDATA */object spark_hw3{ var sc:SparkContext=null def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("MovieDemo") .setMaster("local") sc=new SparkContext(conf) //准备数据 val rating=sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\ratings.dat") .map(_.split("::")).map {x => (x(0),x(1),x(2))} //年龄段在“18-24”的男性年轻人,最喜欢看哪10部 top10LookeMovie //得分最高的10部电影 val topKScoreMostMovie = rating.map{x => (x._2, (x._3.toInt, 1)) }.reduceByKey { (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2) }.map { x => (x._2._1.toDouble / x._2._2.toDouble, x._1) }.sortByKey(false). take(10). foreach(println) //女性看最多的10部电影 top10FaleLookMovie
//男性看最多的10部电影 top10MaleLookMovie
//看过电影最多的前10个人 val topKmostPerson = rating.map{ x => (x._1, 1) }.reduceByKey(_ + _). map(x =>(x._2, x._1)). sortByKey(false). take(10). foreach(println) val brower = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\SogouQ2012.mini\\SogouQ.mini") val brs=brower.map(_.split("\t")).map { x => x(5) }.cache //访问量前10的网站 val topKBrower = brs.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) .sortBy(_._2, false) .take(10) .foreach(println) } /** * @param sc SparkContext对象 * @return 返回用户信息 */ def getUsers(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val users=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\users.dat") .map(_.split("::")) users } /** * @param sc * @return 返回电影信息 */ def getMovies(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val movies=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\movies.dat") .map(_.split("::")) movies } /** * * @param sc * @return 电影评分信息 */ def getRatings(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val ratings=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\ratings.dat") .map(_.split("::")) ratings } def top10LookeMovie: Unit ={ //获取年龄段在“18-24”的男性年轻人的userid val users=getUsers(sc) val userList=users.filter(x=>x(1).equals("M") && x(2).toInt>=18 && x(2).toInt<=24) .map(x=>x(0)).collect() //注意:HashSet()后面要带小括号 val userSet=HashSet() ++ userList //创建广播变量 val broadcastUserSet=sc.broadcast(userSet) //统计出18-24岁男性喜欢看的前10名电影的movieid和次数 val ratings=getRatings(sc) val topNMovies=ratings.map(x=>(x(0),x(1))) //ratings中所有的(userid,movieid) //从rating数据过滤出“18-24”的男性年轻人的观影信息 .filter(x=>broadcastUserSet.value.contains(x._1)) .map(x=>(x._2,1)) .reduceByKey(_+_) //(movieid,次数) .sortBy(_._2,false) .take(10) //(movieid,次数) val movies=getMovies(sc) //获取所有电影的(movieid,title) val movieTitle=movies.map(x=>(x(0),x(1))).collect().toMap topNMovies.map(x=>(movieTitle.getOrElse(x._1,null),x._2)) .foreach(x=>println(x._1+" "+x._2)) } /** * 女性看过最多的10部电影 */ def top10FaleLookMovie: Unit ={ val users = getUsers(sc) //获取所有女性的userid val faleUserId = users.filter(x => x(1).equals("F")) .map(x => x(0)).collect() val faleUserSet = HashSet() ++ faleUserId //创建广播变量,里面存储所有女性的userid val broadcastFaleSet = sc.broadcast(faleUserSet) val ratings = getRatings(sc) //统计出女性看过最多的10部电影的(movieid,观看次数) val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid) //过滤出女性观影数据 .filter(x => broadcastFaleSet.value.contains(x._1)) .map(x => (x._2, 1)) //(movieid,1) .reduceByKey(_ + _) .sortBy(_._2, false) .take(10) val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数) val movies=getMovies(sc) val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title) //对两个RDD进行join操作,取二者的共同匹配项 allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数)) .map(x=>(x._1,x._2._1,x._2._2)) .foreach(x=>println(x._1+" "+x._2+" "+x._3)) } /** * 男性看过最多的10部电影 */ def top10MaleLookMovie: Unit ={ val users = getUsers(sc) //获取所有男性的userid val faleUserId = users.filter(x => x(1).equals("M")) .map(x => x(0)).collect() val faleUserSet = HashSet() ++ faleUserId //创建广播变量,里面存储所有男性的userid val broadcastFaleSet = sc.broadcast(faleUserSet) val ratings = getRatings(sc) //统计出男性看过最多的10部电影的(movieid,观看次数) val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid) //过滤出男性观影数据 .filter(x => broadcastFaleSet.value.contains(x._1)) .map(x => (x._2, 1)) //(movieid,1) .reduceByKey(_ + _) .sortBy(_._2, false) .take(10) val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数) val movies=getMovies(sc) val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title) //对两个RDD进行join操作,取二者的共同匹配项 allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数)) .map(x=>(x._1,x._2._1,x._2._2)) .foreach(x=>println(x._1+" "+x._2+" "+x._3)) }}