package movies

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Batch job answering three questions over a "::"-delimited movie dataset
  * (users.dat, movies.dat, ratings.dat):
  *
  *   1. the 10 movies with the highest average rating;
  *   2. the 10 movies rated most often by male users whose age field is 18-24;
  *   3. the 10 movies rated most often by female users, with their titles.
  *
  * Usage: `Movice [dataDir]`. Without `dataDir` the original hard-coded directory is used.
  * The Spark master defaults to `local[1]` unless `spark.master` is already configured
  * (e.g. via `spark-submit --master`).
  */
object Movice {

  /** Data directory used when none is given on the command line. */
  private val DefaultDataDir = "D:\\学习笔记\\资料汇总\\day02\\资料\\热门电影的数据"

  /** Number of movies reported for each question. */
  private val TopN = 10

  /** One parsed line of ratings.dat: `userId::movieId::rating[::...]`. */
  final case class Rating(userId: Int, movieId: Int, rating: Int)

  /** One parsed line of users.dat: `userId::gender::age[::...]`. */
  final case class User(userId: Int, gender: String, age: Int)

  /** Orders `(movieId, count)` pairs by count. */
  private val ByCount: Ordering[(Int, Int)] = Ordering.by[(Int, Int), Int](_._2)

  def main(args: Array[String]): Unit = {
    val dataDir = args.headOption.getOrElse(DefaultDataDir)
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setIfMissing("spark.master", "local[1]")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even when one of the jobs throws.
    try {
      val users = readLines(sc, dataDir, "users.dat").map(parseUser)
      // Ratings feed all three questions; cache so the file is read and parsed once.
      val ratings = readLines(sc, dataDir, "ratings.dat").map(parseRating).cache()
      val titles = readLines(sc, dataDir, "movies.dat").map(parseMovieTitle)

      println("1: highest average rating (movieId, averageRating)")
      topRatedMovies(ratings).foreach(println)

      println("2: most rated by men aged 18-24 (movieId, ratingCount)")
      topMoviesForYoungMen(users, ratings).foreach(println)

      println("3: most rated by women (movieId, title, ratingCount)")
      topMoviesForWomen(users, ratings, titles).foreach(println)
    } finally {
      sc.stop()
    }
  }

  /** Returns the `n` movies with the highest average rating as `(movieId, average)`,
    * highest first. The average is a Double, so it is not truncated.
    */
  def topRatedMovies(ratings: RDD[Rating], n: Int = TopN): Array[(Int, Double)] =
    ratings
      .map(r => (r.movieId, (r.rating.toLong, 1L)))
      // reduceByKey sums (ratingSum, count) map-side instead of grouping every rating.
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum.toDouble / count }
      .top(n)(Ordering.by[(Int, Double), Double](_._2))

  /** Returns the `n` movies rated most often by male users whose age field lies in 18-24,
    * as `(movieId, ratingCount)`, most rated first.
    *
    * NOTE(review): if users.dat follows the MovieLens 1M layout, age is a bucket code
    * (18 meaning 18-24); the inclusive range check covers that case too — confirm.
    */
  def topMoviesForYoungMen(
      users: RDD[User],
      ratings: RDD[Rating],
      n: Int = TopN): Array[(Int, Int)] =
    ratingCountsBy(users, ratings)(u => u.gender == "M" && u.age >= 18 && u.age <= 24)
      .top(n)(ByCount)

  /** Returns the `n` movies rated most often by female users as
    * `(movieId, title, ratingCount)`, most rated first. A movie missing from `titles`
    * is reported with the title `<unknown title>` rather than being dropped.
    */
  def topMoviesForWomen(
      users: RDD[User],
      ratings: RDD[Rating],
      titles: RDD[(Int, String)],
      n: Int = TopN): Array[(Int, String, Int)] = {
    val winners = ratingCountsBy(users, ratings)(_.gender == "F").top(n)(ByCount)
    // Fetch titles only for the few winners instead of joining the whole catalogue.
    val wanted = winners.map(_._1).toSet
    val titleById = titles.filter { case (movieId, _) => wanted.contains(movieId) }.collectAsMap()
    winners.map { case (movieId, count) =>
      (movieId, titleById.getOrElse(movieId, "<unknown title>"), count)
    }
  }

  /** Counts ratings per movie, keeping only ratings whose user satisfies `keepUser`.
    * Returns `(movieId, ratingCount)` pairs.
    */
  private def ratingCountsBy(users: RDD[User], ratings: RDD[Rating])(
      keepUser: User => Boolean): RDD[(Int, Int)] = {
    val keptUserIds = users.filter(keepUser).map(u => (u.userId, ()))
    ratings
      .map(r => (r.userId, r.movieId))
      .join(keptUserIds)
      .map { case (_, (movieId, _)) => (movieId, 1) }
      .reduceByKey(_ + _)
  }

  /** Reads `fileName` inside `dataDir` and drops blank lines, which would otherwise fail to parse. */
  private def readLines(sc: SparkContext, dataDir: String, fileName: String): RDD[String] = {
    val path =
      if (dataDir.endsWith("/") || dataDir.endsWith("\\")) dataDir + fileName
      else dataDir + java.io.File.separator + fileName
    sc.textFile(path).filter(_.trim.nonEmpty)
  }

  /** Parses `userId::movieId::rating[::...]`. */
  private def parseRating(line: String): Rating = {
    val fields = line.split("::")
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt)
  }

  /** Parses `userId::gender::age[::...]`. */
  private def parseUser(line: String): User = {
    val fields = line.split("::")
    User(fields(0).toInt, fields(1), fields(2).toInt)
  }

  /** Parses `movieId::title[::...]` into `(movieId, title)`. The split limit of 3 keeps any
    * trailing fields out of the title.
    *
    * NOTE(review): assumes movies.dat stores the title in its second field (MovieLens
    * layout) — confirm against the data.
    */
  private def parseMovieTitle(line: String): (Int, String) = {
    val fields = line.split("::", 3)
    (fields(0).toInt, fields(1))
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。