1.map:
调用:
val stream = environment.readTextFile("data/access.log") println(stream.parallelism) val accessstream = stream.map( x => { val splits = x.split(",") val time = splits(0).trim.toLong val domain = splits(1).trim val traffices = splits(2).trim.todouble Access(time, domain, traffices) }) accessstream.print()
分组求和:
val stream = environment.readTextFile("data/access.log") println(stream.parallelism) val accessstream = stream.map( x => { val splits = x.split(",") val time = splits(0).trim.toLong val domain = splits(1).trim val traffices = splits(2).trim.todouble Access(time, domain, traffices) }) accessstream.keyBy(1).sum(2).print() // 过时的 accessstream.keyBy(_.domain).sum(2).print() // 不过时的
看看map底层源码:
再看看map(mapper)的map是干啥的:
看这个map方法,大部分算子都用了transform,所以我们可以直接用它:
下面我们来个自定义的map:
刚才那个跑完就关了,看不到web,下面改改代码
val stream = environment.socketTextStream("182.92.99.53", 8081) stream.map(x => { x.toInt * 10 }).print()
我们先观察一下print的写法:(有个addsink方法)
下面我们来改map的源码:
注意这个方法,java的多了个类型:
我们去找scala的写法:
第一步:
根据这个我们也能发现:
具体实现:
结果:
filter的写法:
flatMap的:
再看看更底层的:(这个是最底层的)
来实现一下:(接口的都是没实现的,所以我们这里继承一个实现类,再继承这个单操作符的接口)
参考其他算子的源码,我们可以知道,map是一进一出的,所以我们这里是要指定的,这里方便演示,类型我们都弄成Int来做:
class ZxStreamMap extends AbstractStreamOperator[Int] with OneInputStreamOperator[Int, Int] { override def processElement(element: StreamRecord[Int]): Unit = { val result = element.getValue.toInt * 10 element.replace(result) output.collect(element) } }
调用报错:
stream.transform("ZxMap2", new ZxStreamMap)
原因是这里进来是一个String类型:
所以我们要改一下:
结果ok:
stream.transform("ZxMap2", new ZxStreamMap).print()
FlatMap:
看结果:(pk被干掉了不输出)
自定义实现:
class ZxStreamFlatMap extends AbstractStreamOperator[String] with OneInputStreamOperator[String, String] { override def processElement(element: StreamRecord[String]): Unit = { val splits = element.getValue.split(",") splits.filter(_ != "pk").map(x => { output.collect(element.replace(x)) }) } }
KeyBy:
来份数据:(省,市,访问量)
因为有中间状态的,看最后几行就行了:
Tuple这种方式就不过时了:(建议用case class来做,现在的下标不清晰)
用KeySelector方式:
进来是一个tuple,key是两个字段所以也是tuple
实现一下:
用case class的方式:(这种方式更直观,推荐用)
最后是转成KeyedStream类型的:
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。