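Following the previous installment's discussion of gRPC unary services, this post covers gRPC streaming in its three forms: Server-Streaming, Client-Streaming, and Bidirectional-Streaming. We start by describing a Server-Streaming service in IDL in the .proto file: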
/*
 * responding stream of increment results
 */
service SumOnetoMany {
  rpc AddOnetoMany(SumRequest) returns (stream SumResponse) {}
}

message SumRequest {
  int32 toAdd = 1;
}

message SumResponse {
  int32 currentResult = 1;
}
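The AddOnetoMany function of the SumOnetoMany service accepts one SumRequest and returns a stream of SumResponse; it is that simple. Compilation generates the file SumOnetoManyGrpc.scala, which provides the API for the RPC operations. Here is the Scala function that protoc produces from the service function described in IDL: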
def addOnetoMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit
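Calling the Scala function addOnetoMany requires two arguments, a SumRequest and a StreamObserver[SumResponse], and the caller has to prepare both. The caller builds the StreamObserver before the call and passes it to the server, and the server sends its results back through it. gRPC implements data streaming through instances of the StreamObserver type. An example of building one: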
val responseObserver = new StreamObserver[SumResponse] {
  def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
  def onCompleted(): Unit = println("ON_COMPLETED")
  def onNext(value: SumResponse): Unit =
    println(s"ON_NEXT: Current sum: ${value.currentResult}")
}
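The callback order an observer sees is zero or more onNext calls followed by exactly one terminal call, either onCompleted or onError. The sketch below illustrates that order. To stay runnable without a gRPC dependency it declares a local stand-in trait with the same three callbacks as io.grpc.stub.StreamObserver; that trait, RecordingObserver, and emit are names assumed for illustration and are not part of the generated code.

```scala
import scala.collection.mutable.ListBuffer

object ObserverContractSketch {
  // Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver,
  // declared here only so the sketch runs without a gRPC dependency.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Hypothetical helper: records every callback in arrival order.
  final class RecordingObserver[T] extends StreamObserver[T] {
    val events: ListBuffer[String] = ListBuffer.empty[String]
    def onNext(value: T): Unit = events += s"next:$value"
    def onError(t: Throwable): Unit = events += s"error:${t.getMessage}"
    def onCompleted(): Unit = events += "completed"
  }

  // Plays the server role: zero or more onNext calls, then one terminal call.
  def emit(values: Seq[Int], observer: StreamObserver[Int]): Unit = {
    values.foreach(observer.onNext)
    observer.onCompleted()
  }

  def main(args: Array[String]): Unit = {
    val observer = new RecordingObserver[Int]
    emit(Seq(1, 2, 3), observer)
    println(observer.events.mkString(", ")) // next:1, next:2, next:3, completed
  }
}
```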
Because this responseObserver is built on the client side, the server keeps pushing results back to the client by calling onNext on it. Below is the implementation of SumOnetoMany:
class SumOne2ManyService extends SumOnetoManyGrpc.SumOnetoMany {
  override def addOnetoMany(request: SumRequest,
                            responseObserver: StreamObserver[SumResponse]): Unit = {
    val currentSum: AtomicInt = Atomic(0)
    // emit 1, 2, ..., request.toAdd, one response per step
    (1 to request.toAdd).foreach { _ =>
      responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
    }
    Thread.sleep(1000) // delay and then finish
    responseObserver.onCompleted()
  }
}
This addOnetoMany service function returns the numbers from 1 to request.toAdd to the caller one at a time through responseObserver. The client calls the service as follows:
// get async stub
val client: SumOnetoManyGrpc.SumOnetoManyStub = SumOnetoManyGrpc.stub(channel)

// prepare stream observer
val streamObserver = new StreamObserver[SumResponse] {
  override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
  override def onCompleted(): Unit = println("Done incrementing !!!")
  override def onNext(value: SumResponse): Unit =
    println(s"current value: ${value.currentResult}")
}

// call service with stream observer
client.addOnetoMany(SumRequest().withToAdd(6), streamObserver)
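The call through the async stub returns immediately, so a client has to stay alive until the stream terminates. One possible way to do that, sketched here under assumptions, is to count down a java.util.concurrent.CountDownLatch from the terminal callbacks. The stand-in StreamObserver trait, fakeAddOnetoMany (a local thread that imitates the remote server), and lastValue are hypothetical names used only to keep the example runnable without gRPC.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object AwaitStreamSketch {
  // Local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Imitates the remote call: returns at once, pushes 1..n from another thread.
  def fakeAddOnetoMany(n: Int, observer: StreamObserver[Int]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        (1 to n).foreach(observer.onNext)
        observer.onCompleted()
      }
    })
    worker.start()
  }

  // Blocks until the stream terminates; returns the last value, or None on timeout.
  def lastValue(n: Int): Option[Int] = {
    val done = new CountDownLatch(1)
    val last = new AtomicInteger(0)
    val observer = new StreamObserver[Int] {
      def onNext(value: Int): Unit = last.set(value)
      def onError(t: Throwable): Unit = done.countDown()
      def onCompleted(): Unit = done.countDown()
    }
    fakeAddOnetoMany(n, observer)
    if (done.await(5, TimeUnit.SECONDS)) Some(last.get) else None
  }

  def main(args: Array[String]): Unit =
    println(lastValue(6)) // Some(6)
}
```

Compared with waiting on scala.io.StdIn.readLine(), a latch lets the program exit on its own once the server has finished.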
The IDL for the Client-Streaming service is:
/*
 * responding a result from a request of stream of numbers
 */
service SumManyToOne {
  rpc AddManyToOne(stream SumRequest) returns (SumResponse) {}
}
It takes a stream of SumRequest and returns a single SumResponse. The signature of the addManyToOne function in the Scala code generated by ScalaPB is:
def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
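The caller supplies a StreamObserver[SumResponse] to receive the result, and the function returns the StreamObserver[SumRequest] that the client uses to send the request stream. Note: although AddManyToOne returns a single SumResponse in the .proto file, the generated Scala function still takes a StreamObserver[SumResponse], so keep in mind that onNext may be called on it only once. Here is the implementation of this service: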
class Many2Oneservice extends SumManyToOneGrpc.SumManyToOne {
  override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      // one running total per call
      val currentSum: AtomicInt = Atomic(0)
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done summing!")
      override def onNext(value: SumRequest): Unit = {
        // only allow one response: a non-positive number marks the end of input
        if (value.toAdd > 0)
          currentSum.add(value.toAdd)
        else {
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
          responseObserver.onCompleted() // close the response after the single result
        }
      }
    }
}
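The implementation above uses a non-positive number as an end-of-input marker. A commonly used alternative, sketched below and not taken from the original code, accumulates in onNext and sends the single response when the client half-closes the request stream by calling onCompleted. The local StreamObserver trait is a stand-in for io.grpc.stub.StreamObserver, plain Int replaces the generated message types, and total is a hypothetical driver that plays the client in-process.

```scala
import scala.collection.mutable.ListBuffer

object SumOnCompleteSketch {
  // Local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Server side: accumulate in onNext, reply exactly once on half-close.
  def addManyToOne(responseObserver: StreamObserver[Int]): StreamObserver[Int] =
    new StreamObserver[Int] {
      private var sum = 0 // single-threaded sketch, so a plain var is enough
      def onNext(value: Int): Unit = sum += value
      def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = {
        responseObserver.onNext(sum)   // the one and only response
        responseObserver.onCompleted() // then close the response side
      }
    }

  // Hypothetical in-process client: sends the numbers, half-closes, collects replies.
  def total(numbers: Seq[Int]): List[Int] = {
    val results = ListBuffer.empty[Int]
    val responseObserver = new StreamObserver[Int] {
      def onNext(value: Int): Unit = results += value
      def onError(t: Throwable): Unit = ()
      def onCompleted(): Unit = ()
    }
    val requestObserver = addManyToOne(responseObserver)
    numbers.foreach(requestObserver.onNext)
    requestObserver.onCompleted() // signals "no more requests"
    results.toList
  }

  def main(args: Array[String]): Unit =
    println(total(Seq(2, 5, 8, 4))) // List(19)
}
```

With this shape the client no longer needs a sentinel value, and negative numbers can be summed like any other input.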
A sample client call:
// pass to server for result
val respStreamObserver = new StreamObserver[SumResponse] {
  override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
  override def onCompleted(): Unit = println("Done responding!")
  override def onNext(value: SumResponse): Unit =
    println(s"Result: ${value.currentResult}")
}

// get async stub
val client = SumManyToOneGrpc.stub(channel)

// get request stream observer from server
val reqStreamObserver = client.addManyToOne(respStreamObserver)

// the trailing 0 tells the server to reply with the total
List(2, 5, 8, 4, 0).foreach { n =>
  reqStreamObserver.onNext(SumRequest(n))
}
The IDL for Bidirectional-Streaming is:
/*
 * Sums up numbers received from the client and returns the current result after each received request.
 */
service SumInter {
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}
The SumInter service describes a mode of operation with a stream of SumRequest going in and a stream of SumResponse coming out. The corresponding generated Scala function is:
def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
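This signature is identical to that of the Client-Streaming service function. The difference is that here we may send multiple SumResponse messages through responseObserver. The service is implemented like this: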
class Many2ManyService extends SumInterGrpc.SumInter {
  override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      val currentSum: AtomicInt = Atomic(0)
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done requesting!")
      override def onNext(value: SumRequest): Unit = {
        // reply with the running total after every request
        responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
      }
    }
}
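As a quick check on what this service sends back, the sequence of responses is the running total of the requests: each request produces one response holding the sum so far. The pure function below models that behaviour without any gRPC machinery; runningSums is a hypothetical name introduced for illustration.

```scala
object RunningSumSketch {
  // One output per input: the sum of all inputs received so far.
  def runningSums(requests: Seq[Int]): Seq[Int] =
    requests.scanLeft(0)(_ + _).tail // drop the initial 0 seed

  def main(args: Array[String]): Unit =
    println(runningSums(Seq(2, 5, 8, 4)).mkString(", ")) // 2, 7, 15, 19
}
```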
We can call responseObserver.onNext as many times as needed. The client source code:
// create stream observer for result stream
val responseObserver = new StreamObserver[SumResponse] {
  def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
  def onCompleted(): Unit = println("ON_COMPLETED")
  def onNext(value: SumResponse): Unit =
    println(s"ON_NEXT: Current sum: ${value.currentResult}")
}

// get request container
val requestObserver = client.addInter(responseObserver)

// send a random number in [0, 10] every second
scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
  val toBeAdded = Random.nextInt(11)
  println(s"Adding number: $toBeAdded")
  requestObserver.onNext(SumRequest(toBeAdded))
}
The complete source code for this demonstration follows:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
build.sbt
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "learn-gRPC"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)
src/main/protobuf/sum.proto
syntax = "proto3";

package learn.grpc.services;

/*
 * responding stream of increment results
 */
service SumOnetoMany {
  rpc AddOnetoMany(SumRequest) returns (stream SumResponse) {}
}

/*
 * responding a result from a request of stream of numbers
 */
service SumManyToOne {
  rpc AddManyToOne(stream SumRequest) returns (SumResponse) {}
}

/*
 * Sums up numbers received from the client and returns the current result after each received request.
 */
service SumInter {
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}

message SumRequest {
  int32 toAdd = 1;
}

message SumResponse {
  int32 currentResult = 1;
}
gRPCServer.scala
package learn.grpc.server

import io.grpc.{ServerBuilder, ServerServiceDefinition}

trait gRPCServer {
  def runServer(service: ServerServiceDefinition): Unit = {
    val server = ServerBuilder
      .forPort(50051)
      .addService(service)
      .build
      .start

    // make sure our server is stopped when jvm is shut down
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = server.shutdown()
    })

    server.awaitTermination()
  }
}
OnetoManyServer.scala
package learn.grpc.sum.one2many.server

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import monix.execution.atomic.{Atomic, AtomicInt}
import learn.grpc.server.gRPCServer

object One2ManyServer extends gRPCServer {

  class SumOne2ManyService extends SumOnetoManyGrpc.SumOnetoMany {
    override def addOnetoMany(request: SumRequest,
                              responseObserver: StreamObserver[SumResponse]): Unit = {
      val currentSum: AtomicInt = Atomic(0)
      // emit 1, 2, ..., request.toAdd, one response per step
      (1 to request.toAdd).foreach { _ =>
        responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
      }
      Thread.sleep(1000) // delay and then finish
      responseObserver.onCompleted()
    }
  }

  def main(args: Array[String]): Unit = {
    val svc = SumOnetoManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }
}
OnetoManyClient.scala
package learn.grpc.sum.one2many.client

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

object One2ManyClient {
  def main(args: Array[String]): Unit = {

    // build connection channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("localhost", 50051)
      .usePlaintext(true)
      .build()

    // get async stub
    val client: SumOnetoManyGrpc.SumOnetoManyStub = SumOnetoManyGrpc.stub(channel)

    // prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit =
        println(s"current value: ${value.currentResult}")
    }

    // call service with stream observer
    client.addOnetoMany(SumRequest().withToAdd(6), streamObserver)

    // wait for async execution
    scala.io.StdIn.readLine()
  }
}
ManyToOneserver.scala
package learn.grpc.sum.many2one.server

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic, AtomicInt}

object Many2Oneserver extends gRPCServer {

  class Many2Oneservice extends SumManyToOneGrpc.SumManyToOne {
    override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        // one running total per call
        val currentSum: AtomicInt = Atomic(0)
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
        override def onCompleted(): Unit = println("Done summing!")
        override def onNext(value: SumRequest): Unit = {
          // only allow one response: a non-positive number marks the end of input
          if (value.toAdd > 0)
            currentSum.add(value.toAdd)
          else {
            responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
            responseObserver.onCompleted() // close the response after the single result
          }
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val svc = SumManyToOneGrpc.bindService(new Many2Oneservice, scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }
}
ManyToOneClient.scala
package learn.grpc.sum.many2one.client

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

object Many2OneClient {
  def main(args: Array[String]): Unit = {

    // build channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("localhost", 50051)
      .usePlaintext(true)
      .build()

    // pass to server for result
    val respStreamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done responding!")
      override def onNext(value: SumResponse): Unit =
        println(s"Result: ${value.currentResult}")
    }

    // get async stub
    val client = SumManyToOneGrpc.stub(channel)

    // get request stream observer from server
    val reqStreamObserver = client.addManyToOne(respStreamObserver)

    // same input as in the walkthrough; the trailing 0 tells the server to reply
    List(2, 5, 8, 4, 0).foreach { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }

    // wait for async execution
    scala.io.StdIn.readLine()
  }
}
ManyToManyServer.scala
package learn.grpc.sum.many2many.server

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic, AtomicInt}

object Many2ManyServer extends gRPCServer {

  class Many2ManyService extends SumInterGrpc.SumInter {
    override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        val currentSum: AtomicInt = Atomic(0)
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
        override def onCompleted(): Unit = println("Done requesting!")
        override def onNext(value: SumRequest): Unit = {
          // reply with the running total after every request
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }
}
ManyToManyClient.scala
package learn.grpc.sum.many2many.client

import monix.execution.Scheduler.{global => scheduler}
import learn.grpc.services.sum._
import scala.concurrent.duration._
import scala.util.Random
import io.grpc._
import io.grpc.stub.StreamObserver

object Many2ManyClient {
  def main(args: Array[String]): Unit = {
    val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build
    val client = SumInterGrpc.stub(channel)

    // create stream observer for result stream
    val responseObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }

    // get request container
    val requestObserver = client.addInter(responseObserver)

    // send a random number in [0, 10] every second
    scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
      val toBeAdded = Random.nextInt(11)
      println(s"Adding number: $toBeAdded")
      requestObserver.onNext(SumRequest(toBeAdded))
    }

    // wait for async execution
    scala.io.StdIn.readLine()
  }
}