微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

(1)StructuredStreaming简介

一,概述

Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于spark sql引擎之上。可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的spark sql引擎。通过checkpointing and Write Ahead Logs该系统可以保证点对点,一次处理,容错担保。

内部,认情况下,Structured Streaming查询使用的是微批处理引擎,其处理数据流作为一些列小的批处理job,因此实现低至100ms的端到端延迟和仅一次处理容错担保。然而,从spark2.3.1开始,已经引入了一个新的更低延迟的处理模型,叫做Continuous Processing,该模型可以实现低至1ms的端到端延迟,并且实现最少一次担保。

Structured streaming的核心观点是将数据流当作一张表,该表在被连续不断的追加数据。

这使得structured streaming成为了一种新的处理模型,该模型很像批处理模型。可以像执行标准的批处理查询一样执行你的流处理计算,Spark内部运行的时候是以在无界输入表上的增量查询方式。

可以把输入的数据流当成一张表,数据流新增的每一条数据就像添加到该表的新增行数据。

 

在输入数据流上执行的query操作会生成一个结果表。每个触发间隔,比如1s,新的行都会被追加到输入表,最终更新结果表。结果表无论何时得到更新,都将会将变化的结果行写入外部的sink。

Output定义了如何将结果写入外部存储系统,输出模式有以下几种:

  1. Complete mode

整个更新的结果表都会被刷新写入到外部存储系统。如何处理整张表的写操作取决于存储器的connector。

  1. Append mode

仅自上次触发追加到结果表的新行会被刷写到外部存储系统。该种情况仅适合Result Table中的行不被更新的情况。

  1. Update mode

仅仅自上次触发起在结果表更新的行才会被写入外部存储系统,这个是从spark 2.1.1开始的。如果查询不存在聚合函数,那该模式就跟append模式一样了。

二,例子

  1. Socket Source-> console sink

下载安装nc,请不要用yum直接安装。

wget  http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

rpm   -iUv nc-1.84-22.el6.x86_64.rpm

启动nc,然后准备写入数据到Structured Streaming

nc -lk 9999

启动nc之后,开始启动spark-shell

Spark-shell –master local[*]

执行如下代码

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

val words = lines.as[String].flatMap(_.split(" "))

 

val wordCounts = words.withWatermark("timestamp", "30 seconds").groupBy("value").count()

val query = wordCounts.writeStream.outputMode("Update").format("console").start()

query.awaitTermination()

 

2,编程模型讲解

例子输入是生成一个lines DataFrame,然后作为输入表。最终wordCounts DataFrame是结果表。基于lines DataFrame的查询跟静态的Dataframe查询时一样的。然而,当查询一旦启动,Spark 会不停的检查Socket链接是否有新的数据。如果有新的数据,Spark 将会在新数据上运行一个增量的查询,并且组合之前的counts结果,计算得到更新后的统计

 

注意,structured streaming不会管理整张表。他会从流数据源读取最新可用的数据,增量处理去更新结果,然后丢弃原数据。它仅仅会保存很小的必要中间状态数据用来更新结果。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐