微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink-table demo

添加依赖

  • maven pom 如下

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.irisz</groupId>
        <artifactId>flink-test1</artifactId>
        <version>0.1-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.binary.version>2.11</scala.binary.version>
            <flink.version>1.12.1</flink.version>
            <hadoop.version>2.7.5</hadoop.version>
        </properties>
    
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>MysqL</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
        </dependencies>
    
    
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.5.1</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <!--<arg>-make:transitive</arg>-->
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
    
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            zip -d learn_spark.jar meta-inf/*.RSA meta-inf/*.DSA meta-inf/*.SF
                                            -->
                                            <exclude>meta-inf/*.SF</exclude>
                                            <exclude>meta-inf/*.DSA</exclude>
                                            <exclude>meta-inf/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.itheima.batch.BatchFromCollection</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
    
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>com.itheima.env.BatchRemoteEven</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <version>2.10</version>
                    <executions>
                        <execution>
                            <id>copy-dependencies</id>
                            <phase>package</phase>
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
                            <configuration>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

测试代码

package cn.irisz.connect_test

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableResult}

object Demo2 {
  def main(args: Array[String]): Unit = {
    //    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    //    val tEnv: TableEnvironment = StreamTable.create(settings)
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)
    val tData: TableResult = tEnv.executesql(
      """CREATE TABLE `log` (
          `ip` STRING,
          `i_country` STRING,
          `i_province` STRING,
          `i_city` STRING,
          `i_isp` STRING,
          `url_path` STRING,
          `url_count` BIGINT
        ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:MysqL://localhost:3306/test1?useSSL=false',
          'table-name' = 'result',
          'username' = 'root',
          'password' = '123456'
        )""")
    val result: TableResult = tEnv.executesql(
      """
      CREATE TABLE `sink` (
      `ip` STRING,
      `i_country` STRING,
      `i_province` STRING,
      `i_city` STRING,
      `i_isp` STRING,
      `url_path` STRING,
      `url_count` BIGINT
    ) WITH (
      'connector' = 'print'
    )""")
    tEnv.executesql("""INSERT INTO sink SELECT * FROM log LIMIT 100""")
    tEnv.execute("demo 2")
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐