服务端代码:
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
default_encoding: str = 'utf-8'
pool = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix='simple-work-thread-pool'
)
def init_serversocket() -> socket.socket:
serversocket = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_STREAM
)
# 获取本地主机名
host = socket.gethostname()
logger.debug(f'host {host}')
port = 6001
# 绑定端口号
serversocket.bind(('0.0.0.0', port))
# 设置最大连接数,超过后排队
serversocket.listen(2048)
return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
send_len: int = clientsocket.send(response_body)
clientsocket.close()
return send_len
def start_request(clientsocket: socket.socket, addr: tuple) -> int:
try:
pid = os.getpid()
logger.debug(f'pid: {pid}, get message from {addr}')
request_body: bytes = clientsocket.recv(2048)
request_text: str = request_body.decode(encoding=default_encoding)
response_text: str = f'server get message: {request_text}'
response_body: bytes = response_text.encode(default_encoding)
# time.sleep(1)
send_len = send_response(
clientsocket=clientsocket, addr=addr, response_body=response_body)
logger.debug(f'发送了响应')
return send_len
except Exception as error:
logger.exception(error)
def start_request_callback(future: Future) -> None:
send_len: int = future.result()
logger.debug(
f'{threading.current_thread().name}, send payload len is {send_len}')
if __name__ == "__main__":
serversocket = init_serversocket()
pool = multiprocessing.Pool(processes=16)
while True:
clientsocket, addr = serversocket.accept()
clientsocket: socket.socket
addr: tuple
# future: Future = pool.submit(start_request, clientsocket, addr)
# future.add_done_callback(start_request_callback)
pool.apply_async(start_request, (clientsocket, addr))
pool.close()
pool.join()
服务端使用进程池并发处理来自客户端的 TCP 请求
客户端代码:
from base64 import encode
import socket # 客户端 发送一个数据,再接收一个数据
import json
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
failture_requests = []
def send_request(index:int):
try:
# 声明socket类型,同时生成链接对象
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('127.0.0.1', 6001)) # 建立一个链接,连接到本地的6969端口
payload = b'ponponon'
clientsocket.send(payload)
data = clientsocket.recv(1024)
payload = data.decode()
logger.debug(index)
clientsocket.close()
logger.debug('请求完成')
except Exception as error:
failture_requests.append(index)
logger.exception(error)
pool = ThreadPoolExecutor(max_workers=2)
for index in range(100):
pool.submit(send_request,index)
pool.shutdown(wait=True)
logger.debug(failture_requests)
logger.debug(len(failture_requests))
因为我的客户端使用线程池并发,只要 max_workers 大于 1 就会一直卡死
测试平台是 macos 13.3.1 (22E261) + python3.10.10,会卡死
但是我在 ubuntu20.04 + python3.11.3 上就是一切正常,不会出现卡死的问题
但是 max_workers 是 1 就没有问题,我怀疑问题出在服务端,但是具体原因我分析不出来
服务端如果用线程池,就一切正常,服务端和客户端都正常。
但是我只想用多进程
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
default_encoding: str = 'utf-8'
def init_serversocket() -> socket.socket:
serversocket = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_STREAM
)
# 获取本地主机名
host = socket.gethostname()
logger.debug(f'host {host}')
port = 6001
# 绑定端口号
serversocket.bind(('0.0.0.0', port))
# 设置最大连接数,超过后排队
serversocket.listen(2048)
return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
send_len: int = clientsocket.send(response_body)
clientsocket.close()
return send_len
def start_request(clientsocket_fd: int, addr: tuple) -> int:
clientsocket = socket.fromfd(clientsocket_fd, socket.AF_INET, socket.SOCK_STREAM)
os.close(clientsocket_fd)
try:
pid = os.getpid()
logger.debug(f'pid: {pid}, get message from {addr}')
request_body: bytes = clientsocket.recv(2048)
request_text: str = request_body.decode(encoding=default_encoding)
response_text: str = f'server get message: {request_text}'
response_body: bytes = response_text.encode(default_encoding)
# time.sleep(1)
send_len = send_response(
clientsocket=clientsocket, addr=addr, response_body=response_body)
logger.debug(f'发送了响应')
return send_len
except Exception as error:
logger.exception(error)
finally:
clientsocket.close()
def worker_process(clientsocket_fd, addr):
start_request(clientsocket_fd, addr)
if __name__ == "__main__":
serversocket = init_serversocket()
pool = multiprocessing.Pool(processes=16)
while True:
try:
clientsocket, addr = serversocket.accept()
clientsocket_fd = clientsocket.fileno()
pool.apply_async(worker_process, (clientsocket_fd, addr))
except Exception as error:
logger.exception(error)
pool.close()
pool.join()
我们正在使用SpringBoot在JAVA中开发基于grpc的服务。 我们正在跟进https://github.com/LogNet/grpc-spring-boot-starter @GrpcService:用于服务器端服务 @GrpcClient:用于客户端存根 我可以测试这个应用程序。 问题:在生产过程中,我们每秒将收到大约5000个请求,每个请求可能需要25毫秒到1秒。 客户端:如何实现连
并发请求处理 我创建了一个服务器,并使用s.listenandserve()来处理请求。据我所知,这些请求是同时送达的。我使用一个简单的处理程序来检查它: 我看到,如果我发送了几个请求,我将看到所有的“1”出现,只有在一秒钟后所有的“2”出现。但是如果删除Hibernate行,我会看到程序在完成前一个请求之前从不启动请求(输出为1 2 1 2 1 2...)。所以我不明白,如果它们是并发的还是不是
我知道PHP支持处理多个并发连接,并且根据服务器的不同,它可以像这个答案中提到的那样进行配置 服务器是如何管理多个连接的?它是为每个请求派生一个子进程,还是使用线程处理,还是使用线程池处理? 链接的答案说一个进程是分叉的,然后作者在评论中说是线程还是进程,这让人很困惑,如果请求是使用子进程、线程还是线程池提供的?
我有一个简单的过程,需要处理一个表的记录,理想情况下运行多个流程实例,而不处理同一记录。我在MySQL中这样做的方式相当常见(尽管我认为令牌字段更像是一种黑客行为): 向表中添加几个字段: 然后是一个简单的处理脚本: 我正在使用PostgreSQL数据库的系统中实现这样的过程。我知道Pg在锁定方面可以被认为是比MySQL更成熟的,这要归功于MVCC - 我可以在Pg中使用行锁定或其他一些功能而不是
客户端通过HTTP请求(通过浏览器post)调用Servlet,然后Servlet应向外部网站发送请求(get),并从网站接收响应(post)。servlet继续响应并向客户端发送响应(post)。 我的问题是如何在Servlet中发送和接收请求/响应并将某些内容发送回客户端?
在使用了RESTful服务的场景下,非浏览器的客户端也可以直接提交多路文件请求。上一节讲述的所有例子与配置在这里也都同样适用。但与浏览器不同的是,提交的文件和简单的表单字段,客户端发送的数据可以更加复杂,数据可以指定为某种特定的内容类型(content type)——比如,一个多路上传请求可能第一部分是个文件,而第二部分是个JSON格式的数据: POST /someUrl Cont