当前位置: 首页 > 知识库问答 >
问题:

为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住?

秦焱
2023-05-04

服务端代码:

import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing

default_encoding: str = 'utf-8'

pool = ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix='simple-work-thread-pool'
)


def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )

    # 获取本地主机名
    host = socket.gethostname()

    logger.debug(f'host {host}')

    port = 6001

    # 绑定端口号
    serversocket.bind(('0.0.0.0', port))

    # 设置最大连接数,超过后排队
    serversocket.listen(2048)

    return serversocket


def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len


def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)

        response_text: str = f'server get message: {request_text}'

        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)


def start_request_callback(future: Future) -> None:
    send_len: int = future.result()
    logger.debug(
        f'{threading.current_thread().name}, send payload len is {send_len}')


if __name__ == "__main__":
    serversocket = init_serversocket()

    pool = multiprocessing.Pool(processes=16)

    while True:
        clientsocket, addr = serversocket.accept()

        clientsocket: socket.socket
        addr: tuple

        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)

        pool.apply_async(start_request, (clientsocket, addr))

    pool.close()
    pool.join()

服务端使用进程池并发处理来自客户端的 TCP 请求

客户端代码:

from base64 import encode
import socket  # 客户端 发送一个数据,再接收一个数据
import json
from loguru import logger
from concurrent.futures import ThreadPoolExecutor


failture_requests = []


def send_request(index:int):
    try:
        # 声明socket类型,同时生成链接对象
        clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        clientsocket.connect(('127.0.0.1', 6001))  # 建立一个链接,连接到本地的6969端口

        payload = b'ponponon'

        clientsocket.send(payload)

        data = clientsocket.recv(1024)

        payload = data.decode()
        logger.debug(index)

        clientsocket.close()
        logger.debug('请求完成')
    except Exception as error:
        failture_requests.append(index)
        logger.exception(error)


pool = ThreadPoolExecutor(max_workers=2)
for index in range(100):
    pool.submit(send_request,index)

pool.shutdown(wait=True)
logger.debug(failture_requests)
logger.debug(len(failture_requests))

因为我的客户端使用线程池并发,只要 max_workers 大于 1 就会一直卡死

测试平台是 macos 13.3.1 (22E261) + python3.10.10,会卡死
但是我在 ubuntu20.04 + python3.11.3 上就是一切正常,不会出现卡死的问题

但是 max_workers 是 1 就没有问题,我怀疑问题出在服务端,但是具体原因我分析不出来

服务端如果用线程池,就一切正常,服务端和客户端都正常。
但是我只想用多进程

共有1个答案

拓拔曦
2023-05-04
import os
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing

default_encoding: str = 'utf-8'

def init_serversocket() -> socket.socket:
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )

    # 获取本地主机名
    host = socket.gethostname()

    logger.debug(f'host {host}')

    port = 6001

    # 绑定端口号
    serversocket.bind(('0.0.0.0', port))

    # 设置最大连接数,超过后排队
    serversocket.listen(2048)

    return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    send_len: int = clientsocket.send(response_body)
    clientsocket.close()
    return send_len

def start_request(clientsocket_fd: int, addr: tuple) -> int:
    clientsocket = socket.fromfd(clientsocket_fd, socket.AF_INET, socket.SOCK_STREAM)
    os.close(clientsocket_fd)

    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)

        response_text: str = f'server get message: {request_text}'

        response_body: bytes = response_text.encode(default_encoding)
        # time.sleep(1)
        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)
    finally:
        clientsocket.close()

def worker_process(clientsocket_fd, addr):
    start_request(clientsocket_fd, addr)

if __name__ == "__main__":
    serversocket = init_serversocket()

    pool = multiprocessing.Pool(processes=16)

    while True:
        try:
            clientsocket, addr = serversocket.accept()

            clientsocket_fd = clientsocket.fileno()
            pool.apply_async(worker_process, (clientsocket_fd, addr))

        except Exception as error:
            logger.exception(error)

    pool.close()
    pool.join()
 类似资料:
  • 我们正在使用SpringBoot在JAVA中开发基于grpc的服务。 我们正在跟进https://github.com/LogNet/grpc-spring-boot-starter @GrpcService:用于服务器端服务 @GrpcClient:用于客户端存根 我可以测试这个应用程序。 问题:在生产过程中,我们每秒将收到大约5000个请求,每个请求可能需要25毫秒到1秒。 客户端:如何实现连

  • 并发请求处理 我创建了一个服务器,并使用s.listenandserve()来处理请求。据我所知,这些请求是同时送达的。我使用一个简单的处理程序来检查它: 我看到,如果我发送了几个请求,我将看到所有的“1”出现,只有在一秒钟后所有的“2”出现。但是如果删除Hibernate行,我会看到程序在完成前一个请求之前从不启动请求(输出为1 2 1 2 1 2...)。所以我不明白,如果它们是并发的还是不是

  • 我知道PHP支持处理多个并发连接,并且根据服务器的不同,它可以像这个答案中提到的那样进行配置 服务器是如何管理多个连接的?它是为每个请求派生一个子进程,还是使用线程处理,还是使用线程池处理? 链接的答案说一个进程是分叉的,然后作者在评论中说是线程还是进程,这让人很困惑,如果请求是使用子进程、线程还是线程池提供的?

  • 我有一个简单的过程,需要处理一个表的记录,理想情况下运行多个流程实例,而不处理同一记录。我在MySQL中这样做的方式相当常见(尽管我认为令牌字段更像是一种黑客行为): 向表中添加几个字段: 然后是一个简单的处理脚本: 我正在使用PostgreSQL数据库的系统中实现这样的过程。我知道Pg在锁定方面可以被认为是比MySQL更成熟的,这要归功于MVCC - 我可以在Pg中使用行锁定或其他一些功能而不是

  • 客户端通过HTTP请求(通过浏览器post)调用Servlet,然后Servlet应向外部网站发送请求(get),并从网站接收响应(post)。servlet继续响应并向客户端发送响应(post)。 我的问题是如何在Servlet中发送和接收请求/响应并将某些内容发送回客户端?

  • 在使用了RESTful服务的场景下,非浏览器的客户端也可以直接提交多路文件请求。上一节讲述的所有例子与配置在这里也都同样适用。但与浏览器不同的是,提交的文件和简单的表单字段,客户端发送的数据可以更加复杂,数据可以指定为某种特定的内容类型(content type)——比如,一个多路上传请求可能第一部分是个文件,而第二部分是个JSON格式的数据: POST /someUrl Cont