Python gRPC

Foreword

gRPC

Example test

Install the gRPC library

pip install grpcio 

Install the gRPC tool

pip install grpcio-tools 

Download the official examples

git clone -b v1.66.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc 

The demo used here is in

grpc/examples/python/helloworld 

Start the server first

python greeter_server.py 

image-20241120171002516

You can see it’s already listening.

Then start the client

python greeter_client.py 

Connected to the server normally.

image-20241120171021552

source code analysis

server-side

from concurrent import futures 
import logging 
 
import grpc 
import helloworld_pb2 
import helloworld_pb2_grpc 
 
# Subclass of helloworld_pb2_grpc.GreeterServicer that overrides SayHello.
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter servicer that answers SayHello with a greeting by name."""

    def SayHello(self, request, context):
        """Build the reply for a single SayHello call.

        Args:
            request: incoming message; only its ``name`` attribute is read.
            context: per-call context object; not used by this method.

        Returns:
            A ``helloworld_pb2.HelloReply`` whose ``message`` is
            ``"Hello, <name>!"``.
        """
        # Reply with "Hello" followed by the caller's name.
        greeting = "Hello, %s!" % request.name
        return helloworld_pb2.HelloReply(message=greeting)
 
 
def serve():
    """Start the Greeter gRPC server on port 50051 and block until it stops.

    The server runs handlers on a 10-worker thread pool, listens on an
    insecure (non-TLS) port bound to ``[::]``, and prints one line once it
    has started.
    """
    # Starting up is simple: pick the port first.
    listen_port = "50051"
    # The generated helloworld_pb2_grpc helper does the service wiring.
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), grpc_server)
    grpc_server.add_insecure_port("[::]:" + listen_port)
    grpc_server.start()
    print("Server started, listening on " + listen_port)
    grpc_server.wait_for_termination()
 
 
if __name__ == "__main__":
    # Script entry point: set up logging with its defaults, then run the
    # server (serve() does not return until the server terminates).
    logging.basicConfig()
    serve()
 

client-side

from __future__ import print_function 
 
import logging 
 
import grpc 
import helloworld_pb2 
import helloworld_pb2_grpc 
 
 
def run():
    """Send one SayHello request to localhost:50051 and print the reply."""
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    print("Will try to greet world ...")
    # Target the server port on the local machine.
    target = "localhost:50051"
    with grpc.insecure_channel(target) as channel:
        greeter = helloworld_pb2_grpc.GreeterStub(channel)
        # Send the message and wait for the result.
        request = helloworld_pb2.HelloRequest(name="you")
        response = greeter.SayHello(request)
    print("Greeter client received: " + response.message)
 
 
if __name__ == "__main__":
    # Script entry point: set up logging with its defaults, then issue the
    # single greeting request.
    logging.basicConfig()
    run()
 

helloworld_pb2_grpc.py

# The base (prototype) methods that the server's Greeter class inherits
# and overrides are defined here.
class GreeterServicer(object):
    """The greeting service definition.

    Every method below is a placeholder: it marks the call as
    UNIMPLEMENTED on the context and raises NotImplementedError, so a
    subclass must override each RPC it wants to serve.
    """

    def SayHello(self, request, context):
        """Sends a greeting.

        Default implementation: sets status UNIMPLEMENTED and raises
        NotImplementedError.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def SayHelloStreamReply(self, request, context):
        """Placeholder for the SayHelloStreamReply RPC (no comment in the .proto file).

        Default implementation: sets status UNIMPLEMENTED and raises
        NotImplementedError.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def SayHelloBidiStream(self, request_iterator, context):
        """Placeholder for the SayHelloBidiStream RPC (no comment in the .proto file).

        Unlike the other two methods, this one receives a
        ``request_iterator`` rather than a single request.
        Default implementation: sets status UNIMPLEMENTED and raises
        NotImplementedError.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
         
# This is the key function: it binds the servicer's methods to the server.
def add_GreeterServicer_to_server(servicer, server):
    """Register every Greeter RPC implemented by ``servicer`` on ``server``.

    Args:
        servicer: object providing SayHello, SayHelloStreamReply and
            SayHelloBidiStream methods.
        server: the gRPC server that the handlers are attached to.
    """
    # One handler per RPC name. The handler factory encodes the call shape
    # (unary_unary, unary_stream, stream_stream); each entry also names the
    # request deserializer and the response serializer.
    # NOTE(review): helloworld__pb2 is presumably the generated module's
    # import alias for helloworld_pb2; the import is not shown in this excerpt.
    rpc_method_handlers = {
            'SayHello': grpc.unary_unary_rpc_method_handler(
                    servicer.SayHello,
                    request_deserializer=helloworld__pb2.HelloRequest.FromString,
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString,
            ),
            'SayHelloStreamReply': grpc.unary_stream_rpc_method_handler(
                    servicer.SayHelloStreamReply,
                    request_deserializer=helloworld__pb2.HelloRequest.FromString,
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString,
            ),
            'SayHelloBidiStream': grpc.stream_stream_rpc_method_handler(
                    servicer.SayHelloBidiStream,
                    request_deserializer=helloworld__pb2.HelloRequest.FromString,
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString,
            ),
    }
    # Build a generic handler from the service name and the handler table.
    generic_handler = grpc.method_handlers_generic_handler(
            'helloworld.Greeter', rpc_method_handlers)
    # Add the generic handler to the server.
    server.add_generic_rpc_handlers((generic_handler,))
    # Register the method handlers with the server.
    server.add_registered_method_handlers('helloworld.Greeter', rpc_method_handlers)

helloworld_pb2.py

"""Generated protocol buffer code.""" 
from google.protobuf import descriptor as _descriptor 
from google.protobuf import descriptor_pool as _descriptor_pool 
from google.protobuf import runtime_version as _runtime_version 
from google.protobuf import symbol_database as _symbol_database 
from google.protobuf.internal import builder as _builder 
# Validate the installed protobuf runtime against the values recorded at
# generation time for helloworld.proto.
# NOTE(review): 5, 27, 2 look like the generator's major/minor/patch
# version and '' its suffix -- confirm against the protobuf docs.
_runtime_version.ValidateProtobufRuntimeVersion(
    _runtime_version.Domain.PUBLIC,
    5,
    27,
    2,
    '',
    'helloworld.proto'
)
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()



# The helloworld file descriptor is embedded directly in the code here, as
# serialized bytes added to the default descriptor pool.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xe4\x01\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x12K\n\x13SayHelloStreamReply\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x30\x01\x12L\n\x12SayHelloBidiStream\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00(\x01\x30\x01\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')

# Build the message/enum descriptors and the message classes from
# DESCRIPTOR and place them into this module's globals.
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', _globals)
# Runs only when _USE_C_DESCRIPTORS is false: the file options and the
# start/end offsets of each descriptor are then set by hand.
# NOTE(review): the offsets presumably index into the serialized bytes
# above -- confirm.
if not _descriptor._USE_C_DESCRIPTORS:
  _globals['DESCRIPTOR']._loaded_options = None
  _globals['DESCRIPTOR']._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'
  _globals['_HELLOREQUEST']._serialized_start=32
  _globals['_HELLOREQUEST']._serialized_end=60
  _globals['_HELLOREPLY']._serialized_start=62
  _globals['_HELLOREPLY']._serialized_end=91
  _globals['_GREETER']._serialized_start=94
  _globals['_GREETER']._serialized_end=322
# @@protoc_insertion_point(module_scope)
 

The actual proto file used here is defined as follows

// The greeting service definition.
service Greeter {
  // Sends a greeting: takes one HelloRequest and returns one HelloReply.
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Adding a custom function

The actual proto file used is examples/protos/helloworld.proto. A new SayHello2 RPC is added to it:

syntax = "proto3";

// Code-generation options for the Java and Objective-C targets.
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // Newly added RPC with the same request and reply types as SayHello.
  rpc SayHello2 (HelloRequest) returns (HelloReply) {}

  // One request in, a stream of replies out.
  rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

  // A stream of requests in, a stream of replies out.
  rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
 

The corresponding code needs to be regenerated

python -m grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/helloworld.proto 

After regenerating, helloworld_pb2_grpc.py contains a handler for the new SayHello2 method:

def add_GreeterServicer_to_server(servicer, server): 
    rpc_method_handlers = { 
            'SayHello': grpc.unary_unary_rpc_method_handler( 
                    servicer.SayHello, 
                    request_deserializer=helloworld__pb2.HelloRequest.FromString, 
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString, 
            ), 
            'SayHello2': grpc.unary_unary_rpc_method_handler( 
                    servicer.SayHello2, 
                    request_deserializer=helloworld__pb2.HelloRequest.FromString, 
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString, 
            ), 

After adding SayHello2 to both the server and the client, run them again; the correct response is returned.

image-20241122144701070

At this point the simplest gRPC is complete

brief summary

The core is just three steps:

  1. Define the proto, which really means defining the functions and their arguments.
  2. Generate the code, which automatically creates the intermediate classes and member functions needed, based on the proto definition.
  3. Modify the server and client to implement and call the new functions.

Streaming of gRPC

The examples above all follow the client/server pattern, which is also gRPC's most common mode: one side sends a request and the other answers. This is ordinary (unary) RPC, where the server cannot initiate a request and only the client can. There are three other modes:

  • Response streaming (server streaming)
  • Request streaming (client streaming)
  • Bidirectional streaming

Still from the previous example, there is the corresponding streaming implementation

# How many replies the server streams back for one sayHello request.
NUMBER_OF_REPLY = 10


class Greeter(MultiGreeterServicer):
    """Async servicer that streams several greetings per request."""

    async def sayHello(
        self, request: HelloRequest, context: grpc.aio.ServicerContext
    ) -> HelloReply:
        """Yield NUMBER_OF_REPLY numbered greetings for ``request.name``.

        NOTE(review): the body uses ``yield``, so this is an async
        generator even though the return annotation names a single
        ``HelloReply``.
        """
        logging.info("Serving sayHello request %s", request)
        for i in range(NUMBER_OF_REPLY):
            reply_text = f"Hello number {i}, {request.name}!"
            yield HelloReply(message=reply_text)
 

On the server side, you can see that 10 replies are returned for a single request, and that the function is asynchronous.

async def run() -> None:
    """Call sayHello twice on localhost:50051 and print every streamed reply.

    The first call is consumed with ``async for``; the second is consumed
    by awaiting ``read()`` repeatedly until ``grpc.aio.EOF`` is returned.
    """
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        greeter = hellostreamingworld_pb2_grpc.MultiGreeterStub(channel)

        # Read from an async generator
        iterated_call = greeter.sayHello(
            hellostreamingworld_pb2.HelloRequest(name="you")
        )
        async for reply in iterated_call:
            print("Greeter client received from async generator: " + reply.message)

        # Direct read from the stub
        polled_call = greeter.sayHello(
            hellostreamingworld_pb2.HelloRequest(name="you")
        )
        exhausted = False
        while not exhausted:
            reply = await polled_call.read()
            # EOF is the end-of-stream marker; nothing is printed for it.
            exhausted = reply == grpc.aio.EOF
            if not exhausted:
                print("Greeter client received from direct read: " + reply.message)

On the client side, the first approach consumes the stream by iterating over it asynchronously, and the second reads directly from the call object with read() until EOF.

From the examples alone, it may not be obvious what streaming is for, other than being able to send multiple requests or multiple responses. Typical use cases:

  • Streaming large files requires multiple requests and multiple responses, such as audio and video.
  • Active pushes or returns would require streaming to be implemented, such as ads, broadcast pushes
  • High concurrency: multiple requests can be handled at the same time rather than strictly one after another, where sequential chaining hurts efficiency
  • Progress callbacks for task completion must then be responded to multiple times

gRPC’s bidirectional streaming is analogous to WebSocket: both the client and the server can send messages to each other.

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // Same request and reply types as SayHello.
  rpc SayHello2 (HelloRequest) returns (HelloReply) {}

  // "stream" on the return type only: one request, a stream of replies.
  rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

  // "stream" on both sides: a stream of requests and a stream of replies.
  rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}

The proto definition in the streaming example uses a special keyword, stream. When a parameter or return type is marked with stream, it is transmitted as a stream.

If stream modifies the return value, it is server streaming; if it modifies the parameter, it is client streaming; and if it modifies both, it is bidirectional streaming.

Summary

Overall, that is gRPC. Its streaming does not feel as simple as WebSocket; for two-way interaction in particular, WebSocket seems simpler and better suited.

References

https://ift.tt/Ms54y8r

https://ift.tt/5wkMsoG

https://ift.tt/omtEv9i