微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 题目

Flink 题目

MysqL中读取数据,通过Flink处理之后在存储到MysqL

package com.wt.flink.homework
import org.apache.flink.streaming.api.functions.source.sourceFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object Text1 {
  /**
   *
   * 1、从数据库MysqL中读取学生表的数据,
     2、统计班级的人数
     3、将统计好的结果保存到数据库中,一个班级只保存一条数据
   */
  def main(args: Array[String]): Unit = {
       //创建flink的环境
       val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

       //使用自定义的source读取MysqL中的数据
       val MysqLDS: DataStream[String] = env.addSource(new MysqLSource())

       val countDS: DataStream[(String, Int)] = MysqLDS
         .map(stu => (stu.split("\t")(4), 1))
         .keyBy(_._1)
         .sum(1)

       //将统计号的结果保存到MySlq中
       countDS.addSink(new MySlqSink)

       //触发任务执行
       env.execute()
  }

     /**
      * 自定义source ,实现SourceFunction接口
      *
      */

     class MysqLSource extends SourceFunction[String] {
          /**
           * run: 用于读取外部数据的方法,只执行一次
           *
           * @param ctx : 上下文对象,用于将读取到的数据发送到下游
           */
          override def run(ctx: SourceFunction.sourceContext[String]):Unit = {
               /**
                *  使用jdbc读取MysqL的数据,将读取到的数据发送到下游
                *
                */
               //创建连接
               Class.forName("com.MysqL.jdbc.Driver")
               val conn: Connection = DriverManager.getConnection("jdbc:MysqL://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")

               //编写查询的数据sql
               val stat: PreparedStatement = conn.prepareStatement("select * from students")

               //执行查询
               val result: ResultSet = stat.executeQuery()

               //解析数据
               while(result.next()){
                    val id: Long = result.getLong("id")
                    val name: String = result.getString("name")
                    val age: Long = result.getLong("age")
                    val gender: String = result.getString("gender")
                    val clazz: String = result.getString("clazz")


                    //将每一条数据发送到下游
                    ctx.collect(s"$id\t$name\t$age\t$gender\t$clazz")
               }

               //关闭连接
               stat.close()
               conn.close()

          }

          //任务被取消的时候执行,一般用于回收资源
          override def cancel(): Unit = {}

     }

     //写到MysqL中
     class MySlqSink extends RichSinkFunction[(String,Int)]{

          var con: Connection = _
          var stat: PreparedStatement = _


          /**
           * open:在invoke之前执行,每个task中只执行一次
           * 一般用于初始化数据库的连接
           *
           */
          override def open(parameters:Configuration):Unit ={
               //1、加载驱动
               Class.forName("com.MysqL.jdbc.Driver")
               //创建链接
               con = DriverManager.getConnection("jdbc:MysqL://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
               //编写插入数据的sql
               //replace :如果不存在插入,如果存在就替换,需要在表中设置主键
               stat = con.prepareStatement("replace into clazz_num(clazz,num) values(?,?)")
          }

          /**
           * 任务关闭后执行,一般用于回收资源
           */
          override def close():Unit={
               //关闭连接
               stat.close()
               con.close()
          }

          /**
           * 每一条数据会执行一次
           * 使用jdbc将数据保存到MysqL中
           *
           * @param kv      : 一行数据
           * @param context : 上下文对象
           */
          override def invoke(kv: (String, Int), context: SinkFunction.Context): Unit = {
               //设置参数
               stat.setString(1, kv._1)
               stat.setInt(2, kv._2)
               //执行插入
               stat.execute()
          }
     }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐