当工作线程处理消息时,如果处理时间超过5分钟,我希望生成一条警告消息,但仍然让工作线程继续处理。
问题
我希望不断检查工作线程是否超过了5分钟的消息处理时间,如果超过了阈值时间,那么我希望记录一条错误消息,但仍然让工作线程按原样继续。
public class WorkerManager implements Runnable {
private MyWorker[] workers;
private int workerCount;
private boolean stopRequested;
public WorkerManager(int count){
this.workerCount = count;
}
@Override
public void run(){
stopRequested = false;
boolean managerStarted = false;
while (!stopRequested) {
if(!managerStarted) {
workers = new MyWorker[workerCount];
for (int i = 0; i < workerCount; i++) {
final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
workerThread.start();
}
managerStarted = true;
}
}
}
public void stop(){
stopRequested = true;
}
//Calll this
public void cleanUpOnExit() {
for(MyWorker w: workers){
w.setStopRequested();
}
}
}
工人阶层
public class MyWorker implements Runnable {
private final int WAIT_INTERVAL = 200;
private MyService myService;
private MyProvider myProvider;
private boolean stopRequested = false;
public MyWorker(MyService myService, MyProvider myProvider){
this.myService = myService;
this.myProvider = myProvider;
}
public void setStopRequested() {
stopRequested = true;
}
@Override
public void run() {
while (!stopRequested) {
boolean processedMessage = false;
List<Message> messages = myProvider.getPendingMessages();
if (messages.size() != 0) {
AdapterLog.debug("We have " + messages.size() + " messages");
processedMessage = true;
for (Message message : messages) {
processMessage(messages);
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void processMessage(Message messages){
myService.process(messages);
}
}
您的WorkerManager
需要一种方法来确定每个工作者的最后一条消息何时被处理。因此,工作人员将需要跟踪最后处理的消息的时间戳。
然后,您的WorkerManager
可以检查每个worker的时间戳,并在需要时生成警告。为了检查使用给定期间的工人,可以使用ExecTutor:
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(this::checkTimeoutProcessingMessages, 5l, 5l, TimeUnit.SECONDS);
您可以检查从每个工人那里获得时间戳的时间:
public void checkTimeoutProcessingMessages() {
for (MyWorker worker : workers) {
long lastProcessed = worker.getLastProcessedMessageTimestamp();
long currentTimestamp = System.currentTimeMillis();
if (lastProcessed + 5000 > currentTimestamp) {
//warn message
}
}
}
我想建立一个单一的Spring Boot应用程序,同时做多种不同的任务。我在互联网上做了研究,但我找不到任何出路。我来详细说说。我希望每隔一段时间启动一次作业,例如一天一次。我可以用Spring石英来做。我也想在一个专用的互联网地址上听信息。消息将来自Apache Kafka平台。因此,我想将Kafka集成用于Spring框架。它实际上是否适用(始终监听消息并按时执行计划的作业)
应用程序有一个JMS队列负责交付审计日志。应用程序将日志发送到JMS队列,该队列由MDB使用。 但是,发送的消息是大 XML 文件,大小从 20 MB 到 100 MB 不等。问题在于 JMS 队列使用消息的时间太长,从而导致内存不足错误。 我应该怎么做才能解决这个问题?
问题内容: 我想在后台线程中运行一些Runnable。我想使用Handler,因为它便于延迟。我的意思是 凡 可运行 应当运行 后台 线程。是否可以创建这样的处理程序?是否在某个地方有“背景” Looper,或者该如何创建? PS我知道如何使用自定义类扩展Thread,但是比处理程序方式需要更多的编码工作。因此,请不要发布其他解决方案或类似的内容 如果Handler能以“干净”的方式做到这一点,我
我想在后台线程中运行一些Runnable。我想使用Handler,因为它方便延迟。我的意思是 runnable应该在后台线程中运行。有可能创造这样的处理器吗?某个地方有没有“背景”Looper或者我怎么才能创建它? 附言:我知道如何使用自定义类扩展Thread来做到这一点,但它需要更多的编码工作,而不是以处理程序的方式进行。因此,请不要发布其他解决方案或类似内容 我只是想知道汉德勒是否能以“干净”
请帮助我们采用Cadence:D 这是当前的设计。一些无状态工作人员从集中式队列中提取消息来处理它。工作人员中涉及复杂的业务逻辑以及Deduper功能,该功能利用单独的Redis集群作为远程分布式缓存(使用共识的强一致性)。该缓存仅存储消息ID及其状态,或者“正在进行”、“已完成”和“未启动”。显然,工作人员应该处理未完成的消息。 就我个人而言,我想重新考虑所有可能的解决方案。我想到了工作流模型,
我有一些意想不到的行为,我不明白。我正在尝试实现一个固定的可变时间步长,如中所述http://gafferongames.com/game-physics/fix-your-timestep/ 和http://gameprogrammingpatterns.com/game-loop.html. 当我在VisualStudio中运行程序时,我的内部while循环从来不会计算为true;但是,当我取