当前位置: 首页 > 知识库问答 >
问题:

计算工作线程在处理消息时所用的时间

丁曦哲
2023-03-14
    null

当工作线程处理消息时,如果处理时间超过5分钟,我希望生成一条警告消息,但仍然让工作线程继续处理。

问题

我希望不断检查工作线程是否超过了5分钟的消息处理时间,如果超过了阈值时间,那么我希望记录一条错误消息,但仍然让工作线程按原样继续。

public class WorkerManager implements Runnable {

    private MyWorker[] workers;
    private int workerCount;
    private boolean stopRequested;

    public WorkerManager(int count){
        this.workerCount = count;
    }

    @Override
    public void run(){
        stopRequested = false;
        boolean managerStarted = false;

        while (!stopRequested) {
            if(!managerStarted) {
                workers = new MyWorker[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                    workerThread.start();
                }
                managerStarted = true;
            }
        }
    }

    public void stop(){
        stopRequested = true;
    }

    //Calll this
    public void cleanUpOnExit() {
        for(MyWorker w: workers){
            w.setStopRequested();
        }
    }
}

工人阶层

   public class MyWorker implements Runnable {

    private final int WAIT_INTERVAL = 200;
    private MyService myService;
    private MyProvider myProvider;
    private boolean stopRequested = false;

    public MyWorker(MyService myService, MyProvider myProvider){
        this.myService = myService;
        this.myProvider = myProvider;
    }

    public void setStopRequested() {
        stopRequested = true;
    }

    @Override
    public void run() {

        while (!stopRequested) {
            boolean processedMessage = false;
            List<Message> messages = myProvider.getPendingMessages();
            if (messages.size() != 0) {
                AdapterLog.debug("We have " + messages.size() + " messages");
                processedMessage = true;
                for (Message message : messages) {
                    processMessage(messages);
                }
            }

            if (!(processedMessage || stopRequested)) {
                // this is to stop the thread from spinning when there are no messages
                try {
                    Thread.sleep(WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void processMessage(Message messages){
        myService.process(messages);
    }
}

共有1个答案

凌蕴藉
2023-03-14

您的WorkerManager需要一种方法来确定每个工作者的最后一条消息何时被处理。因此,工作人员将需要跟踪最后处理的消息的时间戳。

然后,您的WorkerManager可以检查每个worker的时间戳,并在需要时生成警告。为了检查使用给定期间的工人,可以使用ExecTutor:

    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(this::checkTimeoutProcessingMessages, 5l, 5l, TimeUnit.SECONDS);

您可以检查从每个工人那里获得时间戳的时间:

    public void checkTimeoutProcessingMessages() {
        for (MyWorker worker : workers) {
            long lastProcessed = worker.getLastProcessedMessageTimestamp();
            long currentTimestamp = System.currentTimeMillis();
            if (lastProcessed + 5000 > currentTimestamp) {
                //warn message
            }
        }
    }
 类似资料:
  • 我想建立一个单一的Spring Boot应用程序,同时做多种不同的任务。我在互联网上做了研究,但我找不到任何出路。我来详细说说。我希望每隔一段时间启动一次作业,例如一天一次。我可以用Spring石英来做。我也想在一个专用的互联网地址上听信息。消息将来自Apache Kafka平台。因此,我想将Kafka集成用于Spring框架。它实际上是否适用(始终监听消息并按时执行计划的作业)

  • 应用程序有一个JMS队列负责交付审计日志。应用程序将日志发送到JMS队列,该队列由MDB使用。 但是,发送的消息是大 XML 文件,大小从 20 MB 到 100 MB 不等。问题在于 JMS 队列使用消息的时间太长,从而导致内存不足错误。 我应该怎么做才能解决这个问题?

  • 问题内容: 我想在后台线程中运行一些Runnable。我想使用Handler,因为它便于延迟。我的意思是 凡 可运行 应当运行 后台 线程。是否可以创建这样的处理程序?是否在某个地方有“背景” Looper,或者该如何创建? PS我知道如何使用自定义类扩展Thread,但是比处理程序方式需要更多的编码工作。因此,请不要发布其他解决方案或类似的内容 如果Handler能以“干净”的方式做到这一点,我

  • 我想在后台线程中运行一些Runnable。我想使用Handler,因为它方便延迟。我的意思是 runnable应该在后台线程中运行。有可能创造这样的处理器吗?某个地方有没有“背景”Looper或者我怎么才能创建它? 附言:我知道如何使用自定义类扩展Thread来做到这一点,但它需要更多的编码工作,而不是以处理程序的方式进行。因此,请不要发布其他解决方案或类似内容 我只是想知道汉德勒是否能以“干净”

  • 请帮助我们采用Cadence:D 这是当前的设计。一些无状态工作人员从集中式队列中提取消息来处理它。工作人员中涉及复杂的业务逻辑以及Deduper功能,该功能利用单独的Redis集群作为远程分布式缓存(使用共识的强一致性)。该缓存仅存储消息ID及其状态,或者“正在进行”、“已完成”和“未启动”。显然,工作人员应该处理未完成的消息。 就我个人而言,我想重新考虑所有可能的解决方案。我想到了工作流模型,

  • 我有一些意想不到的行为,我不明白。我正在尝试实现一个固定的可变时间步长,如中所述http://gafferongames.com/game-physics/fix-your-timestep/ 和http://gameprogrammingpatterns.com/game-loop.html. 当我在VisualStudio中运行程序时,我的内部while循环从来不会计算为true;但是,当我取