微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark热门电影

package movies

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Movice {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf ()
      .setAppName ( this.getClass.getSimpleName )
      .setMaster ( "local[1]" )
    val sc = new SparkContext ( cof )

    val users1 = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\热门电影的数据\\users.dat" )
    val movies1 = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\热门电影的数据\\movies.dat" )
    val ratings1 = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\热门电影的数据\\ratings.dat" )

    //1:评分(平均分)最高的10部电影 (moviceId, (userId, rating))
    val ratings2: RDD[(Int, (String, Int))] = ratings1.map ( tp => {
      val splits: Array[String] = tp.split ( "::" )
      val userId = splits ( 0 )
      val moviceId = splits ( 1 ).toInt
      val rating = splits ( 2 ).toInt
      (moviceId, (userId, rating))
    } )

    //(moviceId, (userId, 1))
    val rating4:RDD[(Int,(Int,Int))]=ratings2.map(tp=>{
      val rating=tp._2._2
      val moviceId=tp._1
      (moviceId,(rating,1))
    })

    val group2: RDD[(Int, Iterable[(Int, Int)])] =rating4.groupByKey()

    //聚合(movid,rtingsum,counsum)
    val rantresult1: RDD[(Int, Int, Int)] = group2.map(tp=>{
      val rantsum=tp._2.map(tp=>tp._1).sum
      val countsum=tp._2.map(_._2).sum
      (tp._1,rantsum,countsum)
    })
//    //取平均值
//    val ranresult2=rantresult1.map(tp=>{
//      (tp._1,tp._2/tp._3)
//    }).sortBy(-_._2).take(10).foreach(println)

    //2:18 - 24 岁的男性年轻人 最喜欢看的10部电影
    val users2: RDD[(Int, (String, Int))] = users1.map ( tp => {
      val splits: Array[String] = tp.split ( "::" )
      val userId = splits ( 0 ).toInt
      val gender = splits ( 1 )
      val age = splits ( 2 ).toInt

      (userId, (gender, age))
    } )

    val ratings3: RDD[(Int, String)] = ratings1.map ( tp => {
      val splits: Array[String] = tp.split ( "::" )
      val userId = splits ( 0 ).toInt
      val moviceId = splits ( 1 )
      (userId, moviceId)
    } )


//     users2.join ( ratings3 ).filter ( tp => {
//      tp._2._1._1.equals ( "M" )
//      tp._2._1._2 >= 18 && tp._2._1._2 <= 24
//    } ).map ( tp => (
//      tp._2._2, 1)
//    ).reduceByKey ( _ + _ ).sortBy ( -_._2 ).take ( 10 ).foreach ( println )

//3:女性观看次数最多的10部电影名称及观看次数
  users2.join(ratings3).filter(tp=>{
    tp._2._1._1.equals("F")
  }).map(tp=>(
    tp._2._2,1
  )).reduceByKey(_+_).sortBy(-_._2).take(10).foreach(println)

  sc.stop()
}
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐