微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink-9-算子(Operators)

 

 

 1.map:

 

 

 

 

调用

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessstream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.todouble
      Access(time, domain, traffices)
    })
    accessstream.print()

 

 

 分组求和:

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessstream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.todouble
      Access(time, domain, traffices)
    })
    accessstream.keyBy(1).sum(2).print()  // 过时的
    accessstream.keyBy(_.domain).sum(2).print()  // 不过时的

 

 

 看看map底层源码:

 再看看map(mapper)的map是干啥的:

 

 看这个map方法,大部分算子都用了transform,所以我们可以直接用它:

 

 

 

下面我们来个自定义的map:

 

 

 

 刚才那个跑完就关了,看不到web,下面改改代码

    val stream = environment.socketTextStream("182.92.99.53", 8081)
    stream.map(x => {
      x.toInt * 10
    }).print()

 

 

 我们先观察一下print的写法:(有个addsink方法

 

 

 下面我们来改map的源码:

注意这个方法,java的多了个类型:

 

 

 我们去找scala的写法:

 

 

 下面我们根据java的map方法写我们自己实现的方法

 

 

 第一步:

 

 

 根据这个我们也能发现:

 

 

 

具体实现:

 

结果:

 

 

 

 

 

filter的写法:

flatMap的:

 

 

 

 

再看看更底层的:(这个是最底层的)

 

 来实现一下:(接口的都是没实现的,所以我们这里继承一个实现类,再继承这个单操作符的接口)

 

 

 参考其他算子的源码,我们可以知道,map是一进一出的,所以我们这里是要指定的,这里方便演示,类型我们都弄成Int来做:

class ZxStreamMap extends AbstractStreamOperator[Int] with OneInputStreamOperator[Int, Int] {
  override def processElement(element: StreamRecord[Int]): Unit = {
    val result = element.getValue.toInt * 10
    element.replace(result)
    output.collect(element)
  }
}

调用报错:

stream.transform("ZxMap2", new ZxStreamMap)

 

 

 原因是这里进来是一个String类型:

 

 

 所以我们要改一下:

 

 

 结果ok:

stream.transform("ZxMap2", new ZxStreamMap).print()

 

 

 

FlatMap:

 

 

 

看结果:(pk被干掉了不输出

 

 

 自定义实现:

class ZxStreamFlatMap extends AbstractStreamOperator[String] with OneInputStreamOperator[String, String] {
  override def processElement(element: StreamRecord[String]): Unit = {
    val splits = element.getValue.split(",")
    splits.filter(_ != "pk").map(x => {
      output.collect(element.replace(x))
    })
  }
}

 

KeyBy:

来份数据:(省,市,访问量)

 

 

因为有中间状态的,看最后几行就行了:

 

 Tuple这种方式就不过时了:(建议用case class来做,现在的下标不清晰)

 

 

用KeySelector方式:

进来是一个tuple,key是两个字段所以也是tuple

 

 

实现一下:

用case class的方式:(这种方式更直观,推荐用)

 

 

最后是转成KeyedStream类型的:

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐