几天无奈之下只能暂时放弃第五个实验的第2题,开始做第三题,经过踉踉跄跄以后,对源程序进行打包时出错。
第三题实验内容:
3. 编程实现利用 DataFrame 读写 MysqL 的数据 (1)在 MysqL 数据库中新建数据库 sparktest,再创建表 employee,包含下面两行数据。
(2)配置 Spark 通过 JDBC 连接数据库 MysqL,编程实现利用 DataFrame 插入如表 6-3 所 示的两行数据到 MysqL 中,最后打印出 age 的最大值和 age 的总和。 源代码: (1)启动并登陆MysqL后执行如下命令:
1 create database sparktest; 2 use sparktest; 3 create table employee(id int(4),name char(20),gender char(4),age int(4)); 4 insert into employee values(1,'Alice','F',22); 5 insert into employee values(2,'John','M',25);View Code
(2)
1 import java.util.Properties 2 import org.apache.spark.sql.types._ 3 import org.apache.spark.sql.Row 4 object TestMysqL 5 { 6 def main(args:Array[String]) 7 { 8 val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" ")) 9 val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true))) 10 val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt)) 11 val employeeDF=spark.createDataFrame(rowRDD,schema) 12 val prop=new Properties() 13 prop.put("user","root") 14 prop.put("password","yr123456") 15 prop.put("driver","com.MysqL.jdbc.Driver") 16 employeeDF.write.mode("append").jdbc("jdbc:MysqL://localhost:3306/sparktest","sparktest.employee",prop) 17 val jdbcDF=spark.read.format("jdbc").option("driver","com.MysqL.jdbc.Driver").option("dbtable","employee").option("user","root").option("password","yr123456").load() 18 jdbcDF.agg("age"->"max","age"->"sum") 19 } 20 }View Code
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。