c++ builder 线程池

苏浩瀚
2023-12-01

//本线程池能够实际运行。

//线程池类比较长,放在后面

//1.新建窗体,在画面上放两个按钮,一个用于扔任务到线程池,另一个用于查看任务是否完成。

//2.添加线程池类,较长,放在后面。

//3.在窗体cpp文件加入头文件

   #include "visSimpleThreadPool.h"

//4.在窗体类cpp文件中加入

 class Work1;
 Work1 *pwork1;Work1 *pwork2;
class Work1:public TWork
{  public:

  void Exec()
  {   char str[100];
      AnsiString t=WorkName;
      for (int i = 0; i < 1000; i++)
      {   sprintf(str,"%s:%d\n",t.c_str(),i);
          OutputDebugStringA(str);
      }
  }
};

//5.在窗体的FormCreate()中加入

   pwork1=new Work1;pwork2=new Work1;
   pwork1->WorkName="WORK1";pwork2->WorkName="WORK2";
   g_pThreadWorkList = new TThreadWorkList(10);

//6.在窗体的FormClose()中加入

 if(g_pThreadWorkList)  delete g_pThreadWorkList;
       delete pwork1;   delete pwork2;

//7.在第1个按钮的单击事件中加入

      g_pThreadWorkList->AddWork(pwork1);
      g_pThreadWorkList->AddWork(pwork2);

//8.在第2个按钮的单击事件中加入

       // Report whether the first demo task has finished.
       // NOTE(review): the bool result is handed straight to ShowMessage, so the
       // displayed text depends on the implicit String conversion (presumably
       // "1"/"0") — confirm with the compiler in use.
       ShowMessage(pwork1->WorkIsEnd());

//2. 包括2.1线程池类,以及2.2 temp.h头文件

//2.1线程池类

//----------------------------------------------------------------------------------------------------------------

//visSimpleThreadPool.h

//---------------------------------------------------------------------------
 #include "temp.h"
#ifndef visSimpleThreadPoolH
#define visSimpleThreadPoolH
//---------------------------------------------------------------------------
#include <Classes.hpp>
#include <vector>
#include <queue>
#include <SyncObjs.hpp>

//---------------------------------------------------------------------------
class TWork_Thread ;
// A unit of work for the thread pool. Derive from it and override Exec(),
// or assign pWorkExec, then submit the object with TThreadWorkList::AddWork.
class PACKAGE TWork
{
 public  :
 // The Tag* members below are reserved for the user; the pool code shown here never reads them.
  void *Tag;
  byte TagWait ;   // user-defined
  byte TagIndex ;   // user-defined
// ---- state flags maintained by the pool ----
  bool WorkEnd ;    // true once the task has completed; false while it is still scheduled
  bool IsWorking ;  // true while the task is being executed

  unsigned int TimeStart;   // GetTickCount() value taken when execution starts (0 when reset)
  unsigned int TimeEnd ;    // GetTickCount() value taken when execution ends (0 when reset)

  String WorkMsg ;          // free-form message; not used by the pool code shown here
  String WorkName ;         // task name; copied to the worker thread and used in error reports
  TWork_Thread *Work_Thread ;   // worker currently executing this task, NULL otherwise
  void (__closure *pOnWorkEnd)(TWork * );   // optional callback invoked on the worker thread after the task ends
  void (__closure *pWorkExec)(TWork * );    // optional task body used by the default Exec()

  TWork();
  virtual ~TWork();
  // Task body; the default implementation calls pWorkExec when it is set.
  virtual void Exec() ;
  // Returns true when WorkEnd is set and IsWorking is clear.
  bool WorkIsEnd();
  // Forces the task into the "finished" state (WorkEnd = true, IsWorking = false).
  void SetWorkIsEnd();
};
//---------------------------------------------------------------------------
class TThreadWorkList ;
// Worker thread of the pool: executes the task stored in `work`, then pulls
// further tasks from the owning pool's queue, and suspends itself when idle.
class PACKAGE TWork_Thread : public TThread
{
 friend class TThreadWorkList ;   // the pool reads `running` and the protected TThread state
private:
    bool running ;   // set while a task is being processed; cleared before the thread suspends or exits
protected:
    // Thread entry point (worker loop).
    void __fastcall Execute();
     TThreadWorkList *ThreadWorkList ;   // owning pool, whose queue is polled after each task
public:
    TWork *work ;   // task currently assigned to this thread, NULL when idle
        String WorkName ;   // name of the task being executed; empty when idle
    // CreateSuspended is forwarded to TThread; p is the owning pool.
    __fastcall TWork_Thread(bool CreateSuspended,TThreadWorkList *p);
};
//---------------------------------------------------------------------------
// A lock-protected list of tasks that are not executed by this class itself
// (see G_WaitingWorkList: tasks whose run conditions are not yet met).
class PACKAGE TWorkList
{
  TCriticalSection *CS ;   // guards FWorks inside AddWork
  public :
  std::vector<TWork *> FWorks ;           // task list (public; direct access bypasses CS)
  // Resets the task's state flags and appends it to FWorks under the lock.
  void AddWork(TWork *pWork);
  // NOTE(review): no definition of this method appears in the accompanying .cpp — confirm it exists before calling.
  void ClearWorkList();

  TWorkList();
  ~TWorkList();
};
//---------------------------------------------------------------------------
// The thread pool: a fixed set of worker threads plus a FIFO queue of tasks
// waiting for a free worker.
class PACKAGE TThreadWorkList
{
  friend class TWork_Thread ;   // workers pop tasks from FWorks under ThreadPoolCS
  TCriticalSection *ThreadPoolCS ;   // guards FWorks and the assignment of tasks to threads
  // Returns a suspended, non-terminated thread with no task assigned, or NULL.
  TWork_Thread *FindFreeThread();
  // Assigns pWork to pThread and resumes the thread.
  void RunWork(TWork_Thread *pThread,TWork *pWork);
  std::queue<TWork *> FWorks ;           // queue of tasks waiting for a free thread
  std::vector<TWork_Thread *> FThreads ;  // the worker threads (leftover disabled member: bool AutoRun;)

  public :

  // Creates a pool without worker threads.
  TThreadWorkList();
  // Creates a pool with NThreads suspended worker threads.
  TThreadWorkList(int NThreads);
  // Stops and destroys the worker threads and discards queued tasks.
  ~TThreadWorkList();

  // Returns true when every worker thread is suspended.
  bool AllThreadsIsFree();
  // Returns the number of suspended, non-terminated threads with no task assigned.
  int GetFreeThreadCount();
  // Runs pWork on a free thread, or queues it when none is free; NULL is ignored.
  void AddWork(TWork *pWork);
  // Discards all queued tasks and returns the current free-thread count.
  int ClearWorkList();
};
// Execution list: the global thread pool.
extern PACKAGE TThreadWorkList *g_pThreadWorkList ;
// Waiting list: tasks whose run conditions are not yet met.
extern  PACKAGE TWorkList *G_WaitingWorkList ;
// When true, every worker thread leaves its loop (checked in TWork_Thread::Execute).
extern  PACKAGE bool G_StopAllThread ;
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
#endif

//----------------------------------------------------------------------------------------------------------------

//visSimpleThreadPool.cpp

// ---------------------------------------------------------------------------

#include <System.hpp>
#pragma hdrstop

#include "visSimpleThreadPool.h"

#pragma package(smart_init)
// ---------------------------------------------------------------------------


// ---------------------------------------------------------------------------
// Global thread pool; stays NULL until the application creates one.
TThreadWorkList *g_pThreadWorkList = NULL;
// Global waiting list; stays NULL until the application creates one.
TWorkList *G_WaitingWorkList = NULL;
// Global stop flag polled by every worker thread's loop.
bool G_StopAllThread = false;

// Creates a worker thread bound to the pool `p`.
//   CreateSuspended - forwarded to the TThread constructor.
//   p               - owning pool whose queue the worker polls after each task.
// The initializers are listed in member declaration order (running,
// ThreadWorkList, work), which is the order in which they are applied anyway.
__fastcall TWork_Thread::TWork_Thread(bool CreateSuspended, TThreadWorkList *p)
    : TThread(CreateSuspended),
      running(false),
      ThreadWorkList(p),
      work(NULL)
{
}

// ---------------------------------------------------------------------------
void TWork::Exec()
{
    if (pWorkExec)
        pWorkExec(this);
};
// ---------------------------------------------------------------------------
void __fastcall TWork_Thread::Execute()
{
    while (!Terminated && !G_StopAllThread)
    {
        if (work)
        {
            running           = true;
            work->IsWorking   = true;
            work->Work_Thread = this;
            work->TimeStart   = ::GetTickCount();
            work->TimeEnd     = 0;
            this->WorkName    = work->WorkName;
            try
            {
                work->Exec();
            }
            catch(Exception &ee)
            {
                ThreadPostError(WorkName + "线程任务【异常】:" + ee.Message);
            }
            catch(...)
            {
               ThreadPostError(WorkName+"线程任务发生未知异常");
            }

            {
                work->TimeEnd     = ::GetTickCount();
                work->WorkEnd     = true;
                work->Work_Thread = NULL;
                work->IsWorking   = false;
                this->WorkName    = "";
                if (work->pOnWorkEnd)
                    work->pOnWorkEnd(work);
                work = NULL;
            }
            if (ThreadWorkList && !ThreadWorkList->FWorks.empty())
            {
                bool WorkContinue = false;
                ThreadWorkList->ThreadPoolCS->Acquire();
                if (!ThreadWorkList->FWorks.empty())
                {
                    work = ThreadWorkList->FWorks.front();
                    ThreadWorkList->FWorks.pop();
                    WorkContinue = true;
                }
                ThreadWorkList->ThreadPoolCS->Release();
                if (WorkContinue)
                    continue;
            }

        }

        running = false;
        if (Terminated || G_StopAllThread)
            break;
        else
            this->Suspend();
    }
}

// ---------------------------------------------------------------------------
// TThreadWorkList
// ---------------------------------------------------------------------------
// Creates a pool that has a lock and an empty queue but no worker threads.
TThreadWorkList::TThreadWorkList()
    : ThreadPoolCS(new TCriticalSection),
      FWorks(),
      FThreads()
{
}

// Creates a pool with NThreads worker threads. Every thread is created
// suspended and is only resumed once a task is assigned to it.
//   NThreads - number of workers; zero or a negative value yields a pool
//              without threads (a negative value used to be converted to a
//              huge unsigned count by the vector resize).
TThreadWorkList::TThreadWorkList(int NThreads) : FWorks()
{
    ThreadPoolCS = new TCriticalSection;
    if (NThreads <= 0)
        return;

    FThreads.reserve(NThreads);
    for (int i = 0; i < NThreads; ++i)
    {
        TWork_Thread *worker = new TWork_Thread(true, this);
        // The pool's destructor deletes the thread objects itself.
        worker->FreeOnTerminate = false;
        FThreads.push_back(worker);
    }
}

// ---------------------------------------------------------------------------
TThreadWorkList::~TThreadWorkList()
{
    try
    {
        for(size_t i = 0; i < FThreads.size(); ++i)
        {
            if(FThreads[i]->Suspended)
            {
                FThreads[i]->work = NULL;
                FThreads[i]->Terminate();
                FThreads[i]->Resume();
                // FThreads[i]->Terminate();
            }
            else
            {
                FThreads[i]->Terminate();
            }
        }
    }
    catch(...)
    {
    }
    Sleep(50);
    try
    {
        for(size_t i = 0; i < FThreads.size(); ++i)
        {
            if(FThreads[i]->Suspended)
            {
                FThreads[i]->Resume();
                Sleep(50);
            }
            else
            {
                Sleep(1);
            }
            if(FThreads[i]->running && !ThreadIsEnd(FThreads[i]))
            {
                if(FThreads[i]->work)
                {
                    // && FThreads[i]->WorkName != ""
                    static AnsiString tmp = "强行终止未完成的作业:" + FThreads[i]->WorkName;
                    ThreadPostError(tmp.c_str());
                    Sleep(10);
                    if(FThreads[i]->running && !ThreadIsEnd(FThreads[i]))
                    {
                        try
                        {
                            // 可能造成无响应。堆没被释放。
                            void *p = (void *)FThreads[i]->Handle;
                            TerminateThread(p, 1);
                            Sleep(10);
                            CloseHandle(p);
                        }
                        catch(...)
                        {}
                    }
                    else
                    {
                        delete FThreads[i];
                    }
                    continue;
                }
                //
            }
            delete FThreads[i];
        }
    }
    catch(...)
    {
    }
    FThreads.clear();
    while(!FWorks.empty())
    {
        FWorks.pop();
    }
    delete ThreadPoolCS;
}
// ---------------------------------------------------------------------------

// Returns the first worker that is suspended, has no task assigned and has
// not been asked to terminate; returns NULL when no such worker exists.
TWork_Thread *TThreadWorkList::FindFreeThread()
{
    typedef std::vector<TWork_Thread *>::iterator ThreadIter;
    for (ThreadIter it = FThreads.begin(); it != FThreads.end(); ++it)
    {
        TWork_Thread *candidate = *it;
        if (!candidate->Suspended)
            continue;
        if (candidate->work != NULL)
            continue;
        if (candidate->Terminated)
            continue;
        return candidate;
    }
    return NULL;
}

// ---------------------------------------------------------------------------
int TThreadWorkList::GetFreeThreadCount()
{
    int FreeCount = 0 ;
    for (size_t i = 0; i < FThreads.size(); ++i)
    {
        if (FThreads[i]->Suspended && FThreads[i]->work == NULL && !FThreads[i]->Terminated)
            FreeCount++;
    }
    return FreeCount;

}
// ---------------------------------------------------------------------------
int TThreadWorkList::ClearWorkList()
{
    ThreadPoolCS->Acquire();
    while (!FWorks.empty())
        FWorks.pop();
    ThreadPoolCS->Release();
    return GetFreeThreadCount();
}

// ---------------------------------------------------------------------------
bool TThreadWorkList::AllThreadsIsFree()
{
    for (size_t i = 0; i < FThreads.size(); ++i)
    {
        if ((!FThreads[i]->Suspended)) // || FThreads[i]->work)
                return false;
    }
    return true;
}

// ---------------------------------------------------------------------------
// Resets the task's state flags and appends it to the list under the lock.
//   pWork - task to store; NULL is ignored, matching TThreadWorkList::AddWork.
// When more than 6 tasks were already stored, the backlog size is reported
// through ThreadPostInfo.
void TWorkList::AddWork(TWork *pWork)
{
    if (!pWork)
        return;

    CS->Acquire();
    const int n = static_cast<int>(FWorks.size());   // backlog before this task is added
    pWork->TimeStart = 0;
    pWork->TimeEnd   = 0;
    pWork->WorkEnd   = false;
    pWork->IsWorking = false;
    try
    {
        FWorks.push_back(pWork);
    }
    catch (...)
    {
        // Do not leave the list locked if the vector cannot grow.
        CS->Release();
        throw;
    }
    CS->Release();

    // Report outside the lock so that logging does not prolong the critical section.
    if (n > 6)
    {
        ThreadPostInfo(0, "线程池滞留任务数:" + AnsiString(n));
    }
}

// ---------------------------------------------------------------------------
void TThreadWorkList::AddWork(TWork *pWork)
{
    if (pWork)
    {
        pWork->WorkEnd   = false;
        pWork->IsWorking = false;
        pWork->TimeStart = 0;
        pWork->TimeEnd   = 0;

        ThreadPoolCS->Acquire();
        TWork_Thread *pThread = TThreadWorkList::FindFreeThread();
        if (pThread)
            RunWork(pThread, pWork);
        else
            FWorks.push(pWork); // 放到后面去。
        ThreadPoolCS->Release();
        int n = GetFreeThreadCount();
        if(n < 1)
        {
           ThreadPostInfo(0,"线程池暂无可用线程供分配:"+AnsiString(n)+"//"+AnsiString(FThreads.size()));
        }
    }
}

 // ---------------------------------------------------------------------------

// Creates an empty list together with the critical section that guards it.
TWorkList::TWorkList()
    : CS(new TCriticalSection)
{
}
// Destroys the critical section. The TWork objects referenced by FWorks are
// not deleted here.
TWorkList::~TWorkList()
{
    delete CS;
    CS = NULL;
}

// Hands pWork to pThread and wakes the thread (AddWork calls this while
// holding ThreadPoolCS).
//   pThread - free worker that will execute the task.
//   pWork   - task to execute.
// IsWorking is set BEFORE the thread is resumed: writing it afterwards could
// overwrite the worker's own "IsWorking = false" if the task finished
// quickly, leaving the task looking busy forever.
void TThreadWorkList::RunWork(TWork_Thread *pThread,TWork *pWork)
  {
    pWork->IsWorking = true;
    pThread->work = pWork;
    pThread->Resume();
  }

  TWork::TWork() :  WorkEnd(true) , IsWorking(false),pOnWorkEnd(NULL),TagWait(0),Work_Thread(NULL)
             ,pWorkExec(NULL),TimeStart(0),TimeEnd(0) {}
  // Virtual destructor with an empty body; it releases nothing itself.
  TWork::~TWork(){}
  bool TWork::WorkIsEnd(){return  WorkEnd && (!IsWorking)  ;}//  && (TimeStart > 0) && (TimeEnd > 0)
  void TWork::SetWorkIsEnd()
  {
     WorkEnd = true ;
     IsWorking = false ;
  }

//2.2 temp.h,

bool ThreadIsEnd(TThread      *Thread)
{
    // 增加判断线程是否退出条件!
    DWORD dwExitCode = 0;
    if(Thread != NULL)
    {
        ::GetExitCodeThread( (HANDLE)(Thread->Handle), &dwExitCode);
    }

    return Thread == NULL  || dwExitCode != STILL_ACTIVE;
}
// Error sink used by the pool; writes the text to the debugger output.
// Replace the body to route errors elsewhere.
//   temp - message text.
// Declared inline because temp.h has no include guard and is pulled into
// several translation units; a plain definition would be duplicated at link time.
inline void ThreadPostError(AnsiString temp)
{
    OutputDebugStringA(temp.c_str());
}
void ThreadPostInfo(int n,AnsiString temp1)//这个函数你也可以自己改
{   AnsiString temp=n;
    temp=temp+temp1;
    OutputDebugStringA(temp.c_str());
}

 类似资料: