我在掌握如何正确处理从以多线程方式使用Boost Asio的多线程程序创建子进程方面遇到一些麻烦。
如果我理解正确,那么在Unix世界中启动子进程的方法是先调用,fork()
然后调用exec*()
。另外,如果我理解正确,则调用fork()
将复制所有文件描述符,依此类推,除非标记为,否则需要
在子进程中将其 关闭FD_CLOEXEC
(从而在调用时被原子关闭exec*()
)。
Boost
Asio需要在fork()
被调用时得到通知,以便通过调用正确运行notify_fork()
。但是,在多线程程序中,这会产生几个问题:
如果我理解正确的话,套接字默认是由子进程继承的。可以将它们设置为SOCK_CLOEXEC
-, 但不能直接在创建时设置 *,因此,如果正在从另一个线程创建子进程,则会导致出现计时窗口。
notify_fork()
要求没有其他线程调用 任何其他io_service
函数,也没有 与该.I关联的任何其他I / O对象上的任何函数io_service
。这似乎真的不可行-毕竟程序是多线程的,是有原因的。
如果我理解正确,则在fork()
和之间进行的任何函数调用都exec*()
必须是异步信号安全的(请参阅fork()
文档)。没有文档说明该notify_fork()
呼叫是异步信号安全的。实际上,如果我查看Boost Asio的源代码(至少在1.54版中),可能会有对pthread_mutex_lock的调用,如果我理解正确的话,这 不是 异步信号安全的(请参阅Signal Concepts,还有其他调用在进行中)不在白名单上)。
问题#1我可能可以通过将子进程和套接字+文件的创建分开来解决,以便确保在正在创建和设置的套接字之间的窗口中没有子进程被创建SOCK_CLOEXEC
。问题2比较棘手,我可能需要确保
所有 asio处理程序线程都已停止,执行fork,然后再次重新创建它们,这充其量是大事,而在最坏的情况下真的很糟糕(我的未决计时器是什么??
)。问题3似乎使得完全不可能正确使用它。
如何与fork()
+ 一起在多线程程序中正确使用Boost Asio exec*()
? …还是我“分叉”?
如果我误解了任何基本概念,请让我知道(我是在Windows编程上长大的,而不是* nix …)。
编辑:*-实际上,可以SOCK_CLOEXEC
直接在Linux上创建带有套接字的套接字,此套接字自2.6.27开始可用(请参阅socket()
文档)。在Windows上,WSA_FLAG_NO_HANDLE_INHERIT
从Windows
7 SP 1 / Windows Server 2008 R2 SP
1开始提供相应的标志(请参阅WSASocket()
文档)。OS X似乎不支持此功能。
在多线程程序中,io_service::notify_fork()
在子级中调用是不安全的。但是,Boost.Asio希望基于fork()
support来调用它,因为这是孩子关闭父级以前的内部文件描述符并创建新的文件描述符时。尽管Boost.Asio明确列出了调用的前提条件io_service::notify_fork()
,并在期间保证了其内部组件的状态,但fork()
对实现的简短了解表明std::vector::push_back()
可以从空闲存储区分配内存,并且不能保证分配是异步信号-
安全。
话虽如此,一种可能值得考虑的解决方案fork()
是单线程时的进程。子进程将保持单线程并执行,fork()
并且exec()
在父进程通过进程间通信告知执行该操作时。这种分离通过消除在执行fork()
和时管理多个线程的状态的需要,简化了问题exec()
。
这是一个演示此方法的完整示例,其中多线程服务器将通过UDP接收文件名,并且子进程将执行fork()
并在文件名exec()
上运行/usr/bin/touch
。为了使该示例更具可读性,我选择使用堆栈式协程。
#include <unistd.h> // execl, fork
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
/// @brief launcher receives a command from inter-process communication,
/// and will then fork, allowing the child process to return to
/// the caller.
class launcher
{
public:
launcher(boost::asio::io_service& io_service,
boost::asio::local::datagram_protocol::socket& socket,
std::string& command)
: io_service_(io_service),
socket_(socket),
command_(command)
{}
void operator()(boost::asio::yield_context yield)
{
std::vector<char> buffer;
while (command_.empty())
{
// Wait for server to write data.
std::cout << "launcher is waiting for data" << std::endl;
socket_.async_receive(boost::asio::null_buffers(), yield);
// Resize buffer and read all data.
buffer.resize(socket_.available());
socket_.receive(boost::asio::buffer(buffer));
io_service_.notify_fork(boost::asio::io_service::fork_prepare);
if (fork() == 0) // child
{
io_service_.notify_fork(boost::asio::io_service::fork_child);
command_.assign(buffer.begin(), buffer.end());
}
else // parent
{
io_service_.notify_fork(boost::asio::io_service::fork_parent);
}
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::local::datagram_protocol::socket& socket_;
std::string& command_;
};
using boost::asio::ip::udp;
/// @brief server reads filenames from UDP and then uses
/// inter-process communication to delegate forking and exec
/// to the child launcher process.
class server
{
public:
server(boost::asio::io_service& io_service,
boost::asio::local::datagram_protocol::socket& socket,
short port)
: io_service_(io_service),
launcher_socket_(socket),
socket_(boost::make_shared<udp::socket>(
boost::ref(io_service), udp::endpoint(udp::v4(), port)))
{}
void operator()(boost::asio::yield_context yield)
{
udp::endpoint sender_endpoint;
std::vector<char> buffer;
for (;;)
{
std::cout << "server is waiting for data" << std::endl;
// Wait for data to become available.
socket_->async_receive_from(boost::asio::null_buffers(),
sender_endpoint, yield);
// Resize buffer and read all data.
buffer.resize(socket_->available());
socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
std::cout << "server got data: ";
std::cout.write(&buffer[0], buffer.size());
std::cout << std::endl;
// Write filename to launcher.
launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::local::datagram_protocol::socket& launcher_socket_;
// To be used as a coroutine, server must be copyable, so make socket_
// copyable.
boost::shared_ptr<udp::socket> socket_;
};
int main(int argc, char* argv[])
{
std::string filename;
// Try/catch provides exception handling, but also allows for the lifetime
// of the io_service and its IO objects to be controlled.
try
{
if (argc != 2)
{
std::cerr << "Usage: <port>\n";
return 1;
}
boost::thread_group threads;
boost::asio::io_service io_service;
// Create two connected sockets for inter-process communication.
boost::asio::local::datagram_protocol::socket parent_socket(io_service);
boost::asio::local::datagram_protocol::socket child_socket(io_service);
boost::asio::local::connect_pair(parent_socket, child_socket);
io_service.notify_fork(boost::asio::io_service::fork_prepare);
if (fork() == 0) // child
{
io_service.notify_fork(boost::asio::io_service::fork_child);
parent_socket.close();
boost::asio::spawn(io_service,
launcher(io_service, child_socket, filename));
}
else // parent
{
io_service.notify_fork(boost::asio::io_service::fork_parent);
child_socket.close();
boost::asio::spawn(io_service,
server(io_service, parent_socket, std::atoi(argv[1])));
// Spawn additional threads.
for (std::size_t i = 0; i < 3; ++i)
{
threads.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service));
}
}
io_service.run();
threads.join_all();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
// Now that the io_service and IO objects have been destroyed, all internal
// Boost.Asio file descriptors have been closed, so the execl should be
// in a clean state. If the filename has been set, then exec touch.
if (!filename.empty())
{
std::cout << "creating file: " << filename << std::endl;
execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
}
}
1号航站楼:
$ ls
a.out example.cpp
$ ./a.out 12345
服务器正在等待数据
启动器正在等待数据
服务器获取数据:
服务器正在等待数据
启动器正在等待数据
创建文件:
服务器获取数据:b
服务器正在等待数据
启动器正在等待数据
创建文件:b
服务器获取数据:c
服务器正在等待数据
启动器正在等待数据
创建文件:c
ctrl + c
$ ls
a.out bc example.cpp
2号航站楼:
$ nc -u 127.0.0.1 12345
a ctrl + db ctrl + dcctrl + d
问题内容: Java如何确定分配线程或进程的内核?有什么办法可以控制吗?防止两个大线程在同一内核上执行? 基本上,我要问的是有关Java中多线程如何工作或如何在Java中控制它的更多信息。 问题答案: 您不能为特定线程设置处理器关联。但是,如果将程序分为两个进程,则应该能够将这些进程分配给操作系统级别的特定处理器。 http://www.cyberciti.biz/tips/setting-pro
问题内容: 我正在尝试将Hibernate用于多线程应用程序,其中每个线程都检索一个对象并将其插入表中。我的代码如下所示。我每个线程都有本地hibernate会话对象,在每个InsertData中,我都执行beginTransaction和commit。 我面临的问题是很多次我收到“ org.hibernate.TransactionException:不支持嵌套事务” 由于我是hibernate
问题内容: 是否有一种实际的方法可以在PHP中实现多线程模型,无论是真正的还是仅对其进行仿真。一段时间以前,建议您可以强制操作系统加载PHP可执行文件的另一个实例并处理其他同时进行的进程。 这样做的问题是,当PHP代码完成执行PHP实例后,它仍保留在内存中,因为无法从PHP中杀死它。因此,如果您正在模拟多个线程,则可以想象会发生什么。因此,我仍在寻找一种可以在PHP中有效完成或模拟多线程的方法。有
关于Spring WebClient我有一个问题 在我的应用程序中,我需要做许多类似的API调用,有时我需要更改调用中的头(身份验证令牌)。所以问题来了,在这两个选择中,什么更好: > 为所有传入的MyService.class请求创建一个WebClient,方法是将其设置为字段,如下代码所示: 谢谢你。
根据http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/overview/cpp2011/futures.html,我们可以将boost::asio与一起使用。但是我找不到任何有关使用的信息,它具有更多的功能,例如。我怎么用?
所以我正在编写代码,它将解析文件夹中的多个文本文件,收集它们的信息,并将这些信息保存在两个静态列表实例变量中。信息存放的顺序并不重要,因为我最终将对其进行排序。但由于某些原因,增加线程数不会影响速度。这是我的run方法和主方法中利用多线程的部分。 如果需要额外的信息,我基本上有一个静态实例变量作为我需要浏览的文件的数组,还有一个常量是线程数(为了测试目的手动更改)。如果我有4个线程和8个文件,每个