当前位置: 首页 > 知识库问答 >
问题:

数据报套接字服务器不接收来自客户端的消息

长孙谦
2023-03-14

我有一个Python服务器使用unix数据报套接字连接与一个C客户端通信。下面的代码设置一个套接字,然后从客户端发送和接收一条消息。这个脚本在python 2.7中工作,但是,当在python 3中测试它时,对recv()的调用会超时等待来自客户端的消息。然而,客户端确实从服务器接收消息而没有问题。我已经用3.5.2和3.7.1在两台不同的机器上测试过了,结果相同。

更新:我添加了一个ioloop(asyncio事件循环的包装器)来创建一个回调系统,用于当套接字上的消息准备好时。当服务器使用Python 2.7运行时,会出现来自客户端的消息,但使用Python 3.7.1时,不会收到任何消息。

C客户资料来源:

// Socket Client

#include <stdio.h>
#include <stdlib.h>
#include <cerrno>
#include <vector>
#include <string>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstdarg>

using namespace std;

int sock;
bool await_connection = true;
vector<sockaddr_un> dest_addrs;
int max_msg_size = 2048;
string msg_buf;
string sock_name = "test.sock";

void receive_control_message()
{
    char buf[4096]; // Should be enough for client->server messages
    sockaddr_un srcaddr;
    socklen_t srcaddr_len;

    srcaddr_len = sizeof(srcaddr);

    int len = recvfrom(sock, buf, sizeof(buf),
                       0, (sockaddr *) &srcaddr, &srcaddr_len);

    if (len == -1)
    {
        fprintf(stderr,"Socket read error: %s\n", strerror(errno));
        exit(-1);
    }

    printf("sockaddr_un sun_path: '%s'\n", srcaddr.sun_path);

    dest_addrs.push_back(srcaddr);

    string data(buf, len);

    fprintf(stderr, "websocket: Received control message in %d byte.\n", (int) data.size());
    printf("%s\n", data.c_str());
}

void finish_message()
{
    if (msg_buf.size() == 0)
        return;

    const int initial_buf_size = msg_buf.size();
    fprintf(stderr, "websocket: About to send %d bytes.\n", initial_buf_size);

    if (sock_name.empty())
    {
        msg_buf.clear();
        return;
    }

    msg_buf.append("\n");
    const char* fragment_start = msg_buf.data();
    const char* data_end = msg_buf.data() + msg_buf.size();
    int fragments = 0;
    while (fragment_start < data_end)
    {
        int fragment_size = data_end - fragment_start;
        if (fragment_size > max_msg_size)
            fragment_size = max_msg_size;
        fragments++;

        for (unsigned int i = 0; i < dest_addrs.size(); ++i)
        {
            int retries = 30;
            ssize_t sent = 0;
            while (sent < fragment_size)
            {
                ssize_t retval = sendto(sock, fragment_start + sent,
                    fragment_size - sent, 0, (sockaddr*) &dest_addrs[i],
                    sizeof(sockaddr_un));
                fprintf(stderr, "    trying to send fragment to client %d...\n", i);
                if (retval <= 0)
                {
                    const char *errmsg = retval == 0 ? "No bytes sent"
                                                     : strerror(errno);
                    if (--retries <= 0)
                    {
                        fprintf(stderr,"Socket write error: %s\n", errmsg );
                        exit(-1);
                    }

                    if (retval == 0 || errno == ENOBUFS || errno == EWOULDBLOCK
                        || errno == EINTR || errno == EAGAIN)
                    {
                        // Wait for half a second at first (up to five), then
                        // try again.
                        const int sleep_time = retries > 25 ? 2 * 1000
                                             : retries > 10 ? 500 * 1000
                                             : 5000 * 1000;
                        fprintf(stderr, "failed (%s), sleeping for %dms.\n",
                                                    errmsg, sleep_time / 1000);
                        usleep(sleep_time);
                    }
                    else if (errno == ECONNREFUSED || errno == ENOENT)
                    {
                        // the other side is dead
                        fprintf(stderr, "failed (%s), breaking.\n", errmsg);
                        dest_addrs.erase(dest_addrs.begin() + i);
                        i--;
                        break;
                    }
                    else
                    {
                        fprintf(stderr,"Socket write error: %s\n", errmsg);
                    }
                }
                else
                {
                    fprintf(stderr, "fragment size %d sent.\n", fragment_size);
                    sent += retval;
                }
            }
        }

        fragment_start += fragment_size;
    }
    msg_buf.clear();
    fprintf(stderr, "websocket: Sent %d bytes in %d fragments.\n", initial_buf_size, fragments);
}

void send_message(const char *format, ...)
{
    char buf[2048];
    int len;

    va_list argp;
    va_start(argp, format);
    if ((len = vsnprintf(buf, sizeof(buf), format, argp)) >= (int)sizeof(buf)
        || len == -1)
    {
        if (len == -1)
        {
            fprintf(stderr, "Webtiles message format error! (%s)", format);
            exit(-1);
        }
        else
        {
            fprintf(stderr, "Webtiles message too long! (%d)", len);
            exit(-1);
        }
    }
    va_end(argp);

    msg_buf.append(buf);

    finish_message();
}

int main()
{
    if (sock_name.empty()) return 0;

    sock = socket(PF_UNIX, SOCK_DGRAM, 0);
    if (sock < 0)
    {
        printf("Can't open the webtiles socket!\n");   
        exit(-1); 
    }

    sockaddr_un addr;
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, sock_name.c_str());
    if (::bind(sock, (sockaddr*) &addr, sizeof(sockaddr_un)))
    {
        printf("Can't bind the webtiles socket!\n");
        exit(-1); 
    }

    int bufsize = 64 * 1024;
    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)))
    {
        printf("Can't set buffer size!\n");
        exit(-1); 
    }

    // Need small maximum message size to avoid crashes in OS X
    max_msg_size = 2048;

    struct timeval tv;
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0)
    {
        printf("Can't set send timeout!\n");
        exit(-1); 
    }    

    printf("Awaiting connection...\n");
    if (await_connection)
    {
        while (dest_addrs.size() == 0) receive_control_message();
    }

    send_message("{\"foo\":\"grapes\",\"boo\":\"chicken\"}");
    send_message("*{\"msg\":\"flush_messages\"}");

    return 0;
}

服务器脚本

# Socket Server

import socket
import json
from datetime import datetime, timedelta
import warnings
import os,sys
from tornado.ioloop import IOLoop

crawl_socket = None
crawl_socketpath = 'test.sock'
socketpath = 'crawl_socket'

msg_buffer = None
ioloop = IOLoop.instance()

def json_encode(value):
    return json.dumps(value).replace("</", "<\\/")

def close():
    global crawl_socket
    if crawl_socket:
        print ("Closing socket...")
        crawl_socket.close()
        os.unlink(socketpath)
        crawl_socket = None

def message_callback(data):
    if len(data) > 0 and not data.startswith("*"): 
        print(json.loads(data))
    elif data.startswith("*"): 
        print(json.loads(data[1:]))

def handle_data(data):
    global msg_buffer
    if msg_buffer is not None:
        data = msg_buffer + data
    if data[-1] != "\n":
        # All messages from crawl end with \n.
        # If this one doesn't, it's fragmented.
        msg_buffer = data
    else:
        msg_buffer = None
        message_callback(data)

def handle_read(fd, events):
    if events & ioloop.READ:
        data = crawl_socket.recv(128 * 1024, socket.MSG_DONTWAIT)
        handle_data(data)
    if events & ioloop.ERROR:
        pass

try:
    os.unlink(socketpath)
except OSError:
    if os.path.exists(socketpath):
        raise

if os.path.exists(crawl_socketpath) and not os.path.exists(socketpath):
    crawl_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    crawl_socket.settimeout(10)

    crawl_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    if (crawl_socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) < 2048):
        crawl_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

    if (crawl_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) < 212992):
        crawl_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 212992)

    msg = json_encode({ "msg": "attach", "primary": True })

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

    crawl_socket.bind(socketpath)

    try:
        crawl_socket.sendto(msg.encode('utf-8'), crawl_socketpath)
    except socket.timeout:
        print("ERROR: in send_message() - Game socket send timeout")
        close()
        sys.exit(-1)

    ioloop.add_handler(crawl_socket.fileno(),
                        handle_read,
                        ioloop.ERROR | ioloop.READ)

    ioloop.start()
else:
    print('%s does not exist' % crawl_socketpath)

共有1个答案

鄢承运
2023-03-14

我自己设法解决了这个问题,所以这里有一个答案,供任何人在未来寻求对类似问题的见解。

Python 2中的socket.recv()返回一个str对象,表示接收的数据。

在Python3中,这一点已经发生了改变,而socket。recv()返回字节对象。

因此,需要添加以下内容来处理上述数据:

if isinstance(data,bytes):
    data = data.decode("utf-8") 
 类似资料:
  • 我有套接字服务器(java桌面应用程序)正在等待从java webapp(套接字客户端)连接。通信看起来还可以,我在客户端看到来自服务器的消息,但是当我发送消息时,我在服务器端没有收到任何消息。会有什么问题呢?当我检查服务器与telnet,一切正常。下面是我的代码: 服务器: 客户: 谢谢帮忙!

  • 我试图用java实现一个客户端服务器,在这里我读取客户端中的输入并在服务器中执行UperCase,然后返回客户端并打印UperCase。我使用ObjectOutputStream和ObjectInputStream进行读写,但是当我在客户机中键入一个msg时,程序会显示以下错误: Digite uma msg casa java.io.eofexception位于java.io.datainput

  • 我对Android系统是新手。我有一个客户端类,我的主要活动引用。client类将客户端套接字连接到充当服务器的外部设备,但是它从不将我试图发送的消息发送到服务器。我知道这不是连接,因为在创建套接字时,我将setKeepAlive()设置为true,当我试图发送消息时不会引发异常,socket.isconnected()返回true,如果我试图在发送消息之前连接套接字,它会引发“已经连接”的异常。

  • 我刚刚开始使用Sockets,对于我当前的项目,我需要能够从客户端控制我的程序,但是如果我的项目合作伙伴想同时使用他的客户端,服务器不会向他发送“您已连接”消息,如连接类所示。所以我假设服务器不同时接受多个客户端。我尝试过使用类Connection的Thread,但这也不会向第二个客户端发送消息“您已连接”。我在这里做错了什么? 这是我用来同时连接多个用户的线程: 编辑:附加信息 在第一个客户端连

  • 问题内容: 当我收到来自服务器的消息时,客户端是否接收到消息(如果在超时之前已断开连接)。我需要以某种方式从服务器端控制这种情况,我必须知道客户端是否能够在发送消息之前接收到消息。怎么做? 问题答案: 过去,我通常在单独的线程中使用阻塞读取来读取客户端消息并根据需要分发它们。 如果客户端断开连接,则InputStream.read()操作将返回-1,并且我们将其视为客户端已“断开连接”并且应关闭I

  • 服务器: 客户: 但最终,客户端阻塞了read(buffer)语句。