文章目录
保存为sequenceFile
package write import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkConf, SparkContext} object savetoSeq { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]") .setAppName("savetoSeq") val sc = new SparkContext(conf) val data = List(("name", "xiaoming"), ("age", "18")) val rddData = sc.parallelize(data, 1) rddData.saveAsSequenceFile("D:\\studyplace\\sparkBook\\chapter4\\result\\1",Some(classOf[GzipCodec])) } }
其中saveAsSequenceFile的api第一个参数是保存文件路径,第二个参数是设置压缩方式
对于ClassOf[xxxCodec]对象必须封装在Option集合中再传入SequenceFile方法中,在scala中Option的两个实例为Some集合和None集合,后者代表没有任何元素
在压缩方式中,GzipCodec的压缩比率较高,磁盘不足可以使用这个方式,虽然Bzip压缩率更高,但对于频繁读写场景不适用
保存到HDFS
saveAsTextFile
saveAsHadoopFile
对URI进行判断,以file:/// 将数据保存到本地文件系统中,如果schema是hdfs://将数据写到hdfs文件中
saveAsHadoopFile方法中,默认调用的是@R_502_4478@putFormat实现类作为输出数据的格式化工具
import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapreduce.lib.output.@R_502_4478@putFormat import org.apache.spark.{SparkConf, SparkContext} object savetohadoop { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("savetohadoop").setMaster("local[*]") val sc = new SparkContext(conf) val rddData = sc.parallelize(List(("cat",20),("dog",29),("pig",11)),1) rddData.saveAsNewAPIHadoopFile("路径",classOf[Text],classOf[IntWritable],classOf[@R_502_4478@putFormat[Text,IntWritable]]) sc.stop() } }
保存到MysqL
package write import java.sql.DriverManager import org.apache.spark.{SparkConf, SparkContext} object savetoMysqL { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("savetoMysqL") val sc = new SparkContext(conf) Class.forName("com.MysqL.jdbc.Driver") val rddData = sc.parallelize(List(("tom",11),("jettty",19))) rddData.foreachPartition((iter:Iterator[(String,Int)]) => { val conn = DriverManager.getConnection("jdbc:MysqL://localhost:3306/spark?useUnicode=true&characterEncoding=utf-8","root","123456") conn.setAutoCommit(false) val statement = conn.prepareStatement("insert into spark.person (name,age) VALUES (?,?);") iter.foreach( t => { statement.setString(1,t._1) statement.setInt(2,t._2) statement.addBatch() }) statement.executeBatch() conn.commit() conn.close() }) sc.stop() } }
保存数据的时候使用foreachPartition方法遍历RDD的每一个分区
注意:
DriverManager.getConnection 需要移到foreaPartition内部
conn.setAutoCommit(false) 关闭自动提交,对于大数据量批量操作更合适
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。