Example 1: Processing socket data with Spark Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author YuZhansheng
 * @desc Spark Streaming word count over socket data
 * @create 2019-02-19 11:26
 */
object NetworkWordCount {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        // Creating a StreamingContext takes two parameters: a SparkConf and the batch interval
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        result.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
Test: use netcat as the data source: nc -lk 6789
Type lines into the nc session; the console prints the word counts for each batch.
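For reference, print() writes a block like the following to the console for every 5-second batch (the words here are just sample input):

-------------------------------------------
Time: 1550633235000 ms
-------------------------------------------
(hello,2)
(world,1)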
Example 2: Processing file system data (HDFS and the local file system) with Spark Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author YuZhansheng
 * @desc Spark Streaming word count over file system data (HDFS or local)
 * @create 2019-02-19 21:31
 */
object FileWordCount {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Monitor the /root/DataSet directory for new files
        val lines = ssc.textFileStream("/root/DataSet")
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        result.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
Copy or move text files into the /root/DataSet directory and watch the console output.
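Note that textFileStream only picks up files that appear in the monitored directory after the stream has started, and each file needs to appear atomically, so moving a fully written file into the directory is more reliable than writing it there in place. For example (the file name is illustrative):

# echo "hello world hello" > /tmp/words.txt
# mv /tmp/words.txt /root/DataSet/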
Example 3: Word count with Spark Streaming, writing the results to MySQL
Side note: I had forgotten the MySQL root password and spent a while getting it reset, so I am recording the steps here (adapted from another blog post).
Steps:
1. First make sure the server is in a safe state, meaning nobody can connect to the MySQL database at will. While the root password is being reset, MySQL runs with no password protection at all, so any user could log in and modify its data. You can approximate this by closing MySQL's external port and stopping Apache and all user processes; the safest option is to work on the server's console with the network cable unplugged.
2. Modify MySQL's login settings:
# vim /etc/my.cnf
Add the following line to the [mysqld] section: skip-grant-tables
For example:
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
skip-grant-tables
Save and exit vim.
3. Restart mysqld:
# service mysqld restart
Stopping MySQL: [ OK ]
Starting MySQL: [ OK ]
4. Log in and change MySQL's root password:
# mysql
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3 to server version: 3.23.56
Type 'help;' or '\h' for help. Type '\c' to clear the buffer.
mysql> USE mysql;
Database changed
mysql> UPDATE user SET Password = PASSWORD('new-password') WHERE User = 'root';
Query OK, 0 rows affected (0.00 sec)
Rows matched: 2 Changed: 0 Warnings: 0
mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)
mysql> quit
5. Revert the login settings:
# vim /etc/my.cnf
Remove the skip-grant-tables line that was added to the [mysqld] section.
Save and exit vim.
6. Restart mysqld again:
# service mysqld restart
Stopping MySQL: [ OK ]
Starting MySQL: [ OK ]
Preparation: install MySQL, start the service with service mysqld start (or service mysql start, depending on the distribution), then log in with the mysql client and create the database and table:
mysql> create database spark;
Query OK, 1 row affected (0.00 sec)
mysql> use spark;
Database changed
mysql> create table wordcount(
    -> word varchar(50) default null,
    -> wordcount int(10) default null
    -> );
Add the MySQL JDBC driver dependency to pom.xml:
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author YuZhansheng
 * @desc Spark Streaming word count, writing the results to MySQL
 * @create 2019-02-20 11:04
 */
object ForeachRDDApp {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        //result.print()  // would only print the counts to the console

        // TODO: write the results to MySQL
        result.foreachRDD(rdd => {
            val connection = createConnection()  // runs on the driver
            rdd.foreach { record =>              // runs on the executors
                val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
                connection.createStatement().execute(sql)
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    // Get a MySQL connection
    def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "18739548870yu")
    }
}
Running the code above throws a serialization exception: the Connection is created on the driver, but the closure passed to rdd.foreach has to be serialized and shipped to the executors, and java.sql.Connection is not serializable:
19/02/20 11:27:18 ERROR JobScheduler: Error running job streaming job 1550633235000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at com.xidian.spark.ForeachRDDApp$$anonfun$main$1.apply(ForeachRDDApp.scala:30)
at com.xidian.spark.ForeachRDDApp$$anonfun$main$1.apply(ForeachRDDApp.scala:28)
The fix is to create the connection on the executors rather than on the driver: foreachPartition runs inside the shipped closure, so one connection can be opened per partition without anything non-serializable crossing the driver/executor boundary.

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author YuZhansheng
 * @desc Spark Streaming word count, writing the results to MySQL
 * @create 2019-02-20 11:04
 */
object ForeachRDDApp {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

        // Creating the connection in foreachRDD and using it inside rdd.foreach,
        // as in the previous version, raises Task not serializable.
        // Write the results to MySQL with one connection per partition instead:
        result.foreachRDD(rdd => {
            rdd.foreachPartition(partitionOfRecords => {
                val connection = createConnection()
                partitionOfRecords.foreach(record => {
                    val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
                    connection.createStatement().execute(sql)
                })
                connection.close()
            })
        })

        ssc.start()
        ssc.awaitTermination()
    }

    // Get a MySQL connection (called on the executors)
    def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "18739548870yu")
    }
}
Test: start nc -lk 6789 and send:
a d ff g h
In the MySQL table:
mysql> select * from wordcount;
+------+-----------+
| word | wordcount |
+------+-----------+
| d | 1 |
| h | 1 |
| ff | 1 |
| a | 1 |
| g | 1 |
+------+-----------+
5 rows in set (0.00 sec)
Remaining problems with this example:
1. Existing rows are never updated; every record becomes a new insert.
Improvement: before inserting, check whether the word already exists and update it if so, insert it otherwise; a MySQL upsert does this in one statement (see the first sketch after this list). In real production systems, though, HBase or Redis is typically used for this kind of storage instead.
2. A new connection is created for every RDD partition; a connection pool would reuse connections and improve efficiency (see the second sketch after this list).
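For problem 1, a minimal sketch of the upsert approach, using MySQL's INSERT ... ON DUPLICATE KEY UPDATE through a PreparedStatement (which also removes the SQL-injection risk of building the statement by string concatenation). This assumes a unique key on word, which the table as created above does not have yet (e.g. alter table wordcount add unique key(word)), and it reuses the createConnection() helper from the example:

// Upsert one partition of (word, count) records; assumes a UNIQUE key on word
def saveToMySQL(records: Iterator[(String, Int)]): Unit = {
    val connection = createConnection()
    val statement = connection.prepareStatement(
        "insert into wordcount(word, wordcount) values(?, ?) " +
            "on duplicate key update wordcount = wordcount + values(wordcount)")
    records.foreach { case (word, count) =>
        // Insert the word, or add this batch's count to the stored total
        statement.setString(1, word)
        statement.setInt(2, count)
        statement.executeUpdate()
    }
    statement.close()
    connection.close()
}

It plugs into the stream as result.foreachRDD(rdd => rdd.foreachPartition(saveToMySQL)).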
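For problem 2, a hand-rolled sketch of the pooling idea: a Scala singleton object is initialized once per JVM, so each executor lazily builds a few connections on first use and reuses them across batches instead of opening one per partition. All names here are hypothetical, and in practice a pooling library such as HikariCP or c3p0 would be the usual choice:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical minimal connection pool, lazily built once per executor JVM
object MySQLConnectionPool {

    private val maxSize = 4

    private lazy val pool: LinkedBlockingQueue[Connection] = {
        Class.forName("com.mysql.jdbc.Driver")
        val queue = new LinkedBlockingQueue[Connection](maxSize)
        (1 to maxSize).foreach { _ =>
            queue.put(DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/spark", "root", "18739548870yu"))
        }
        queue
    }

    // Blocks until a connection is free
    def borrow(): Connection = pool.take()

    // Hand the connection back for reuse instead of closing it
    def giveBack(connection: Connection): Unit = pool.put(connection)
}

Inside foreachPartition, MySQLConnectionPool.borrow() then replaces createConnection(), and MySQLConnectionPool.giveBack(connection) in a finally block replaces connection.close().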