当前位置: 首页 > 知识库问答 >
问题:

Delphi TMonitor。等待多线程问题

阮桐
2023-03-14

我们在后端服务中遇到了这个多线程问题:

在具有30个线程的多线程Windows服务应用程序中,SysUtils出现问题。事件缓存出现。问题是NewWaitObj函数有时返回NIL而不是Event对象。此函数用于TMonitor sync methods Wait。t监视器。Wait在事件对象为零时停止工作。这会影响许多VCL和RTL线程同步源代码,并在多线程应用程序中导致不同的端问题,例如TThreadedQueue。PopItem不等待新项目到达队列,并立即返回超时结果。

NewWaitObj函数出现问题:

function NewWaitObj: Pointer;
var
  EventItem: PEventItemHolder;
begin
  EventItem := Pop(EventCache);
  if EventItem <> nil then
  begin
    Result := EventItem.Event;
    EventItem.Event := nil;
    Push(EventItemHolders, EventItem);
  end else
    Result := NewSyncWaitObj;
  ResetSyncWaitObj(Result);
end;

看起来Pop函数在繁重的多线程应用程序中没有得到很好的保护,并且在一些并发线程中,它开始将一个相同的EventItem实例返回给两个(或多个)线程。然后在NewWaitObj中发生竞争条件:

  1. 一个线程接受EventItem。事件并将其作为结果返回,然后将其归零为零,赛车并行线程将获得相同的EventItem。事件,但它已被第一个线程清除
  2. 这会导致一个竞速线程返回有效的事件句柄,而另一个竞速线程返回NIL
  3. t监视器。Wait函数不起作用,因为它的事件句柄为NIL
  4. TThreadedQueue。PopItem不等待,其他同步方法也无法正常工作

由于某些原因,当应用程序有多个并发线程时,Pop方法中的线程同步不起作用:

function Pop(var Stack: PEventItemHolder): PEventItemHolder;
begin
  repeat
    Result := Stack;
    if Result = nil then
      Exit;
  until AtomicCmpExchange(Pointer(Stack), Result.Next, Result) = Result;
end;

在60个测试线程的测试应用程序中,问题大约在10-20秒内出现,30个线程更难发生,通常需要5-10分钟。一旦出现问题,它永远不会停止,直到重新启动应用程序。在测试应用程序中,线程同步被破坏后,EventCache的每5个操作中大约有一个返回NIL。看起来AtomicCmpExchange中出现了一些问题,我检查了生成的代码-它只是一条CMPXCHG指令,很少有其他指令用于设置寄存器。我不太确定是什么原因导致了这个问题——例如,一个线程在设置寄存器以调用CMPXCHG时,或者在调用之后处理结果时,是否可以从其他线程获得干预?

试图了解问题的原因,以便找到最佳解决方法。现在,我计划用我自己的NewWaitObj替换原来的NewWaitObj,它只调用原来的版本,直到它返回有效的对象为止。这个问题在我们的开发、测试和生产环境中经常发生,对于生产服务器上的真实中间件服务,需要几个小时(有时几天)才能出现问题,之后只需重新启动即可修复问题。测试应用程序可从Embarcadero JIRA的发行版下载https://quality.embarcadero.com/browse/RSP-31154

编辑:TestApp:https://quality.embarcadero.com/secure/attachment/31605/EventCacheBug.zip

Delphi源代码示例:

unit FormMainEventCacheBugU;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Math, Vcl.StdCtrls;

const
   MaxProducers = 60;

type
  TFormEventCacheBug = class(TForm)
    BtnMaxProducers: TButton;
    BtnRemoveProducer: TButton;
    BtnAddProducer: TButton;
    procedure BtnMaxProducersClick(Sender: TObject);
    procedure BtnRemoveProducerClick(Sender: TObject);
    procedure BtnAddProducerClick(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

   TEventEater = class(TThread)
   private
      SleepTime: Integer;
      SMsg, EMsg, NMsg: PChar;
      procedure EatEvent;
   protected
      procedure Execute; override;
   public
      constructor Create;
   end;

var
  FormEventCacheBug: TFormEventCacheBug;
  Producers: array[1..MaxProducers] of TThread;
  ProdCount: Integer;

implementation

{$R *.dfm}

procedure AddProducer;
begin
   if ProdCount < MaxProducers then
   begin
      Inc(ProdCount);
      Producers[ProdCount] := TEventEater.Create;
      Producers[ProdCount].FreeOnTerminate := True;
   end;
end;

procedure RemoveProducer;
begin
   if ProdCount > 0 then
   begin
      Producers[ProdCount].Terminate;
      Dec(ProdCount);
   end;
end;

{ TEventEater }

constructor TEventEater.Create;
begin
   inherited Create(False);
   SleepTime := RandomRange(1, 3);
end;

procedure TEventEater.EatEvent;
var
   EventHandle: Pointer;
begin
   //OutputDebugString(SMsg);
   EventHandle := System.MonitorSupport.NewWaitObject;
   try
      if EventHandle = nil then
         OutputDebugString('NIL');
      Sleep(SleepTime);
   finally
      if EventHandle <> nil then
         System.MonitorSupport.FreeWaitObject(EventHandle);
//      OutputDebugString(EMsg);
   end;
end;

procedure TEventEater.Execute;
begin
   SMsg := PChar('S:' + GetCurrentThreadId.ToString);
   EMsg := PChar('E:' + GetCurrentThreadId.ToString);
   NMsg := PChar('NIL:' + GetCurrentThreadId.ToString);
   while not Terminated do
   begin
      EatEvent;
      Sleep(SleepTime);
   end;
end;

procedure TFormEventCacheBug.BtnAddProducerClick(Sender: TObject);
begin
   AddProducer;
end;

procedure TFormEventCacheBug.BtnRemoveProducerClick(Sender: TObject);
begin
   RemoveProducer;
end;

procedure TFormEventCacheBug.BtnMaxProducersClick(Sender: TObject);
var
   i: Integer;
begin
   for i := ProdCount + 1 to MaxProducers do
      AddProducer;
end;

end.

感谢任何想法,

共有1个答案

尉迟国发
2023-03-14

@miroslavPenchev,谢谢你的帖子!在XE2中工作,也有类似的问题。Delphi 10.4.1使用带有计数器和128位比较交换的链表头解决了TMonitor ABA问题。不幸的是,对于XE2来说,这不是一个简单的选项。

再次感谢您的建议,覆盖一些调用原始方法的MonitorSupport方法。

下面是我正在使用的解决方案。它并不是百分之百完美的,因为它涉及到锁定,但对于并发性较差的环境,它至少使系统稳定,并且没有百分之百的CPU问题。

var
  MonitorSupportFix: TMonitorSupport;
  OldMonitorSupport: PMonitorSupport;
  NewWaitObjCS: TCriticalSection;

function NewWaitObjFix: Pointer;
begin
  if Assigned(NewWaitObjCS) then
    NewWaitObjCS.Enter;
  try
    Result := OldMonitorSupport.NewWaitObject;
  finally
    if Assigned(NewWaitObjCS) then
      NewWaitObjCS.Leave;
  end;
end;

procedure FreeWaitObjFix(WaitObject: Pointer);
begin
  if Assigned(NewWaitObjCS) then
    NewWaitObjCS.Enter;
  try
    OldMonitorSupport.FreeWaitObject(WaitObject);
  finally
    if Assigned(NewWaitObjCS) then
      NewWaitObjCS.Leave;
  end;
end;

procedure InitMonitorSupportFix;
begin
  OldMonitorSupport := System.MonitorSupport;
  MonitorSupportFix := OldMonitorSupport^;
  MonitorSupportFix.NewWaitObject := NewWaitObjFix;
  MonitorSupportFix.FreeWaitObject := FreeWaitObjFix;

  System.MonitorSupport := @MonitorSupportFix;
end;

initialization
  NewWaitObjCS := TCriticalSection.Create;
  InitMonitorSupportFix;
finalization
  FreeAndNil(NewWaitObjCS);
end.
 类似资料:
  • 问题内容: 我正在为我的ubuntu服务器(针对我的多客户端匿名聊天程序)实现一种简单的线程池机制,并且需要使我的工作线程进入睡眠状态,直到需要执行一项工作(以函数指针和参数的形式) 。 我当前的系统即将关闭。我(工人线程正在)问经理是否有工作可用,以及是否有5毫秒没有睡眠。如果存在,请将作业添加到工作队列中并运行该函数。糟糕的循环浪费。 什么我 喜欢 做的是做一个简单的事件性的系统。我正在考虑有

  • 这可能是在类似的背景下问的,但我在搜索了大约20分钟后找不到答案,所以我会问。 我已经编写了一个Python脚本(比如说:scriptA.py)和一个脚本(比如说scriptB.py) 在scriptB中,我想用不同的参数多次调用scriptA,每次运行大约需要一个小时,(这是一个巨大的脚本,做了很多事情……不用担心),我希望能够同时使用所有不同的参数运行scriptA,但我需要等到所有参数都完成

  • 问题内容: 我有以下情况: 为了运行算法,我必须运行多个线程,并且每个线程都会在死之前设置一个实例变量x。问题是这些线程不会立即返回: 我应该使用等待通知吗?还是我应该嵌入一个while循环并检查是否终止? 感谢大家! 问题答案: 创建一些共享存储来保存每个线程的值,或者如果足够的话,只存储总和。使用a 等待线程终止。每个线程完成后都会调用,您的方法将使用该方法来等待它们。 编辑: 这是我建议的方

  • 问题内容: 在我的程序执行过程中,启动了多个线程。线程数量取决于用户定义的设置,但是它们都使用不同的变量执行相同的方法。 在某些情况下,需要在执行过程中进行清理,其中一部分是停止所有线程,尽管我不希望它们立即停止,我只是设置了一个变量来检查它们是否终止。问题在于线程停止之前最多可能需要1/2秒。但是,我需要确保所有线程都已停止,然后才能继续进行清理。清理是从另一个线程执行的,因此从技术上讲,我需要

  • 问题内容: 有什么方法可以简单地等待所有线程处理完成?例如,假设我有: 如何更改此方法,以便该方法在注释处暂停直到所有线程的方法退出?谢谢! 问题答案: 你将所有线程放入数组中,全部启动,然后进行循环 每个连接将阻塞,直到相应的线程完成为止。线程的完成顺序可能不同于你加入线程的顺序,但这不是问题:退出循环时,所有线程均已完成。

  • 我们在其中一个模块中使用了Hystrix-断路器模式[library]。usecase是:-我们正在从kafka轮询16个消息,并使用pararllel流处理它们,因此,对于工作流中的每条消息,它需要3个rest调用,这些调用由hystric命令保护。现在,问题是当我尝试运行单个实例时,CPU显示尖峰,线程转储显示许多线程处于等待状态,等待所有3个命令。如下所示:-