
[Official Flink Getting-Started Notes] Part 4: Client Operations

1. Environment

The previous sessions covered setting up a Flink development environment and deploying and running applications. This session focuses on Flink's client operations.

This session is mainly hands-on. It is based on community Flink 1.12.1, running on macOS, with Google Chrome as the browser.

2. Overview

Flink provides a rich set of clients for submitting and interacting with jobs: the Flink command line, the Scala Shell, the SQL Client, the REST API, and the Web UI. The most important is the command line; next comes the SQL Client, for submitting SQL jobs, and the Scala Shell, for submitting Table API jobs. Flink also exposes a REST service that users can call over HTTP, and jobs can additionally be submitted through the Web UI.
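The Web UI and the CLI both sit on top of the same REST endpoints, so the operations below can also be scripted directly. A minimal sketch (the base address assumes a local standalone cluster on the default port 8081; adjust as needed):

```python
import json
from urllib.request import urlopen

# Base address of the JobManager's REST endpoint; 8081 is the default
# port of a local standalone cluster (an assumption, adjust as needed).
BASE = "http://127.0.0.1:8081"

def rest_url(path):
    """Build a full REST URL, e.g. rest_url('/jobs/overview')."""
    return BASE + path

def list_jobs():
    """GET /jobs/overview lists all jobs with their id, name and state."""
    with urlopen(rest_url("/jobs/overview")) as resp:
        return json.load(resp)["jobs"]

# Usage (requires a running cluster):
#   for job in list_jobs():
#       print(job["jid"], job["name"], job["state"])
```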


Under the bin directory of the Flink installation you can find flink, start-scala-shell.sh, sql-client.sh and other scripts; these are the entry points for client operations.


3. The Flink Command Line

The Flink command line takes many parameters; run flink -h to see the full description:

./bin/flink -h

To see the parameters of a particular command, for example run, enter:

./bin/flink run -h

See the official documentation for the full reference.


[root@xxx flink-1.12.1]# ./bin/flink -h
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point ("main()" method). Only needed if the
                                          JAR file does not specify the class in its manifest.
     -C,--classpath <url>                 Adds a URL to each user code classloader on all nodes in the cluster.
                                          The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of an NFS share).
                                          You can use this option multiple times for specifying more than one URL.
                                          The protocol must be supported by the {@link java.net.URLClassLoader}.
                                          
     -d,--detached                        If present, runs the job in detached mode 
     
     -n,--allowNonRestoredState           Allow to skip savepoint state that cannot be restored. 
                                          You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
                                          
     -p,--parallelism <parallelism>       The parallelism with which to run the program. 
                                          Optional flag to override the default value specified in the configuration.
                                          
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
     
     -SAE,--shutdownOnAttachedExit        If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated  abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
                     
                                          
  Options for Generic CLI mode:
  
     -D <property=value>   Allows specifying multiple generic configuration  options. 
                           The available options can be found at  https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                           
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is  also available with the "Application Mode".
                           
                           The name of the executor to be used for executing the  given job, which is equivalent to the "execution.target" config option. The currently  available executors are: "remote", "local",  "kubernetes-session", "yarn-per-job", "yarn-session".
                           
     -t,--target <arg>     The deployment target for the given application,  which is equivalent to the "execution.target" config option. 
     
     					   For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". 
     					   For the "run-application" action the currently available  targets are: "kubernetes-application",  "yarn-application".


  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached  mode
     -m,--jobmanager <arg>                Set to yarn-cluster to use YARN  execution mode.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the  application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached  mode (deprecated; use non-YARN specific option instead)
     
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     
     -yj,--yarnjar <arg>                  Path to Flink jar file
     
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
                                          
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN  application
     
     -ynm,--yarnname <arg>                Set a custom name for the application  on YARN
     
     -yq,--yarnquery                      display available YARN resources  (memory, cores)
     
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     
     -yt,--yarnship <arg>                 Ship files in the specified directory (t for transfer)
     
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
                                          
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper sub-paths for high availability mode
                                          

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. 
                                     The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                                     
     -m,--jobmanager <arg>           Address of the JobManager to which to connect.
                                     Use this flag to connect to a different JobManager than the one specified in the configuration. 
                                     Attention: This option is respected only if the  high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths  for high availability mode



Action "run-application" runs an application in Application Mode.

  Syntax: run-application [OPTIONS] <jar-file> <arguments>
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. 
                           The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

     -t,--target <arg>     The deployment target for the given application,  which is equivalent to the "execution.target" config option. 
                           For the "run" action the currently available targets are: "remote", "local", "kubernetes-session",  "yarn-per-job", "yarn-session". 
                           For the   "run-application" action the currently available   targets are: "kubernetes-application",  "yarn-application".



Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main()" method). 
                                      Only needed if the JAR file does not specify the class in its manifest.
                                      
     -p,--parallelism <parallelism>   The parallelism with which to run the program. 
                                      Optional flag to override the default value specified in the  configuration.


Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -a,--all         Show all programs and their JobIDs
     -r,--running     Show only running programs and their JobIDs
     -s,--scheduled   Show only scheduled programs and their JobIDs
     
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options.
                           The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                           
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. 
                           For the "run" action the currently available  targets are: "remote", "local", "kubernetes-session",  "yarn-per-job", "yarn-session". 
                           For the  "run-application" action the currently available   targets are: "kubernetes-application", "yarn-application".

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution  mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper    sub-paths for high availability mode

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic  configuration options. 
                                     The available  options can be found at  https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                                     
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. 
                                     Use this flag to connect to a different JobManager than the one specified
                                     in the configuration. 
                                     Attention: This  option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode



Action "stop" stops a running program with a savepoint (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
     -d,--drain                           Send MAX_WATERMARK before taking the savepoint and stopping the pipeline.
                                          
     -p,--savepointPath <savepointPath>   Path to the savepoint (for example hdfs:///flink/savepoint-1537).
                                           If no  directory is specified, the configured  default will be used ("state.savepoints.dir").
                                         
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. 
                           The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. 
                           For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". 
                           For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic  configuration options. 
                                     The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     								 
     -m,--jobmanager <arg>           Address of the JobManager to which to connect.
                                     Use this flag to connect to a different JobManager than the one specified in the configuration. 
                                     Attention: This  option is respected only if the  high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths  for high availability mode



Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -s,--withSavepoint <targetDirectory>   **DEPRECATION WARNING**: Cancelling a job with savepoint is deprecated.
                                            Use "stop" instead.
                                            Trigger savepoint and cancel job. 
                                            The target directory is optional. 
                                            If  no directory is specified, the  configured default directory    (state.savepoints.dir) is used.
                                            
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. 
                           The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                           
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config  option. 
     					   For the "run" action the currently available  targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". 
     					   For the  "run-application" action the currently available  targets are: "kubernetes-application",  "yarn-application".

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
                                     
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. 
                                     The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
                                     
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. 
                                     Use this flag to connect to a different JobManager than the one specified
                                     in the configuration. 
                                     Attention: This  option is respected only if the  high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode



Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>       Path of savepoint to dispose.
     -j,--jarfile <jarfile>   Flink program JAR file.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. 
                           The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
 
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. 
                           For the "run" action the currently available targets are: "remote", "local", "kubernetes-session",  "yarn-per-job", "yarn-session". 
                           For the  "run-application" action the currently available  targets are: "kubernetes-application", "yarn-application".

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. 
                                     The available options can be found at  https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. 
                                     Use this flag to connect to a   different JobManager than the one specified in the configuration. 
                                     Attention: This option is respected only if the high-availability configuration is NONE.
                                     
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode



3.1. Standalone

  • First, start a standalone cluster:

BoYi-Pro:flink-1.12.1 sysadmin$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host BoYi-Pro.lan.
Starting taskexecutor daemon on host BoYi-Pro.lan.

  • Open http://127.0.0.1:8081 to see the Web UI.


3.1.1. Run

Run a job, using the TopSpeedWindowing example that ships with Flink:

BoYi-Pro:flink-1.12.1 sysadmin$ bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 02fa782293dc0c01baf9b4902236b4dd
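In scripts, the JobID printed on the last line is what the later list/stop/cancel/savepoint commands need. A small sketch for capturing it, assuming the 1.12 output format shown above:

```python
import re

# Last line of a successful `flink run` submission, as shown above.
RUN_OUTPUT = "Job has been submitted with JobID 02fa782293dc0c01baf9b4902236b4dd"

def extract_job_id(output):
    """Return the 32-character hex JobID from `flink run` output, or None."""
    m = re.search(r"Job has been submitted with JobID ([0-9a-f]{32})", output)
    return m.group(1) if m else None
```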
  • Once running, the job defaults to a parallelism of 1.


  • Click "Task Manager" on the left, then "Stdout", to see the output log.



    Alternatively, check the *.out file under the local log directory:

tail -f flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.out


3.1.2. List

View the job list:

BoYi-Pro:log sysadmin$ flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.02.2021 01:09:08 : 02fa782293dc0c01baf9b4902236b4dd : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
BoYi-Pro:log sysadmin$
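The list output is line-oriented and easy to scrape in automation. A minimal sketch, assuming the 1.12 output format shown above:

```python
import re

# Sample output of `flink list` (copied from the session above).
LIST_OUTPUT = """\
------------------ Running/Restarting Jobs -------------------
16.02.2021 01:09:08 : 02fa782293dc0c01baf9b4902236b4dd : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
"""

# Each job line reads "<timestamp> : <32-hex job id> : <name> (<STATE>)".
JOB_LINE = re.compile(
    r"^(?P<ts>[\d.: ]+) : (?P<jid>[0-9a-f]{32}) : (?P<name>.+) \((?P<state>\w+)\)$"
)

def parse_jobs(output):
    """Extract (job_id, name, state) tuples from `flink list` output."""
    jobs = []
    for line in output.splitlines():
        m = JOB_LINE.match(line.strip())
        if m:
            jobs.append((m.group("jid"), m.group("name"), m.group("state")))
    return jobs
```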

3.1.3. Stop

Stop a job. Use -m to specify the host and port of the JobManager of the job to stop.

  • To use stop, the job's sources must implement the StoppableFunction interface; otherwise an error is reported.
BoYi-Pro:log sysadmin$ flink stop -m 127.0.0.1:8081  02fa782293dc0c01baf9b4902236b4dd
Suspending job "02fa782293dc0c01baf9b4902236b4dd" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "02fa782293dc0c01baf9b4902236b4dd".
	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:585)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1006)
	at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1073)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided.

From the log we can see that the Stop command failed: the config key state.savepoints.dir is not set, so a target savepoint directory must be provided (via -p, or by configuring a default in conf/flink-conf.yaml). Note also that in older Flink versions, a job could only be stopped if all of its sources were stoppable, i.e. implemented the StoppableFunction interface.

/**
 * Functions that need to be stoppable must implement this interface,
 * e.g. the sources of streaming jobs.
 * The stop() method is called when the job receives the STOP signal.
 * On receiving this signal, the source must stop emitting new data and
 * shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source
     * to shut down gracefully. Pending data can still be emitted; the
     * source does not have to stop immediately.
     */
    void stop();
}

3.1.4. Cancel

Cancel a job. If state.savepoints.dir is configured in conf/flink-conf.yaml, a savepoint is saved when the job is cancelled; otherwise no savepoint is saved.

BoYi-Pro:log sysadmin$ flink cancel -m 127.0.0.1:8081 02fa782293dc0c01baf9b4902236b4dd
Cancelling job 02fa782293dc0c01baf9b4902236b4dd.
Cancelled job 02fa782293dc0c01baf9b4902236b4dd.

You can also explicitly specify the savepoint directory when cancelling:

BoYi-Pro:flink-1.12.1 sysadmin$ flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint e145eea8f0ff0a696af5fefa0a259846
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e145eea8f0ff0a696af5fefa0a259846 with savepoint to /tmp/savepoint.
Cancelled job e145eea8f0ff0a696af5fefa0a259846. Savepoint stored in file:/tmp/savepoint/savepoint-e145ee-061f7da6c990.
BoYi-Pro:flink-1.12.1 sysadmin$ ls /tmp/savepoint
savepoint-e145ee-061f7da6c990
  • The differences between cancelling and stopping a (streaming) job are as follows:

A cancel() call immediately invokes the cancel() method of the job's operators to cancel them as quickly as possible. If the operators do not stop after the cancel() call, Flink starts periodically interrupting the operator threads until all of them have stopped.

A stop() call is the more graceful way to end a running streaming job. stop() only works for jobs whose sources all implement the StoppableFunction interface. When the user requests a stop, every source of the job receives a stop() call, and the job only ends once all sources have shut down cleanly. This lets the job finish processing all in-flight data normally.
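The contract difference can be illustrated with a toy model (plain Python, not the Flink API; all names here are made up for illustration):

```python
class SketchSource:
    """Toy model (not the Flink API) of how a source reacts to stop vs cancel."""

    def __init__(self):
        self.buffer = []       # records received but not yet emitted downstream
        self.emitted = []
        self.running = True    # cleared by cancel(): exit as fast as possible
        self.draining = False  # set by stop(): flush pending records, then finish

    def cancel(self):
        self.running = False

    def stop(self):
        self.draining = True

    def run(self):
        while self.running:
            if self.draining:
                # Graceful stop: emit what is pending, then end normally.
                self.emitted.extend(self.buffer)
                self.buffer.clear()
                return "finished"
            # ... in a real source: read from the external system into buffer ...
        # Hard cancel: whatever is still in the buffer is simply dropped.
        return "cancelled"
```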

3.1.5. Savepoint

Trigger a savepoint:

BoYi-Pro:flink-1.12.1 sysadmin$ bin/flink savepoint -m 127.0.0.1:8081 e6665485734cf8e91d0a924f07eeb302 /tmp/savepoint
Triggering savepoint for job e6665485734cf8e91d0a924f07eeb302.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-e66654-85b071592103
You can resume your program from this savepoint with the run command.
BoYi-Pro:flink-1.12.1 sysadmin$ ls -l /tmp/savepoint
total 0
drwxr-xr-x  3 sysadmin  wheel  96  2 16 02:09 savepoint-e66654-85b071592103
BoYi-Pro:flink-1.12.1 sysadmin$

Note: the differences between a Savepoint and a Checkpoint (see the documentation for details):

  • Checkpoints are incremental; each one is quick and small. Once enabled in the program they are triggered automatically and are transparent to the user. Checkpoints are used automatically on job failover; the user does not need to specify them.

  • Savepoints are full snapshots; each one takes longer and is larger, and must be triggered explicitly by the user. Savepoints are typically used for program version upgrades (see the documentation), bug fixes, A/B tests and similar scenarios, and must be specified by the user.

  • Use the -s parameter to start a job from a specified savepoint:

BoYi-Pro:flink-1.12.1 sysadmin$ flink run -d -s /tmp/savepoint/savepoint-e66654-ca80b929a31a ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 45c617bcd9e20f99ccc80dc81f94370b

Check the JobManager log for entries showing the job being restored from the savepoint.


3.1.6. Modify

Modify job parallelism. As of version 1.12.1, the modify command is not supported; the default parallelism can be changed through the configuration file.
To change the parallelism of a running job, first stop it with cancel + savepoint (or stop), then set the desired parallelism when restarting the job from the savepoint.

In conf/flink-conf.yaml, change the number of task slots from the default of 1 to 4, and configure a savepoint directory:

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint

Start command: bin/start-cluster.sh
Stop command: bin/stop-cluster.sh

The cluster must be restarted for the changed configuration to take effect; then start the job again:

bin/stop-cluster.sh && bin/start-cluster.sh
bin/flink run -d examples/streaming/TopSpeedWindowing.jar

The Web UI now shows 4 task slots, while the job's default parallelism is still 1.


3.1.7. Info

The info command shows the execution plan (StreamGraph) of a Flink job.

BoYi-Pro:flink-1.12.1 sysadmin$ flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Timestamps/Watermarks",
    "pact" : "Operator",
    "contents" : "Timestamps/Watermarks",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Window(GlobalWindows(), DeltaTrigger, Timeevictor, ComparableAggregator, PassthroughWindowFunction)",
    "pact" : "Operator",
    "contents" : "Window(GlobalWindows(), DeltaTrigger, Timeevictor, ComparableAggregator, PassthroughWindowFunction)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
--------------------------------------------------------------

No description provided.

Copy the JSON output and paste it into this site: http://flink.apache.org/visualizer/
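The plan JSON can also be processed programmatically, for example to list the edges of the graph. A minimal sketch over a trimmed copy of the plan above:

```python
import json

# A trimmed version of the execution plan printed by `flink info` above.
PLAN = json.loads("""
{
  "nodes" : [
    { "id" : 1, "type" : "Source: Custom Source", "parallelism" : 1 },
    { "id" : 2, "type" : "Timestamps/Watermarks", "parallelism" : 1,
      "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD" } ] },
    { "id" : 4, "type" : "Window(...)", "parallelism" : 1,
      "predecessors" : [ { "id" : 2, "ship_strategy" : "HASH" } ] },
    { "id" : 5, "type" : "Sink: Print to Std. Out", "parallelism" : 1,
      "predecessors" : [ { "id" : 4, "ship_strategy" : "FORWARD" } ] }
  ]
}
""")

def edges(plan):
    """Yield (from_id, to_id, ship_strategy) for every edge in the plan."""
    for node in plan["nodes"]:
        for pred in node.get("predecessors", []):
            yield pred["id"], node["id"], pred["ship_strategy"]
```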



Click Draw.



You can compare the result with the physical execution plan of the actually running job.


3.2. Yarn per-job

3.2.1. Single-job attached mode

The default is attached mode: the client waits until the program finishes before exiting.

  • Use -m yarn-cluster to specify YARN mode.

  • The application shows up on YARN as "Flink session cluster"; this batch WordCount job transitions to FINISHED when it completes.

  • The client can see the result output.


[root@master01 flink-1.12.1]#  ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 15:54:42,222 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 15:54:42,397 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 15:54:42,653 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 15:54:42,782 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 15:54:42,801 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 15:54:42,960 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 15:54:43,029 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 15:54:43,030 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 15:54:43,030 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 15:54:43,467 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 15:54:48,109 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 15:54:48,152 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0002
2021-02-16 15:54:48,396 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0002
2021-02-16 15:54:48,397 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 15:54:48,401 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - deploying cluster, current state ACCEPTED
2021-02-16 15:54:53,460 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 15:54:53,461 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:33895 of application 'application_1613318015145_0002'.
Job has been submitted with JobID c0fb5c95eff1a1d6447a2b5e975b6d5f
Program execution finished
Job with JobID c0fb5c95eff1a1d6447a2b5e975b6d5f has finished.
Job Runtime: 13486 ms
Accumulator Results:
- 9ddb483b217c1caa5e24b9eccbe384a5 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)

If we run a streaming job in attached mode, the client keeps waiting and does not exit. You can try this with the following example:

./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar


3.2.2. Single-job detached mode

● In detached mode, the client exits as soon as the job is submitted.
● The application shows up on YARN as "Flink per-job cluster".


[root@master01 flink-1.12.1]# ./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:00:33,224 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:00:33,410 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:00:33,670 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:00:33,806 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:00:33,819 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:00:33,974 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:00:34,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:00:34,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 17:00:34,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 17:00:34,464 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:00:39,197 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:00:39,242 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0003
2021-02-16 17:00:39,483 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0003
2021-02-16 17:00:39,484 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:00:39,487 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - deploying cluster, current state ACCEPTED
2021-02-16 17:00:46,077 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:00:46,079 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:42671 of application 'application_1613318015145_0003'.
Job has been submitted with JobID 66d59ef65f870013dd8f6f195efa6258
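The memory rounding reported in these logs (1600 MB → 2048 MB, 1728 MB → 2048 MB) is just rounding up to a multiple of YARN's minimum allocation, 'yarn.scheduler.minimum-allocation-mb'. A quick illustrative sketch of the arithmetic (not Flink code):

```python
import math

def yarn_allocation(requested_mb: int, min_alloc_mb: int = 1024):
    """Round a requested container size up to a multiple of YARN's
    minimum allocation; also return the slack Flink may never use."""
    allocated = math.ceil(requested_mb / min_alloc_mb) * min_alloc_mb
    return allocated, allocated - requested_mb

# JobManager: 1600 MB requested -> 2048 MB allocated, 448 MB extra
print(yarn_allocation(1600))  # (2048, 448)
# TaskManager: 1728 MB requested -> 2048 MB allocated, 320 MB extra
print(yarn_allocation(1728))  # (2048, 320)
```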

[root@master01 flink-1.12.1]# ./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:04:23,375 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:04:23,551 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:04:23,801 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:04:23,927 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:04:23,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:04:24,096 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 17:04:24,579 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:04:29,160 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:04:29,202 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0004
2021-02-16 17:04:29,443 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0004
2021-02-16 17:04:29,443 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:04:29,447 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - deploying cluster, current state ACCEPTED
2021-02-16 17:04:36,029 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:04:36,031 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1613318015145_0004
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1613318015145_0004
Note that killing Flink might not clean up all job artifacts and temporary files.
2021-02-16 17:04:36,032 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:39693 of application 'application_1613318015145_0004'.
Job has been submitted with JobID 81b5ad3509dea923e686a0a4a962f3b1

3.3. yarn session

./bin/yarn-session.sh -tm 2048 -s 3

This starts a YARN session cluster in which each TaskManager has 2 GB of memory and 3 slots.

[root@master01 flink-1.12.1]#
[root@master01 flink-1.12.1]# ./bin/yarn-session.sh -tm 2048 -s 3
2021-02-16 17:11:16,097 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2021-02-16 17:11:16,103 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-02-16 17:11:16,340 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-02-16 17:11:16,401 INFO  org.apache.flink.runtime.security.modules.HadoopModule       [] - Hadoop user set to hdfs (auth:SIMPLE)
2021-02-16 17:11:16,414 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-6503414141525855669.conf.
2021-02-16 17:11:16,455 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:11:16,682 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:11:17,018 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:11:17,174 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:11:17,216 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-02-16 17:11:17,371 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:11:17,435 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:11:17,436 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=2048, slotsPerTaskManager=3}
2021-02-16 17:11:17,886 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:11:22,799 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-02-16 17:11:22,818 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:11:22,858 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0006
2021-02-16 17:11:23,099 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0006
2021-02-16 17:11:23,099 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:11:23,102 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - deploying cluster, current state ACCEPTED
2021-02-16 17:11:28,156 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:11:28,158 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface salve02:32875 of application 'application_1613318015145_0006'.
JobManager Web Interface: http://salve02:32875

  • By default the client runs in attached mode and does not exit.

  • On YARN the application is shown as a "Flink session cluster".


[root@master01 tmp]# cat /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Feb 16 17:11:28 CST 2021
dynamicPropertiesString=
applicationID=application_1613318015145_0006
  • Submitting a job

./bin/flink run ./examples/batch/WordCount.jar

The job is submitted to the session just started, based on the contents of /tmp/.yarn-properties-root.
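The .yarn-properties-root file shown above is a plain Java properties file; the CLI reads the applicationID from it to locate the running session. A minimal sketch of that lookup in Python (a hypothetical helper, not Flink's actual parser):

```python
from pathlib import Path

def read_yarn_application_id(path: str):
    """Return the applicationID from a .yarn-properties-<user> file,
    skipping '#' comment lines; None if the key is absent."""
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        if key.strip() == "applicationID":
            return value.strip()
    return None

# Demo with the same contents as /tmp/.yarn-properties-root above:
import os, tempfile
content = (
    "#Generated YARN properties file\n"
    "#Tue Feb 16 17:11:28 CST 2021\n"
    "dynamicPropertiesString=\n"
    "applicationID=application_1613318015145_0006\n"
)
with tempfile.NamedTemporaryFile("w", delete=False) as f:
    f.write(content)
print(read_yarn_application_id(f.name))  # application_1613318015145_0006
os.unlink(f.name)
```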


[root@henghe-023 flink-1.12.1]# ./bin/flink run ./examples/batch/WordCount.jar
2021-02-16 17:22:20,526 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 17:22:20,526 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:22:21,029 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:22:21,214 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:22:21,470 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 17:22:21,594 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:22:21,606 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:22:21,703 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
Job has been submitted with JobID b0a19e30545d1a41d80b7727a98cb217
Program execution finished
Job with JobID b0a19e30545d1a41d80b7727a98cb217 has finished.
Job Runtime: 6485 ms
Accumulator Results:
- e840feeb46f0be541988fc925e9f41a9 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)

3.4. Submitting to a specified session

Use the -yid parameter to submit a job to a specified session.

./bin/flink run -d -p 30 -m yarn-cluster -yid application_1613318015145_0006 ./examples/streaming/TopSpeedWindowing.jar


[root@master flink-1.12.1]# ./bin/flink run -d -p 30 -m yarn-cluster -yid application_1613318015145_0006 ./examples/streaming/TopSpeedWindowing.jar

2021-02-16 17:25:12,364 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 17:25:12,364 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:25:12,847 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:25:13,025 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:25:13,267 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 17:25:13,389 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:25:13,401 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:25:13,499 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
Job has been submitted with JobID 2a02b50e550b05259344c417a71dc641

3.5. SQL Client

3.5.1. Basic usage

  • Start

./bin/sql-client.sh embedded

  • Help

help;

  • Run a query

SELECT 'Hello World';

[root@henghe-023 flink-1.12.1]# ./bin/sql-client.sh embedded
2021-02-16 18:32:59,319 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 18:32:59,319 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
No default environment specified.
Searching for '/opt/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /root/.flink-sql-history
                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> help;
The following commands are available:

CLEAR		Clears the current terminal.
CREATE TABLE		Create table under current catalog and database.
DROP TABLE		Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW		Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE		Describes the schema of a table with the given name.
DROP VIEW		Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN		Describes the execution plan of a query or table with the given name.
HELP		Prints the available commands.
                                                    SQL Query Result (Table)
 Refresh: 1 s                                            Page: Last of 1                                        Updated: Unknown

                    EXPR$0


3.5.2. SELECT queries

Flink SQL> SELECT 'Hello World';



Press "Q" to exit this view.
Open http://127.0.0.1:8081 and you can see that the query job produced by this SELECT statement has already finished. The query reads a fixed data set through a custom source, writes to a stream collect sink, and emits only a single result.

Note: if the local temporary directory contains a file such as .yarn-properties-baoniu, the job will be submitted to YARN instead.


3.5.3. Explain

The EXPLAIN command shows the execution plan of a SQL statement.

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==
Stage 13 : Data Source
	content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

	Stage 15 : Operator
		content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
		ship_strategy : HASH



Flink SQL>

3.5.4. Displaying results

The SQL Client supports two modes for maintaining and displaying query results:

  • table mode: materializes query results in memory and displays them as a paginated table. Enable it with:

SET execution.result-mode=table

  • changelog mode: does not materialize query results; instead it directly displays the inserts and retractions produced by the continuous query. Enable it with:

SET execution.result-mode=changelog
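The retract mechanics behind changelog mode can be sketched in plain Python: for a continuous GROUP BY ... COUNT(*), each incoming row retracts the key's previous count and inserts the new one, while table mode only ever shows the materialized final state (an illustrative sketch, not SQL Client code):

```python
from collections import Counter

def grouped_count_changelog(names):
    """Emit the insert (+) / retract (-) stream a continuous
    GROUP BY ... COUNT(*) would produce, plus the final table."""
    counts = Counter()
    changelog = []
    for name in names:
        if counts[name] > 0:
            changelog.append(("-", name, counts[name]))  # retract old count
        counts[name] += 1
        changelog.append(("+", name, counts[name]))      # insert new count
    return changelog, dict(counts)

log, table = grouped_count_changelog(["Bob", "Alice", "Greg", "Bob"])
for op, name, cnt in log:
    print(op, name, cnt)      # changelog mode shows this stream
print(table)                  # table mode shows only this final state
# {'Bob': 2, 'Alice': 1, 'Greg': 1}
```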

The following examples demonstrate both modes.

  • Table mode
Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
2021-02-16 18:41:37,976 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 18:41:37,986 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 18:41:38,033 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 18:41:38,034 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 18:41:38,034 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 18:41:38,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
[INFO] Result retrieval cancelled.


  • Changelog mode
Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
2021-02-16 18:43:50,969 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 18:43:50,981 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 18:43:51,037 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 18:43:51,038 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 18:43:51,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 18:43:51,046 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
[INFO] Result retrieval cancelled.



Rows prefixed with '-' are retraction messages.


3.5.5. Other commands

Flink SQL> help
> ;
The following commands are available:

CLEAR		Clears the current terminal.
CREATE TABLE		Create table under current catalog and database.
DROP TABLE		Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW		Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE		Describes the schema of a table with the given name.
DROP VIEW		Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN		Describes the execution plan of a query or table with the given name.
HELP		Prints the available commands.
INSERT INTO		Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE		Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT		Quits the SQL CLI client.
RESET		Resets all session configuration properties.
SELECT		Executes a SQL SELECT query on the Flink cluster.
SET		Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS		Shows all user-defined and built-in functions.
SHOW TABLES		Shows all registered tables.
SOURCE		Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG		Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE		Sets the current default database. Experimental! Syntax: 'USE <name>;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

3.6. Restful API

This section shows how to upload a JAR and run a job through the REST API.
For more details, see the Flink REST API documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html

➜  flink-1.7.2 curl http://127.0.0.1:8081/overview
{"taskmanagers":1,"slots-total":4,"slots-available":0,"jobs-running":3,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"1.7.2","flink-commit":"ceba8af"}

➜  flink-1.7.2 curl -X POST -H "Expect:" -F "jarfile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
{"filename":"/var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/flink-web-124c4895-cf08-4eec-8e15-8263d347efc2/flink-web-upload/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","status":"success"}

➜  flink-1.7.2 curl http://127.0.0.1:8081/jars
{"address":"http://localhost:8081","files":[{"id":"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","name":"TopSpeedWindowing.jar","uploaded":1553743438000,"entry":[{"name":"org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","description":null}]}]}

➜  flink-1.7.2 curl http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/plan
{"plan":{"jid":"41029eb3feb9132619e454ec9b2a89fb","name":"CarTopSpeedWindowingExample","nodes":[{"id":"90bea66de1c231edf33913ecd54406c1","parallelism":1,"operator":"","operator_strategy":"","description":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out","inputs":[{"num":0,"id":"cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy":"HASH","exchange":"pipelined_bounded"}],"optimizer_properties":{}},{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: Custom Source -> Timestamps/Watermarks","optimizer_properties":{}}]}}

➜  flink-1.7.2 curl -X POST http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
{"jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad"}
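The same upload-and-run sequence can be scripted. Below is a sketch in Python using the requests library; the endpoint paths mirror the curl calls above, while the cluster address, jar path, and helper names are assumptions for illustration:

```python
BASE = "http://127.0.0.1:8081"  # assumption: a locally running Flink cluster

def run_endpoint(base: str, jar_id: str) -> str:
    """Build the /jars/<id>/run URL for an uploaded jar."""
    return f"{base}/jars/{jar_id}/run"

def jar_id_from_upload(upload_response: dict) -> str:
    """The upload response's 'filename' field ends with the jar id."""
    return upload_response["filename"].rsplit("/", 1)[-1]

def submit_jar(base: str, jar_path: str) -> str:
    """Upload a jar and start it; returns the new job's id."""
    import requests  # third-party HTTP client: pip install requests
    with open(jar_path, "rb") as f:
        upload = requests.post(f"{base}/jars/upload",
                               files={"jarfile": f}).json()
    run = requests.post(run_endpoint(base, jar_id_from_upload(upload))).json()
    return run["jobid"]

if __name__ == "__main__":
    print(submit_jar(BASE, "examples/streaming/TopSpeedWindowing.jar"))
```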

四 .Web

In the left-hand menu of the Flink Dashboard there is a "Submit New Job" page where users can upload a JAR, inspect its execution plan, and submit the job.
The Web submission feature is mainly intended for getting started and demos.


Original article:
https://ververica.cn/developers/apache-flink-zero-basic-introduction-iv-client-operation/
