微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ScalaPB3: gRPC streaming

  接着上期讨论的gRPC unary服务我们跟着介绍gRPC streaming,包括: Server-Streaming,Client-Streaming及Bidirectional-Streaming。我们首先在.proto文件里用IDL描述Server-Streaming服务:

/*
 *  responding stream of increment results
 */
service SumOnetoMany {
  rpc AddOnetoMany(SumRequest) returns (stream SumResponse) {}
}

message SumRequest {
  int32 toAdd = 1;
}

message SumResponse {
  int32 currentResult = 1;
}

SumOnetoMany服务中AddOnetoMany函数接受一个SumRequest然后返回stream SumResponse,就这么简单。经过编译后产生了SumOnetoManyGrpc.scala文件在这文件里提供了有关RPC操作的api。我们看看protoc把IDL描述的服务函数变成了什么样的scala函数

def addOnetoMany(request: SumRequest,responSEObserver: StreamObserver[SumResponse]): Unit

调用scala函数addOnetoMany需要传入参数SumRequest和StreamObserver[SumResponse],也就是说用户需要准备这两个入参数。在调用addOnetoMany函数用户事先构建这个StreamObserver传给server,由server把结果通过这个结构传回用户。gRPC是通过StreamObserver类型实例来实现数据streaming的。这个类型的构建例子如下:

val responSEObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit         = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }

server端通过onNext把结果不断传回给client端,因为这个responSEObserver是在client端构建的。下面是SumManyToMany的实现:

class SumOne2ManyService extends SumOnetoManyGrpc.SumOnetoMany {
    override def addOnetoMany(request: SumRequest,responSEObserver: StreamObserver[SumResponse]): Unit = {
      val currentSum: AtomicInt = Atomic(0)
      (1 to request.toAdd).map { _ =>
          responSEObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
      }
      Thread.sleep(1000)     //delay and then finish
      responSEObserver.onCompleted()
    }
  }

这个addOnetoMany服务函数把 1-request.toAdd之间的数字逐个通过responSEObserver返还调用方。 在客户端如下调用服务:

// get asyn stub
    val client: SumOnetoManyGrpc.SumOnetoManyStub = SumOnetoManyGrpc.stub(channel)
// prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
    }
// call service with stream observer
    client.addOnetoMany(SumRequest().withToAdd(6),streamObserver)

Client-Streaming服务的IDL如下:

/*
 *  responding a result from a request of stream of numbers
 */
service SumManyToOne {
  rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
}

传入stream SumRequest,返回SumResponse。scalaPB自动产生scala代码中的addManyToOne函数款式如下:

def addManyToOne(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

调用方提供StreamObserver[SumResponse]用作返回结果,函数返回客方需要的StreamObserver[SumRequest]用以传递request流。注意:虽然在.proto文件中AddManyToOne的返回结果是单个SumResponse,但产生的scala函数则提供了一个StreamObserver[SumResponse]类型,所以需要谨记只能调用一次onNext。下面是这个服务的实现代码

class Many2Oneservice extends SumManyToOneGrpc.SumManyToOne {
     val currentSum: AtomicInt = Atomic(0)
     override def addManyToOne(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
       new StreamObserver[SumRequest] {
         val currentSum: AtomicInt = Atomic(0)
         override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
         override def onCompleted(): Unit = println("Done summing!")
         override def onNext(value: SumRequest): Unit = {
           //only allow one response
           if (value.toAdd > 0)
              currentSum.add(value.toAdd)
           else
              responSEObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
         }
       }
   }

客户方调用示范如下:

//pass to server for result
    val respStreamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done responding!")
      override def onNext(value: SumResponse): Unit =
        println(s"Result: ${value.currentResult}")
    }
    //get async stub
    val client = SumManyToOneGrpc.stub(channel)

    //get request stream observer from server
    val reqStreamObserver = client.addManyToOne(respStreamObserver)

    List(2,5,8,4,0).map { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }

Bidirectional-Streaming的IDL描述如下:

/*
 * Sums up numbers received from the client and returns the current result after each received request.
 */
service SumInter {
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}

这个service SumInter 描述了stream SumRequest 及 stream SumResponse运算模式。产生的对应scala函数如下:

def addInter(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

这个函数的款式与Client-Streaming服务函数是一样的。但是,我们可以通过responSEObserver传递多个SumResponse。这个服务的实现代码是这样的:

class Many2ManyService extends SumInterGrpc.SumInter {
    override def addInter(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        val currentSum: AtomicInt = Atomic(0)
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
        override def onCompleted(): Unit = println("Done requesting!")
        override def onNext(value: SumRequest): Unit = {
          responSEObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
        }
      }
  }

我们可以多次调用responSEObserver.onNext。客户端源代码如下:

//create stream observer for result stream
    val responSEObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit         = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }
    //get request container
    val requestObserver = client.addInter(responSEObserver)

    scheduler.scheduleWithFixedDelay(0.seconds,1.seconds) {
      val toBeAdded = Random.nextInt(11)
      println(s"Adding number: $toBeAdded")
      requestObserver.onNext(SumRequest(toBeAdded))
    }

下面是本次示范的源代码

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "learn-gRPC"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf","io.grpc" % "grpc-netty" % grpcJavaVersion,"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,"io.monix" %% "monix" % "2.3.0"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

src/main/protobuf/sum.proto

Syntax = "proto3";

package learn.grpc.services;

/*
 *  responding stream of increment results
 */
service SumOnetoMany {
  rpc AddOnetoMany(SumRequest) returns (stream SumResponse) {}
}

/*
 *  responding a result from a request of stream of numbers
 */
service SumManyToOne {
  rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
}

/*
 * Sums up numbers received from the client and returns the current result after each received request.
 */
service SumInter {
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
}

message SumRequest {
  int32 toAdd = 1;
}

message SumResponse {
  int32 currentResult = 1;
}

gRPCServer.scala

package learn.grpc.server
import io.grpc.{ServerBuilder,ServerServiceDeFinition}

trait gRPCServer {
  def runServer(service: ServerServiceDeFinition): Unit = {
    val server = ServerBuilder
      .forPort(50051)
      .addService(service)
      .build
      .start

    // make sure our server is stopped when jvm is shut down
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = server.shutdown()
    })

    server.awaitTermination()
  }

}

OnetoManyServer.scala

package learn.grpc.sum.one2many.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import monix.execution.atomic.{Atomic,AtomicInt}
import learn.grpc.server.gRPCServer

object One2ManyServer extends gRPCServer {

  class SumOne2ManyService extends SumOnetoManyGrpc.SumOnetoMany {
    override def addOnetoMany(request: SumRequest,responSEObserver: StreamObserver[SumResponse]): Unit = {
      val currentSum: AtomicInt = Atomic(0)
      (1 to request.toAdd).map { _ =>
          responSEObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
      }
      Thread.sleep(1000)     //delay and then finish
      responSEObserver.onCompleted()
    }
  }

  def main(args: Array[String]) = {
    val svc = SumOnetoManyGrpc.bindService(new SumOne2ManyService,scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }

}

OnetoManyClient.scala

package learn.grpc.sum.one2many.client
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

object One2ManyClient {
  def main(args: Array[String]): Unit = {

    //build connection channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("LocalHost",50051)
      .usePlaintext(true)
      .build()

    // get asyn stub
    val client: SumOnetoManyGrpc.SumOnetoManyStub = SumOnetoManyGrpc.stub(channel)
// prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
    }
// call service with stream observer
    client.addOnetoMany(SumRequest().withToAdd(6),streamObserver)

    // wait for async execution
    scala.io.StdIn.readLine()
  }
}

ManyToOneserver.scala

package learn.grpc.sum.many2one.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic,AtomicInt}

object Many2Oneserver extends gRPCServer {
   class Many2Oneservice extends SumManyToOneGrpc.SumManyToOne {
     val currentSum: AtomicInt = Atomic(0)
     override def addManyToOne(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
       new StreamObserver[SumRequest] {
         val currentSum: AtomicInt = Atomic(0)
         override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
         override def onCompleted(): Unit = println("Done summing!")
         override def onNext(value: SumRequest): Unit = {
           //only allow one response
           if (value.toAdd > 0)
              currentSum.add(value.toAdd)
           else
              responSEObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
         }
       }
   }

   def main(args: Array[String]): Unit = {
     val svc = SumManyToOneGrpc.bindService(new Many2Oneservice,scala.concurrent.ExecutionContext.global)
     runServer(svc)
   }
}

ManyToOneClient.scala

package learn.grpc.sum.many2one.client
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

object Many2OneClient {
  def main(args: Array[String]): Unit = {
    //build channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("LocalHost",50051)
      .usePlaintext(true)
      .build()
    //pass to server for result
    val respStreamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done responding!")
      override def onNext(value: SumResponse): Unit =
        println(s"Result: ${value.currentResult}")
    }
    //get async stub
    val client = SumManyToOneGrpc.stub(channel)

    //get request stream observer from server
    val reqStreamObserver = client.addManyToOne(respStreamObserver)

    List(2,0).map { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }
    scala.io.StdIn.readLine()
  }
}

ManyToManyServer.scala

package learn.grpc.sum.many2many.server
import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic,AtomicInt}
object Many2ManyServer extends gRPCServer {
  class Many2ManyService extends SumInterGrpc.SumInter {
    override def addInter(responSEObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        val currentSum: AtomicInt = Atomic(0)

        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")

        override def onCompleted(): Unit = println("Done requesting!")

        override def onNext(value: SumRequest): Unit = {
          responSEObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
        }
      }
  }
  def main(args: Array[String]): Unit = {
    val svc = SumInterGrpc.bindService(new Many2ManyService,scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }

}

ManyToManyClient.scala

package learn.grpc.sum.many2many.client
import monix.execution.Scheduler.{global => scheduler}
import learn.grpc.services.sum._

import scala.concurrent.duration._
import scala.util.Random
import io.grpc._
import io.grpc.stub.StreamObserver

object Many2ManyClient {
  def main(args: Array[String]): Unit = {
    val channel = ManagedChannelBuilder.forAddress("localhost",50051).usePlaintext(true).build
    val client  = SumInterGrpc.stub(channel)
    //create stream observer for result stream
    val responSEObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit         = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }
    //get request container
    val requestObserver = client.addInter(responSEObserver)

    scheduler.scheduleWithFixedDelay(0.seconds,1.seconds) {
      val toBeAdded = Random.nextInt(11)
      println(s"Adding number: $toBeAdded")
      requestObserver.onNext(SumRequest(toBeAdded))
    }

    scala.io.StdIn.readLine()
  }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐