博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark算子---实战应用
阅读量:5985 次
发布时间:2019-06-20

本文共 5644 字,大约阅读时间需要 18 分钟。

Spark算子实战应用

 

数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase

相关数据文件 :

users.dat ---UserID::Gender::Age::Occupation::Zip-code

movies.dat --- MovieID::Title::Genres

ratings.dat ---UserID::MovieID::Rating::Timestamp

SogouQ.mini

完成以下业务需求:

1. 年龄段在“18-24”的男性年轻人,最喜欢看哪10部

2.得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多 的10部电影

3.利用数据集SogouQ2012.mini.tar.gz 将数据按照访问次数进行排序,求访问量前10的网站

 

scala实现代码如下:

package hw3import org.apache.spark._import scala.collection.immutable.HashSetimport org.apache.spark.rdd.RDD/** * @author BIGDATA */object spark_hw3{  var sc:SparkContext=null  def main(args: Array[String]): Unit = {    val conf=new SparkConf().setAppName("MovieDemo")              .setMaster("local")    sc=new SparkContext(conf)   //准备数据     val rating=sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\ratings.dat")              .map(_.split("::")).map {x => (x(0),x(1),x(2))}   //年龄段在“18-24”的男性年轻人,最喜欢看哪10部    top10LookeMovie   //得分最高的10部电影    val topKScoreMostMovie = rating.map{x =>      (x._2, (x._3.toInt, 1))    }.reduceByKey { (v1, v2) =>      (v1._1 + v2._1, v1._2 + v2._2)    }.map { x =>      (x._2._1.toDouble / x._2._2.toDouble, x._1)    }.sortByKey(false).        take(10).        foreach(println)       //女性看最多的10部电影    top10FaleLookMovie
  //男性看最多的10部电影     top10MaleLookMovie
  //看过电影最多的前10个人    val topKmostPerson = rating.map{ x =>      (x._1, 1)    }.reduceByKey(_ + _).        map(x =>(x._2, x._1)).        sortByKey(false).        take(10).        foreach(println)         val brower = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\SogouQ2012.mini\\SogouQ.mini")     val brs=brower.map(_.split("\t")).map { x =>          x(5)    }.cache           //访问量前10的网站     val topKBrower = brs.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)    .sortBy(_._2, false)    .take(10)    .foreach(println)        }   /**    * @param sc SparkContext对象    * @return 返回用户信息    */  def getUsers(sc:SparkContext):RDD[Array[String]]={    val scobj=sc    val users=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\users.dat")                  .map(_.split("::"))    users  }   /**    * @param sc    * @return 返回电影信息    */  def getMovies(sc:SparkContext):RDD[Array[String]]={    val scobj=sc    val movies=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\movies.dat")                .map(_.split("::"))    movies  }   /**    *    * @param sc    * @return 电影评分信息    */  def getRatings(sc:SparkContext):RDD[Array[String]]={    val scobj=sc    val ratings=scobj.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\ratings.dat")              .map(_.split("::"))    ratings  }   def top10LookeMovie: Unit ={    //获取年龄段在“18-24”的男性年轻人的userid    val users=getUsers(sc)    val userList=users.filter(x=>x(1).equals("M") && x(2).toInt>=18 && x(2).toInt<=24)      .map(x=>x(0)).collect()    //注意:HashSet()后面要带小括号    val userSet=HashSet() ++ userList    //创建广播变量    val broadcastUserSet=sc.broadcast(userSet)    //统计出18-24岁男性喜欢看的前10名电影的movieid和次数    val ratings=getRatings(sc)    val topNMovies=ratings.map(x=>(x(0),x(1))) //ratings中所有的(userid,movieid)      //从rating数据过滤出“18-24”的男性年轻人的观影信息      .filter(x=>broadcastUserSet.value.contains(x._1))      .map(x=>(x._2,1))      .reduceByKey(_+_) //(movieid,次数)      .sortBy(_._2,false)      .take(10) //(movieid,次数)     val movies=getMovies(sc)    //获取所有电影的(movieid,title)    val movieTitle=movies.map(x=>(x(0),x(1))).collect().toMap    topNMovies.map(x=>(movieTitle.getOrElse(x._1,null),x._2))      .foreach(x=>println(x._1+"  "+x._2))  }        /**    * 女性看过最多的10部电影    */  def top10FaleLookMovie: Unit ={    val users = getUsers(sc)    //获取所有女性的userid    val faleUserId = users.filter(x => x(1).equals("F"))      .map(x => x(0)).collect()    val faleUserSet = HashSet() ++ faleUserId    //创建广播变量,里面存储所有女性的userid    val broadcastFaleSet = sc.broadcast(faleUserSet)     val ratings = getRatings(sc)    //统计出女性看过最多的10部电影的(movieid,观看次数)    val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid)      //过滤出女性观影数据      .filter(x => broadcastFaleSet.value.contains(x._1))      .map(x => (x._2, 1)) //(movieid,1)      .reduceByKey(_ + _)      .sortBy(_._2, false)      .take(10)    val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数)     val movies=getMovies(sc)    val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title)    //对两个RDD进行join操作,取二者的共同匹配项    allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数))      .map(x=>(x._1,x._2._1,x._2._2))      .foreach(x=>println(x._1+"  "+x._2+"  "+x._3))  }    /**    * 男性看过最多的10部电影    */  def top10MaleLookMovie: Unit ={    val users = getUsers(sc)    //获取所有男性的userid    val faleUserId = users.filter(x => x(1).equals("M"))      .map(x => x(0)).collect()    val faleUserSet = HashSet() ++ faleUserId    //创建广播变量,里面存储所有男性的userid    val broadcastFaleSet = sc.broadcast(faleUserSet)     val ratings = getRatings(sc)    //统计出男性看过最多的10部电影的(movieid,观看次数)    val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid)      //过滤出男性观影数据      .filter(x => broadcastFaleSet.value.contains(x._1))      .map(x => (x._2, 1)) //(movieid,1)      .reduceByKey(_ + _)      .sortBy(_._2, false)      .take(10)    val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数)     val movies=getMovies(sc)    val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title)    //对两个RDD进行join操作,取二者的共同匹配项    allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数))      .map(x=>(x._1,x._2._1,x._2._2))      .foreach(x=>println(x._1+"  "+x._2+"  "+x._3))  }}

  

转载地址:http://ueulx.baihongyu.com/

你可能感兴趣的文章
Oracle11gr2 Linux
查看>>
LeetCode – Refresh – Two Sum
查看>>
随手备忘 ubuntu12.04 lts 安装gcc 4.8
查看>>
Facebook力推导航库:React Navigation使用详解
查看>>
dispatch_source_create创建定时器和UIWindow创建类似处
查看>>
CCF NOI1028 判断互质
查看>>
PNG文件格式
查看>>
每一帧移动的距离
查看>>
实现定时备份mysql数据库并把备份数据库邮件发送
查看>>
KVM 记录
查看>>
Java的数组堆溢出问题
查看>>
mysql索引总结----mysql 索引类型以及创建
查看>>
社交化分享SDK for Unity
查看>>
7、Libgdx网络操作
查看>>
创建线程的方法 Thread Runnable
查看>>
微信公众平台开发(79) 每日宜忌
查看>>
Android多线程研究(9)——读写锁
查看>>
Android中的动画详解系列【4】——Activity之间切换动画
查看>>
简明Python3教程 18.下一步是什么
查看>>
Python 3 下载安装和环境搭建
查看>>