Using gRPC with Scala - Server Implementation
What is RPC?
Remote Procedure Call (RPC) is a powerful technique for constructing distributed, client-server based applications. It is based on extending the conventional local procedure calling so that the called procedure need not exist in the same address space as the calling procedure.
What is gRPC?
gRPC is a modern open source high performance RPC framework developed by Google and Introduced in 2015 that can run in any environment and communicate to any service in any supported language. It used by many major Internet Companies such as Google, Netflix, Cisco, CoreOS, Juniper etc.
gRPC uses Protocol Buffers as it's Interface Definition Language.
Let's get started!
I came upon a problem in my work where I was required to use gRPC with Scala. Being a Software Developer, I knew how to do it in Java very well, but for some engineering reasons I was asked to write the same in Scala. Scala being a JVM language makes it very easy to write very efficient code in a very short footprint. The work that takes 500 lines of code in Java can be done easily in <100 lines in Scala. Due to lack of documentation from ScalaPB on Streaming Calls, I decided to write my on article on this.
Prerequisites
- Basic knowledge of Scala
- An empty scala project set up with ScalaPB and SBT
- Basic Knowledge of ProtoBufs
Creating the Protobuf
Firstly, we need a proto file containing the messages and a service. We'll create a HelloWorldProto.proto file inside src/main/protobuf folder.
syntax="proto3";
package in.pbehre.proto;
message HelloRequest {
string name=1;
}
message HelloResponse {
string welcome_message=1;
}
service HelloWorld {
rpc sayHello(HelloRequest) returns (HelloResponse);
rpc clientStream(stream HelloRequest) returns (HelloResponse);
rpc serverStream(HelloRequest) returns (stream HelloResponse);
rpc streamHello(stream HelloRequest) returns (stream HelloResponse);
}
- HelloRequest - Message to be sent as request to the server
- HelloResponse - Message to be sent as Response to the client
- HelloWorld - gRPC service containing different methods:
- sayHello - Unary Call
- clientStream - Client Side Streaming
- serverStream - Server Side Streaming
- streamHello - Bi-Directional Streaming
Implementing the service - HelloWorld
Create a new file HelloService.scala in src/main/scala/[your package]/ with the below content:
package in.pbehre.scala
import in.pbehre.proto.HelloWorldProto._
import io.grpc.stub.StreamObserver
import scala.concurrent.Future
class HelloService extends HelloWorldGrpc .HelloWorld {
//Service to implement calls
}
This is the basic service structure, needs to extend the HelloWorldGrpc.HelloWorld Base Implementation Class. Let's go ahead and override the methods
Unary Call - sayHello
This call takes a single HelloRequest Object and Returns a Future with HelloResponse Object.
override def sayHello(request: HelloRequest): Future[HelloResponse] = {
val name : String = request.name
val reply = HelloResponse(welcomeMessage = "Welcome, " + name)
Future.successful(reply)
}
Client Side Streaming - clientStream
This will take a stream of objects from the client in a request observer, and then return them back to the client once the stream is end/committed in a single HelloResponse Object. Only one request session is used to stream content to the server.
override def clientStream(responseObserver: StreamObserver[HelloResponse]): StreamObserver[HelloRequest] = {
var count: Int = 0
var names: String = ""
val requestObserver: StreamObserver[HelloRequest] = new StreamObserver[HelloRequest] {
override def onNext(value: HelloRequest): Unit = {
println("Received request: " + value.name)
count = count + 1
names+= value.name + ", "
}
override def onError(t: Throwable): Unit = {
println(t)
}
override def onCompleted(): Unit = {
responseObserver.onNext(HelloResponse("Welcome, " + names + "\nTotal Count: " + count))
responseObserver.onCompleted()
println("clientStream::onCompleted()")
}
}
requestObserver
}
Server Side Streaming - serverStream
Will take a single request HelloRequest object which will establish the request and thanks to http2.0, the grpc function will return a multiple stream of HelloResponse object in the same request.
override def serverStream(request: HelloRequest, responseObserver: StreamObserver[HelloResponse]): Unit = {
val values : List[String] = List(
"First Stream packet",
"Second stream packet",
"Third Stream packet",
"Fourth Stream packet"
)
values.foreach(s => responseObserver.onNext(HelloResponse(welcomeMessage = s)))
responseObserver.onCompleted()
}
Bi-Directional Streaming - streamHello
Takes a stream of HelloRequest Objects and returns a stream of HelloResponse objects
override def streamHello(responseObserver: StreamObserver[HelloResponse]): StreamObserver[HelloRequest] = {
val requestObserver: StreamObserver[HelloRequest] = new StreamObserver[HelloRequest] {
override def onNext(value: HelloRequest): Unit = {
println("Received a Request: " + value.name)
responseObserver.onNext(HelloResponse(welcomeMessage = "Hello World, " + value.name))
}
override def onError(t: Throwable): Unit = {
println(t)
}
override def onCompleted(): Unit = {
println("Service completed")
}
}
requestObserver
}
That's it!
Your final file service file should look like:
package in.pbehre.scala
import in.pbehre.proto.HelloWorldProto._
import io.grpc.stub.StreamObserver
import scala.concurrent.Future
class HelloService extends HelloWorldGrpc .HelloWorld {
override def sayHello(request: HelloRequest): Future[HelloResponse] = {
val name : String = request.name
val reply = HelloResponse(welcomeMessage = "Welcome, " + name)
Future.successful(reply)
}
override def streamHello(responseObserver: StreamObserver[HelloResponse]): StreamObserver[HelloRequest] = {
val requestObserver: StreamObserver[HelloRequest] = new StreamObserver[HelloRequest] {
override def onNext(value: HelloRequest): Unit = {
println("Received a Request: " + value.name)
responseObserver.onNext(HelloResponse(welcomeMessage = "Hello World, " + value.name))
}
override def onError(t: Throwable): Unit = {
println(t)
}
override def onCompleted(): Unit = {
println("Service completed")
}
}
requestObserver
}
override def clientStream(responseObserver: StreamObserver[HelloResponse]): StreamObserver[HelloRequest] = {
var count: Int = 0
var names: String = ""
val requestObserver: StreamObserver[HelloRequest] = new StreamObserver[HelloRequest] {
override def onNext(value: HelloRequest): Unit = {
println("Received request: " + value.name)
count = count + 1
names+= value.name + ", "
}
override def onError(t: Throwable): Unit = {
println(t)
}
override def onCompleted(): Unit = {
responseObserver.onNext(HelloResponse("Welcome, " + names + "\nTotal Count: " + count))
responseObserver.onCompleted()
println("clientStream::onCompleted()")
}
}
requestObserver
}
override def serverStream(request: HelloRequest, responseObserver: StreamObserver[HelloResponse]): Unit = {
val values : List[String] = List(
"First Stream packet",
"Second stream packet",
"Third Stream packet",
"Fourth Stream packet"
)
values.foreach(s => responseObserver.onNext(HelloResponse(welcomeMessage = s)))
responseObserver.onCompleted()
}
}
Implementing the GRPC Server
Create a new file HelloServer.scala in src/main/scala/[your package]/ with the below content:
package in.pbehre.scala
import in.pbehre.proto.HelloWorldProto._
import io.grpc.{Server, ServerBuilder}
import java.util.logging.{LogManager, Logger}
import scala.concurrent.ExecutionContext
object App {
val logger: Logger = Logger.getLogger(classOf[App].getName)
val port = 50051
def main(args: Array[String]): Unit = {
val server = new HelloWorldServer(ExecutionContext.global)
server.start
server.blockUntilShutdown
}
}
class HelloWorldServer(executionContext: ExecutionContext) { self =>
private[this] var server: Server = null
def start(): Unit = {
server = ServerBuilder
.forPort(App.port)
.addService(HelloWorldGrpc.bindService(new HelloService, executionContext))
.build()
.start()
App.logger.info("Starting server on port: " + App.port)
sys.addShutdownHook {
System.err.println("*** shutting down gRPC server since JVM is shutting down")
self.stop()
System.err.println("*** server shut down")
}
}
def stop(): Unit = {
if (server != null) {
server.shutdown()
}
}
def blockUntilShutdown(): Unit = {
if (server != null) {
server.awaitTermination()
}
}
}
That's it! You're done with the code.
Client Implementation coming soon!
You can find the code and a sample project in this repository: Github