当前位置: 首页 > 知识库问答 >
问题:

我可以在多线程I/O服务超时的情况下使用boost.asio同步读取套接字吗?

端木明贤
2023-03-14

我有一个应用程序使用boost.asio进行TCP和UDP套接字通信。我知道“asio”中的“a”代表异步,因此库倾向于鼓励您在可能的情况下使用异步I/O。我有几个情况下,同步套接字读取是可取的。然而,同时,我想设置一个超时的所述接收调用,这样就不会有可能读阻塞无限期。

这在boost.asio用户中似乎是一个相当常见的问题,在这个主题中有以下过去的堆栈溢出问题:

    null
std::size_t receive(const boost::asio::mutable_buffer& buffer,
      boost::posix_time::time_duration timeout, boost::system::error_code& ec)
  {
    // Set a deadline for the asynchronous operation.
    deadline_.expires_from_now(timeout);

    // Set up the variables that receive the result of the asynchronous
    // operation. The error code is set to would_block to signal that the
    // operation is incomplete. Asio guarantees that its asynchronous
    // operations will never fail with would_block, so any other value in
    // ec indicates completion.
    ec = boost::asio::error::would_block;
    std::size_t length = 0;

    // Start the asynchronous operation itself. The handle_receive function
    // used as a callback will update the ec and length variables.
    socket_.async_receive(boost::asio::buffer(buffer),
        boost::bind(&client::handle_receive, _1, _2, &ec, &length));

    // Block until the asynchronous operation has completed.
    do io_service_.run_one(); while (ec == boost::asio::error::would_block);

    return length;
  }

但是,对于套接字的底层I/O服务已经在一个或多个后台线程中运行的情况呢?在这种情况下,不能保证异步操作的处理程序会由上面代码段中的前台线程运行,因此run_one()将不会返回,直到稍后一些可能不相关的处理程序执行。这将使套接字读取相当没有响应。

asio::io_service有一个poll_one()函数,它将检查服务的队列而不阻塞,但我看不出有什么好方法可以阻塞前台线程(模拟同步调用行为)直到处理程序执行,除非没有后台线程正在执行asio::io_service::run()

我看到了两个潜在的解决方案,我都不喜欢:

>

  • 在启动异步操作后,使用条件变量或类似构造来使前台线程阻塞。在async_receive()调用的处理程序中,用信号通知条件变量以解除对线程的阻塞。这会为每次读取引入一些锁定,这是我希望避免的,因为我希望在UDP套接字读取上实现最大可能的吞吐量。否则,它是可行的,并且可能是我会做的,除非有一个更好的方法出现。

    确保套接字有自己的asio::io_service,该套接字不由任何后台线程运行。这使得在需要异步I/O的情况下更难使用套接字。

    有没有其他方法来以安全的方式完成这件事的想法?

    size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
        size_t count, int flags, socket_addr_type* addr,
        std::size_t* addrlen, boost::system::error_code& ec)
    {
      if (s == invalid_socket)
      {
        ec = boost::asio::error::bad_descriptor;
        return 0;
      }
    
      // Read some data.
      for (;;)
      {
        // Try to complete the operation without blocking.
        signed_size_type bytes = socket_ops::recvfrom(
            s, bufs, count, flags, addr, addrlen, ec);
    
        // Check if operation succeeded.
        if (bytes >= 0)
          return bytes;
    
        // Operation failed.
        if ((state & user_set_non_blocking)
            || (ec != boost::asio::error::would_block
              && ec != boost::asio::error::try_again))
          return 0;
    
        // Wait for socket to become ready.
        if (socket_ops::poll_read(s, 0, ec) < 0)
          return 0;
      }
    }
    
    int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
    {
      if (s == invalid_socket)
      {
        ec = boost::asio::error::bad_descriptor;
        return socket_error_retval;
      }
    
      pollfd fds;
      fds.fd = s;
      fds.events = POLLIN;
      fds.revents = 0;
      int timeout = (state & user_set_non_blocking) ? 0 : -1;
      clear_last_error();
      int result = error_wrapper(::poll(&fds, 1, timeout), ec);
      if (result == 0)
        ec = (state & user_set_non_blocking)
          ? boost::asio::error::would_block : boost::system::error_code();
      else if (result > 0)
        ec = boost::system::error_code();
      return result;
    }
    
  • 共有1个答案

    邢鸿博
    2023-03-14

    boost.asio对future的支持可能会提供一个优雅的解决方案。当为异步操作提供boost::asio::use_future值作为其完成处理程序时,发起函数将返回一个std::future对象,该对象将接收该操作的结果。此外,如果操作失败,则error_code将转换为system_error并通过future传递给调用方。

    在boost.asio C++11 Futures datytime客户端示例中,一个专用线程运行io_service,主线程启动异步操作,然后同步等待操作完成,如下所示:

    std::array<char, 128> recv_buf;
    udp::endpoint sender_endpoint;
    std::future<std::size_t> recv_length =
      socket.async_receive_from(
          boost::asio::buffer(recv_buf),
          sender_endpoint,
          boost::asio::use_future);
    
    // Do other things here while the receive completes.
    
    std::cout.write(
        recv_buf.data(),
        recv_length.get()); // Blocks until receive is complete.
    

    当使用futures时,实现带有超时的同步读取的总体方法与以前相同。不是使用同步读取,而是使用异步读取和异步等待计时器。唯一的小改动是,不是阻塞io_service或定期检查谓词,而是调用future::get()阻塞,直到操作成功或失败(如超时)为止。

    如果C++11不可用,那么可以为boost.thread的future定制异步操作的返回类型,如以下答案所示。

     类似资料:
    • 问题内容: 我正在使用Linux 3.2.0,x86_64。我可以同时从多个线程为一个套接字调用accept()吗? 问题答案: 是的,您可以从多个线程和多个进程调用同一个侦听套接字,尽管指向它的指向可能不如您想象的那么多。内核将只允许一个成功。当使用进程完成此操作时,这称为预分支,并且为每个新连接节省了a的开销。但是,当您处理线程时,可以更轻松地拥有一个等待新连接队列的现有线程池。一个线程执行并

    • 为了实现这一点,我使用了队列/线程池机制。最初,我创建一个固定数量线程的池,并有一个队列datastructure来存储客户机地址。这个队列在所有线程之间共享,因此我使用“互斥”来锁定/解锁这个队列。在主服务器线程中,我创建一个套接字,将其绑定到全局端口/地址,然后在“recvfrom”调用上阻止服务器。任何希望与服务器通信的客户端都会向侦听全局端口地址的主线程服务器发送“HI”消息。主服务器线程

    • 这是来自.properties得my DB配置: 这是config.xml: 实际上,我可以很容易地访问我们的本地web应用程序,并且可以在日志中看到到DB的连接跟踪,但对于批处理来说,情况并非如此。 在我有了这个之后: 有人帮忙吗?

    • 问题内容: 我正在使用API(,和,和朋友)对远程服务器进行Web服务调用,这在很大程度上取得了很大的成功。 但是,有时会出现问题,并且程序永远卡住。 为了解决这个问题,我想添加一个读取超时。 我发现有几种方法可以实现这一目标,但它们似乎都非常糟糕。 所以我对社区的问题是:使用javax.xml.soap API进行调用时,实现读取超时行为的最佳方法是什么? 问题答案: 您必须创建自己的URLSt

    • 问题内容: 我有以下情况。我的工作是: 在给定的时间后可能会超时,如果发生则需要抛出异常 如果没有超时,将返回结果 如果此作业返回结果,则必须尽快将其返回,因为性能非常重要。因此,异步解决方案已经不在市场上了,通过锤子自然捆绑系统也是一种选择。 最后,系统必须符合EJB标准,因此不建议使用普通线程的AFAIK,因为这是严格禁止的。 我们当前的解决方案使用一个线程,该线程在存在一定时间后将被抛出异常

    • 我有java服务器和只有一个客户端的时间。 客户端连接并发送卡ID(在服务器端阻止读取是合适的,因为当时只有一个客户端)