当前位置: 首页 > 知识库问答 >
问题:

c语言中多线程服务器的设计

百里秋月
2023-03-14

当尝试在linux上实现具有并发支持的简单回显服务器时。

使用以下方法

  • 使用pthread函数创建线程池,并将其保存在链表中。它在进程启动时创建,在进程终止时销毁

这个程序现在似乎起作用了。

问题是:

  • 是否适合在中间使用消息队列,它是否足够有效

共有2个答案

梁俊智
2023-03-14

除了使用pthread_mutex,您还需要使用pthread_cond_t(p线程条件),这将允许您在线程池中的线程实际上没有工作时将它们置于睡眠状态。否则,如果它们坐在那里循环检查工作队列中的某些内容,您将浪费计算周期。

我肯定会考虑使用C而不仅仅是纯C。我建议使用C的原因是,在C中,您可以使用模板。使用纯虚拟基类(让我们称之为:“vtask”),可以创建模板化的派生类,这些派生类接受参数,并在调用重载运算符()时插入参数,从而在任务中实现更多功能:

//============================================================================//

void* thread_pool::execute_thread()
{
    vtask* task = NULL;
    while(true)
    {
        //--------------------------------------------------------------------//
        // Try to pick a task
        m_task_lock.lock();
        //--------------------------------------------------------------------//

        // We need to put condition.wait() in a loop for two reasons:
        // 1. There can be spurious wake-ups (due to signal/ENITR)
        // 2. When mutex is released for waiting, another thread can be waken up
        //    from a signal/broadcast and that thread can mess up the condition.
        //    So when the current thread wakes up the condition may no longer be
        //    actually true!
        while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
        {
            // Wait until there is a task in the queue
            // Unlock mutex while wait, then lock it back when signaled
            m_task_cond.wait(m_task_lock.base_mutex_ptr());
        }

        // If the thread was waked to notify process shutdown, return from here
        if (m_pool_state == state::STOPPED)
        {
            //m_has_exited.
            m_task_lock.unlock();
            //----------------------------------------------------------------//
            if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
               tids.find(CORETHREADSELF()) != tids.end())
                mad::details::allocator_list_tl::get_allocator_list()
                        ->Destroy(tids.find(CORETHREADSELF())->second, 1);
            //----------------------------------------------------------------//

            CORETHREADEXIT(NULL);
        }

        task = m_main_tasks.front();
        m_main_tasks.pop_front();
        //--------------------------------------------------------------------//
        //run(task);
        // Unlock
        m_task_lock.unlock();
        //--------------------------------------------------------------------//

        // execute the task
        run(task);

        m_task_count -= 1;
        m_join_lock.lock();
        m_join_cond.signal();
        m_join_lock.unlock();

        //--------------------------------------------------------------------//
    }
    return NULL;
}

//============================================================================//

int thread_pool::add_task(vtask* task)
{
#ifndef ENABLE_THREADING
    run(task);
    return 0;
#endif

    if(!is_alive_flag)
    {
        run(task);
        return 0;
    }

    // do outside of lock because is thread-safe and needs to be updated as
    // soon as possible
    m_task_count += 1;

    m_task_lock.lock();

    // if the thread pool hasn't been initialize, initialize it
    if(m_pool_state == state::NONINIT)
        initialize_threadpool();

    // TODO: put a limit on how many tasks can be added at most
    m_main_tasks.push_back(task);

    // wake up one thread that is waiting for a task to be available
    m_task_cond.signal();

    m_task_lock.unlock();

    return 0;
}

//============================================================================//

void thread_pool::run(vtask*& task)
{
    (*task)();

    if(task->force_delete())
    {
        delete task;
        task = 0;
    } else {
        if(task->get() && !task->is_stored_elsewhere())
            save_task(task);
        else if(!task->is_stored_elsewhere())
        {
            delete task;
            task = 0;
        }
    }
}

在上面,每个创建的线程都会运行execute\u thread(),直到m\u pool\u state设置为state::STOPPED。锁定m\u task\u锁,如果状态未停止且列表为空,则将m\u task\u锁传递给您的条件,从而使线程进入睡眠状态并释放锁。创建任务(未显示),添加任务(顺便说一下,m\u task\u count是一个原子,这就是为什么它是线程安全的)。在add任务期间,该条件被通知唤醒一个线程,从该线程开始执行m\u task\u cond。获取并锁定m\u task\u lock后,执行线程()的wait(m\u task\u lock.base\u mutex\u ptr())部分。

注意:这是一个高度定制的实现,它将大多数pthread函数/对象封装到C类中,因此复制和粘贴无论如何都不起作用。。。很抱歉和w.r.t.线程池::run(),除非您担心返回值,否则(*task)()行就是您所需要的。

我希望这能有所帮助。

编辑:m\u join\u*引用用于检查是否已完成所有任务。主线程处于类似的条件等待中,检查所有任务是否已完成,因为这对于我在中使用此实现的应用程序来说是必要的,然后再继续。

漆雕嘉平
2023-03-14

这对我来说似乎太复杂了。多线程服务器的常用方法是:

  • 在线程进程中创建侦听套接字
  • 在线程中接受客户端连接
  • 对于每个接受的客户端连接,创建一个新线程,该线程接收相应的文件描述符并执行工作
  • 工作线程在完全处理客户端连接时关闭客户端连接

我看不到在这里预填充线程池有什么好处。

如果确实需要线程池:

我只会使用一个链接列表来接受连接,并使用pthread\u互斥体来同步对它的访问:

  • 侦听器进程将客户端FD排在列表末尾
  • 客户排在最前面

如果队列为空,线程可以等待变量(pthread\u cond\u wait),并在连接可用时由侦听器进程(pthread\u cond\u signal)通知。

另一种选择

根据处理请求的复杂性,可以选择将服务器设为单线程,即在一个线程中处理所有连接。这完全消除了上下文切换,因此性能非常好。

一个缺点是,只使用一个CPU内核。为了改进这一点,可以使用混合模型:

  • 每个核心创建一个工作线程

然而,您必须实施各种机制,以便在工人之间公平分配工作。

 类似资料:
  • 本文向大家介绍linux下c语言的多线程编程,包括了linux下c语言的多线程编程的使用技巧和注意事项,需要的朋友参考一下 我们在写linux的服务的时候,经常会用到linux的多线程技术以提高程序性能  多线程的一些小知识: 一个应用程序可以启动若干个线程。 线程(Lightweight Process,LWP),是程序执行的最小单元。 一般一个最简单的程序最少会有一个线程,就是程序本身,也就是

  • 问题内容: 我正在尝试在python中创建多线程Web服务器,但是它一次只响应一个请求,我不知道为什么。你能帮我吗? 问题答案: 在Doug Hellmann的博客中查看此帖子。

  • 问题内容: 我使用python的threding模块创建了一个简单的多线程tcp服务器。每次连接新客户端时,该服务器都会创建一个新线程。 然后,我打开了两个新的终端,并使用netcat连接到服务器。然后,当我使用连接的第一个终端输入并向服务器发送第一个数据时,来自服务器的答复将传到另一个终端,并且第一个连接断开。我猜到了原因,但我怀疑是否会发生这种情况,因为 clientsock 变量被覆盖,因此

  • 我是个新手,如果你能给我建议的话,请告诉我。我有一个向客户端广播消息的服务器。然后客户端将回复发送回服务器。我想用单独的线程处理每个回复。每个回复都有mesage id和thread id。我如何用来自所有线程的信息填充一些结构,然后读取它 也从我的代码,它是正确地创建线程,而还是它存在某种方式来创建线程,只是如果我得到客户端的回复? 我是从正确的理解开始的吗? 非常感谢。

  • 对于一个开发者而言,能够胜任系统中任意一个模块的开发是其核心价值的体现。 对于一个架构师而言,掌握各种语言的优势并可以运用到系统中,由此简化系统的开发,是其架构生涯的第一步。 对于一个开发团队而言,能在短期内开发出用户满意的软件系统是起核心竞争力的体现。 每一个程序员都不能固步自封,要多接触新的行业,新的技术领域,突破自我。

  • 服务器 用于监听服务器中每个客户机的线程在名为OyenteCliente(ClientListener)的类中实现,每个客户机中监听服务器petitios的线程在OyenteServidor(ServerListener)中实现。 客户监听器 非常感谢!