场景介绍:
众所周知,grpc工作原理是client发送请求,server接受处理并返回响应。但是当A主机不能暴露IP和端口的情况下,B又需要向A不断发送任务,该如何实现?
分析:
首先A只能作为客户端发起请求。最简单的办法就是A每隔1秒向B发送一个请求,B收到后给予响应,并附带任务。但是这种方法开销极大。我们可以通过grpc流式传输实现A与B的长连接,A发起一次请求即可,B收到后通过grpc流式传输不断地向A返回任务(响应)
代码实现:
protobuf接口文件:
syntax = "proto3";
service LongService {
rpc hello(request) returns (stream response){}
}
message request {
string id = 1;
string info =2;
}
message response {
string status =1;
string info =2;
}
server.py(B主机)实现如下:
# -*- coding: utf-8 -*-
import grpc
from concurrent import futures
import service_proto_pb2_grpc
import service_proto_pb2
import time
import threading
import random
MAX_MESSAGE_LENGTH = 256 * 1024 * 1024 # 设置grpc最大可接受的文件大小 256M
task = []
task_lock = threading.Lock()
task_done = False
def produce_task():
global task, task_lock, task_done
while True:
task_lock.acquire()
task.append(random.randint(1, 10))
print(task)
task_lock.release()
time.sleep(5)
if task_done:
break
class Service(service_proto_pb2_grpc.LongServiceServicer):
def __init__(self):
pass
def hello(self, request, context):
global task, task_lock
while True:
if task:
task_lock.acquire()
for i in range(len(task)):
yield service_proto_pb2.response(status='success', info=str(task[i]))
task = []
task_lock.release()
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)])
service_proto_pb2_grpc.add_LongServiceServicer_to_server(Service(), server)
server.add_insecure_port('[::]:50005')
server.start()
t = threading.Thread(target=produce_task)
t.start()
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
server.stop(0)
task_done = True
if name == ‘main’:中的代码就是正常的起一个监听接口,另外还起了一个线程produce_task用来不断的制造任务并存入全局变量task中,grpc server只实现了一个函数hello,当接收到cleint端发来的请求后,在while循环中将task的值取出并返回给client端
client.py(A主机)实现如下:
# -*- coding: utf-8 -*-
import grpc
import service_proto_pb2
import service_proto_pb2_grpc
import threading
def handle(i):
print('do task [{}]'.format(i))
print('task finish')
def hello():
with grpc.insecure_channel('10.0.50.153:50005') as channel:
stub = service_proto_pb2_grpc.LongServiceStub(channel)
response = stub.hello(service_proto_pb2.request(id='789', info='012'))
for i in response:
threading.Thread(target=handle, args=(i,)).start()
if __name__ == '__main__':
hello()
client端首先打开一个通道连接到server端,并开始调用hello服务,使用for循环不断取出response,这里for循环会一直循环下去,因为response是一个迭代器,且不会收到来自server端的异常StopIteration。取出的值直接放入线程处理
总结
该方案只是一种长连接的简单实现,可以根据不同场景需求产生多种变化,例如双向流式传输等等。