微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

JdbcSourceTest 查出mysql的数据

 1 package com.xujunqi.source.com.bawei.api
 2 
 3 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 4 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
 5 import org.apache.flink.api.java.typeutils.RowTypeInfo
 6 import org.apache.flink.api.scala._
 7 import org.apache.flink.types.Row
 8 /*
 9 sensor_1,1547718199,35.80
10 sensor_6,1547718201,15.40
11 sensor_7,1547718202,6.72
12 sensor_10,1547718205,38.10
13 sensor_18,1547718205,22.20
14 sensor_15,1547718205,25.32
15   MysqLSource
16  */
17 object JdbcSourceTest {
18   def main(args: Array[String]): Unit = {
19     val env = ExecutionEnvironment.getExecutionEnvironment
20 
21     val inputDataSet: DataSet[Row] = MyJDBCRead(env)
22     inputDataSet.map(r => (r.getField(0), r.getField(1))).print()
23   }
24 
25   //通过jdbc读取MysqL数据
26   def MyJDBCRead(env: ExecutionEnvironment): DataSet[Row] = {
27     val InputMysqL = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
28       .setDrivername("com.MysqL.jdbc.Driver")
29       .setDBUrl("jdbc:MysqL://hadoop102:3306/test")
30       .setUsername("root")
31       .setPassword("000000")
32       .setQuery("select  sensor,temp from temperature")
33       .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO))
34       .finish())
35     InputMysqL
36 
37   }
38 
39 }

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐