微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Akka流:通过AlsoTo将元素复制到另一个流

如何解决Akka流:通过AlsoTo将元素复制到另一个流

我想使用alsoTo将元素从一个Source复制到另一个Source,但是它不能正常工作。从Java InputStream创建Akka alsoto并进行一些转换并使用s1创建import java.io.ByteArrayInputStream import java.nio.charset.StandardCharsets import akka.actor.ActorSystem import akka.stream.IOResult import akka.stream.scaladsl.{Sink,Source,StreamConverters} import scala.concurrent.duration._ import scala.concurrent.{Await,ExecutionContext,Future} object Main { def main(args: Array[String]): Unit = { implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoto") implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8)) try { val s1: Source[List[List[String]],Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream) .map { bs => val rows = bs.utf8String.split("\n").toList val valuesPerRow = rows.map(row => row.split(",").toList) valuesPerRow } // A copy of s1? val s2: Source[List[List[String]],Future[IOResult]] = s1.alsoto(Sink.collection) println("s1.runForeach: ") Await.result(s1.runForeach(println),20.seconds) println("s2.runForeach: ") Await.result(s2.runForeach(println),20.seconds) println("Done") system.terminate() } finally { byteStream.close() } } } 副本的代码示例:

s1.runForeach: 
List(List(a,c),List(d,f),List(g,j))
s2.runForeach: 
Done

它产生以下输出

s2.runForeach

如您所见,InputStream不打印任何元素。这种行为的原因是什么-是因为它在读取Java data.table时产生了副作用吗?

我正在使用Akka Streams v2.6.8。

解决方法

我想使用alsoTo来将元素从一个Source复制到另一个,但是却无法正常工作。

alsoTo不会将元素从Source复制到另一个Source;它有效地从Source / Flow复制元素,并将其发送到另一个SinkalsoTo方法的参数)。因此,您的期望是不正确的。

如您所见,s2.runForeach不打印任何元素。这种行为的原因是什么-是因为它在读取Java InputStream时产生了副作用吗?

因为byteStreamval,所以s1.runForeach(println)s2.runForeach(println)都“共享”此实例,即使它们是两个截然不同的 Akka Stream蓝图。因此,当调用s1.runForeach(println)时,byteStream被消耗,而之后执行s2.runForeach(println)时,InputStream中没有任何内容可供s2.runForeach(println)打印。

byteStream更改为def,并打印以下内容:

s1.runForeach: 
List(List(a,b,c),List(d,e,f),List(g,h,j))
s2.runForeach: 
List(List(a,j))
Done

这说明了为什么s2.runForeach(println)在这种特殊情况下不打印任何内容,但却没有真正显示alsoTo的实际作用。您的设置存在缺陷,因为s2.runForeach(println)仅打印Source中的元素,而忽略了alsoTo(Sink.collection)的物化值。

查看alsoTo行为的一种简单方法如下:

val byteStream = new ByteArrayInputStream("a,c\nd,f\ng,j".getBytes(StandardCharsets.UTF_8))

val s1: Source[List[List[String]],Future[IOResult]] =
  StreamConverters.fromInputStream(() => byteStream)
    .map { bs =>
      val rows = bs.utf8String.split("\n").toList
      val valuesPerRow = rows.map(row => row.split(",").toList)
      valuesPerRow
    }

val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
                                          // ^ same thing as .runForeach(println)
Await.ready(stream,5.seconds)
println("Done")
system.terminate()

运行以上命令会显示以下内容:

List(List(a,j))
List(List(a,j))
Done

一个 Source中的元素将同时发送到 Sink s。

如果您想使用Sink.collection ...

val (result1,result2) =
  s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
 // ^ note the use of alsoToMat in order to retain the materialized value

val res1 = Await.result(result1,5.seconds)
val res2 = Await.result(result2,5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()

...打印...

res1: List(List(List(a,j)))
res2: List(List(List(a,j)))
Done

同样,将一个Source中的元素发送到两个Sink

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。