
Spark Streaming Real-Time Stream Processing Project 6: Spark Streaming in Practice (1)

Case 1: Processing socket data with Spark Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Word count over socket data with Spark Streaming
  * @create 2019-02-19 11:26
  */
object NetworkWordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    //Creating a StreamingContext requires two arguments: a SparkConf and the batch interval
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val lines = ssc.socketTextStream("localhost",6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

Test: use nc as the data source: nc -lk 6789

Type some words into the nc session; the word counts for each batch are printed to the console.
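
For example, sending the line hello world hello during one batch should produce console output roughly like the following (the exact batch timestamp will differ):

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hello,2)
(world,1)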

Case 2: Processing file system data with Spark Streaming (HDFS and the local file system)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Processing file system data (HDFS and the local file system) with Spark Streaming
  * @create 2019-02-19 21:31
  */
object FileWordCount {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
        val ssc = new StreamingContext(sparkConf,Seconds(5))

        //Monitor the /root/DataSet directory for newly arriving files
        val lines = ssc.textFileStream("/root/DataSet")

        val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

        result.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

Copy or move text files into the /root/DataSet directory and watch the console output. Note that textFileStream only picks up files that are newly created or moved into the directory after the application starts (for example, mv a prepared file into it); appending to a file that is already there will not be detected.

Case 3: Word count with Spark Streaming, writing the results to MySQL

A side note: I had forgotten the MySQL root password and spent a while resetting it, so the procedure is recorded below for reference.

Steps:

1. First make sure the server is in a reasonably safe state, i.e. that nobody can connect to the MySQL database at will. While the root password is being reset, MySQL runs with no password protection at all, and any user could log in and modify it. You can approximate a safe state by blocking MySQL's external port and stopping Apache and all user processes; the safest option is to work on the server console with the network cable unplugged.

2. Change the MySQL startup settings:
# vim /etc/my.cnf
Add the line skip-grant-tables to the [mysqld] section, for example:
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
skip-grant-tables
Save and exit vim.

3. Restart MySQL:
# service mysqld restart
Stopping MySQL: [ OK ]
Starting MySQL: [ OK ]

4. Log in and change the root password:
# mysql
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3 to server version: 3.23.56
Type 'help;' or '\h' for help. Type '\c' to clear the buffer.
mysql> USE mysql
Database changed
mysql> UPDATE user SET Password = password('new-password') WHERE User = 'root';
Query OK, 0 rows affected (0.00 sec)
Rows matched: 2 Changed: 0 Warnings: 0
mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)
mysql> quit

5. Revert the MySQL startup settings:
# vim /etc/my.cnf
Remove the skip-grant-tables line that was added to the [mysqld] section.
Save and exit vim.

6. Restart MySQL again:
# service mysqld restart
Stopping MySQL: [ OK ]
Starting MySQL: [ OK ]

Preparation: install MySQL, start the service (service mysqld start or service mysql start, depending on the distribution), log in with the mysql client, and create the database and table:

mysql> create database spark;
Query OK, 1 row affected (0.00 sec)

mysql> use spark;
Database changed

mysql> create table wordcount(
    -> word varchar(50) default null,
    -> wordcount int(10) default null
    -> );

The project needs the MySQL JDBC driver, so add it to the pom.xml first:

<!-- MySQL JDBC driver -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Word count with Spark Streaming, writing the results to MySQL
  * @create 2019-02-20 11:04
  */
object ForeachRDDApp {
    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

        val ssc = new StreamingContext(sparkConf,Seconds(5))

        val lines = ssc.socketTextStream("localhost",6789)

        val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

        //result.print()  // this would only print the counts to the console

        //TODO: write the results to MySQL
        result.foreachRDD(rdd => {
            val connection = createConnection()
            rdd.foreach{ record =>
                val sql = "insert into wordcount(word, wordcount) values('"+record._1+"',"+record._2 +")"
                connection.createStatement().execute(sql)
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    //Obtain a MySQL connection
    def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","18739548870yu")
    }
}

Running the code above throws a serialization exception:

19/02/20 11:27:18 ERROR JobScheduler: Error running job streaming job 1550633235000 ms.0
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
	at com.xidian.spark.ForeachRDDApp$$anonfun$main$1.apply(ForeachRDDApp.scala:30)
	at com.xidian.spark.ForeachRDDApp$$anonfun$main$1.apply(ForeachRDDApp.scala:28)

The exception occurs because the JDBC connection is created on the driver and then captured by the closure passed to rdd.foreach; Spark has to serialize that closure and ship it to the executors, and java.sql.Connection is not serializable. The fix is to create the connection on the executors themselves, inside foreachPartition, so one connection is opened per partition instead of being serialized from the driver. Modified code:


import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc Word count with Spark Streaming, writing the results to MySQL
  * @create 2019-02-20 11:04
  */
object ForeachRDDApp {
    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

        val ssc = new StreamingContext(sparkConf,Seconds(5))

        val lines = ssc.socketTextStream("localhost",6789)

        val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

        //result.print()  // this would only print the counts to the console

        //TODO: write the results to MySQL
//        result.foreachRDD(rdd => {
//            val connection = createConnection()
//            rdd.foreach{ record =>
//                val sql = "insert into wordcount(word, wordcount) values('"+record._1+"',"+record._2 +")"
//                connection.createStatement().execute(sql)
//            }
//        })  // this version throws the Task not serializable exception shown above

        result.foreachRDD(rdd => {
            rdd.foreachPartition(partitionOfRecords => {
                val connection = createConnection()
                partitionOfRecords.foreach(record => {
                    val sql = "insert into wordcount(word, wordcount) values('"+record._1+"',"+record._2 +")"
                    connection.createStatement().execute(sql)
                })
                connection.close()
            })
        })

        ssc.start()
        ssc.awaitTermination()
    }

    //Obtain a MySQL connection
    def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","18739548870yu")
    }
}

Test with nc -lk 6789, sending for example:
a d ff g h

In the MySQL table:

mysql> select * from wordcount;
+------+-----------+
| word | wordcount |
+------+-----------+
| d    |         1 |
| h    |         1 |
| ff   |         1 |
| a    |         1 |
| g    |         1 |
+------+-----------+
5 rows in set (0.00 sec)

Problems with this example:

1. Existing rows are never updated; every batch result is written with a plain INSERT, so repeated words end up as duplicate rows instead of accumulated counts.

Improvement: before inserting, check whether the word already exists; if it does, UPDATE it, otherwise INSERT it (a sketch follows below). In real production systems, however, HBase or Redis is usually used for this kind of storage instead of MySQL.
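
A minimal sketch of that improvement, assuming the word column is made the table's primary key (the wordcount table created above has no key, so something like alter table wordcount add primary key (word) would be needed first). It uses MySQL's INSERT ... ON DUPLICATE KEY UPDATE so a single statement covers both the insert and the update case; the helper name upsertRecord is made up for illustration and would be called from inside foreachPartition in place of the plain INSERT above:

    //Hypothetical helper: insert a new word, or add the batch count to the existing row.
    //Requires a primary/unique key on the word column for ON DUPLICATE KEY UPDATE to trigger.
    def upsertRecord(connection: java.sql.Connection, word: String, count: Int): Unit = {
        val sql = "insert into wordcount(word, wordcount) values(?, ?) " +
                  "on duplicate key update wordcount = wordcount + values(wordcount)"
        val pstmt = connection.prepareStatement(sql)
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.executeUpdate()
        pstmt.close()
    }

Using a PreparedStatement here also avoids the string concatenation used in the example above.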

2. A new connection is opened for every partition of every RDD; switching to a connection pool would be more efficient (see the sketch below).
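
A minimal sketch of the pooling idea, here as a hand-rolled singleton backed by a ConcurrentLinkedQueue (in practice a library such as HikariCP or DBCP would normally be used); the ConnectionPool object and its methods are assumptions for illustration:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

//Hypothetical pool: one object per executor JVM, so connections are reused across batches
//instead of being opened and closed for every partition.
object ConnectionPool {
    Class.forName("com.mysql.jdbc.Driver")
    private val pool = new ConcurrentLinkedQueue[Connection]()

    def getConnection(): Connection = {
        val conn = pool.poll()
        if (conn != null) conn
        else DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "18739548870yu")
    }

    //Return the connection to the pool instead of closing it, so the next batch can reuse it.
    def returnConnection(conn: Connection): Unit = pool.offer(conn)
}

Inside foreachPartition, ConnectionPool.getConnection() and ConnectionPool.returnConnection(connection) would then replace createConnection() and connection.close().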
