微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

编程实现利用 DataFrame 读写 MySQL 的数据

package scala

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sqlContext
import org.apache.spark.sql.types._
object TestMysqL {
  /**
   * Demo entry point: builds a two-row DataFrame, appends it to the MySQL
   * table `sparktest.employee` over JDBC, then reads the table back and
   * prints an age aggregate plus the raw rows.
   *
   * Requires a reachable MySQL server at localhost:3306 with database
   * `sparktest` (user/password: root/root) and the MySQL JDBC driver on
   * the classpath.
   */
  def main(args: Array[String]): Unit = {
    // Run in local mode for standalone testing.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("scalawordcount")
    val sc = new SparkContext(conf)
    // FIX: the Spark class is SQLContext — `sqlContext` (lowercase) does not exist.
    val sqlContext = new SQLContext(sc)

    // Parse raw "id name gender age" strings into fields.
    val employeeRDD = sqlContext.sparkContext
      .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
      .map(_.split(" "))
    // Target table schema: all columns nullable.
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

    // JDBC connection properties.
    // FIX: driver class and URL scheme are case-sensitive —
    // `com.mysql.jdbc.Driver` and `jdbc:mysql://`, not `MysqL`.
    // (The redundant "url" property is dropped: the URL is passed to .jdbc() directly.)
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")

    // Append the two rows to the existing table.
    employeeDF.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

    // Read the table back via the JDBC data source.
    val jdbcDF = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/sparktest")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "employee")
      .option("user", "root")
      .option("password", "root")
      .load()
    // FIX: the original computed the aggregation and discarded it; show the result.
    jdbcDF.agg("age" -> "max", "age" -> "sum").show()
    jdbcDF.show()

    sc.stop() // release the local Spark context
  }
}

代码为 Scala 语言所写，经调试可用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐