1 package com.xujunqi.source.com.bawei.api 2 3 import org.apache.flink.api.common.typeinfo.BasicTypeInfo 4 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat 5 import org.apache.flink.api.java.typeutils.RowTypeInfo 6 import org.apache.flink.api.scala._ 7 import org.apache.flink.types.Row 8 /* 9 sensor_1,1547718199,35.80 10 sensor_6,1547718201,15.40 11 sensor_7,1547718202,6.72 12 sensor_10,1547718205,38.10 13 sensor_18,1547718205,22.20 14 sensor_15,1547718205,25.32 15 MysqLSource 16 */ 17 object JdbcSourceTest { 18 def main(args: Array[String]): Unit = { 19 val env = ExecutionEnvironment.getExecutionEnvironment 20 21 val inputDataSet: DataSet[Row] = MyJDBCRead(env) 22 inputDataSet.map(r => (r.getField(0), r.getField(1))).print() 23 } 24 25 //通过jdbc读取MysqL数据 26 def MyJDBCRead(env: ExecutionEnvironment): DataSet[Row] = { 27 val InputMysqL = env.createInput(JDBCInputFormat.buildJDBCInputFormat() 28 .setDrivername("com.MysqL.jdbc.Driver") 29 .setDBUrl("jdbc:MysqL://hadoop102:3306/test") 30 .setUsername("root") 31 .setPassword("000000") 32 .setQuery("select sensor,temp from temperature") 33 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO)) 34 .finish()) 35 InputMysqL 36 37 } 38 39 }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。