我们有一个注释,允许我们使用一个民意调查消费者来消费Kafka消息。它是为长时间运行的作业而设计的,因此一个线程正在处理消息,而另一个线程仍然可以用于轮询,以防止Kafka认为我们的服务失败,并重新平衡消费者。
我们正在使用Spring AOP。
课程:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我们一次只想处理一条消息,因此暂停
标志用于决定是现在处理消息,还是以后重新排队。
Runnable
用于处理消息(使用joinPoint.procedure()
),然后将消息回复给Kafka。这是通过执行器在单独的线程中完成的。
我发现,如果在
runnable
中抛出异常,它会在catch
语句中捕获,但当我们抛出新的PollableStreamListenerException
异常时不会传播,这意味着失败不会在DLQ中结束。
我认为这是因为执行发生在另一个线程上,而主线程在执行过程中已经移动(不等待第二个线程处理消息),因此不再能够传播到DLQ。然而,我可能误解了这一点,因为我不太熟悉多线程的工作原理。
我已经尝试将
executor.submit(runnable)
修改为executor.submit(runnable). get()
,这解决了这个问题。但是它会导致阻塞主线程的执行,直到另一个线程完成执行,这意味着主线程不再可用于轮询新消息。实际上,这使我们的Kafka消费者不再是可污染的消费者,这违背了拥有注释的整个目的。
有人知道是否可以实现让主线程继续运行和轮询消息,同时在传播到DLQ的
runnable
中抛出异常吗?
提前感谢你的帮助。
为了提供更多的上下文,我们使用如下注释:
@PollableStreamListener
public void submitDeletion(Message<?> received) {
// Process message
}
每当在Pollable消息源上收到新消息时,都会调用
submitDeletion
方法。我们使用@附表
检查新消息:
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
log.trace("Polling for new messages");
cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
更新:尝试使用CompletableFuture
根据@kriegaex的评论,我试图使用一个
CompletableFuture
。我简化了我的示例,使其更像一个POC。
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("Start execution logging");
try {
Thread.sleep(10000);
} catch (Exception e) {
log.error("Error while sleeping", e);
}
log.info("End execution logging");
throw new RuntimeException("Throwing exception to force handle statement");
}, executor).handle((s, t) -> {
log.info("Inside handle block:");
if (t != null) {
log.info(t.toString());
throw new RuntimeException(t);
}
return null;
});
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
callback.acknowledge(Status.ACCEPT);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
}
}
}
}
我使用我的
ExecutorService
实例在CompletableFuture
中运行函数。它抛出一个异常,该异常在中处理。handle()
block。
我的第一次测试没有以下代码行,我发现该函数在另一个线程上运行而没有阻止主线程进行轮询,但
hand()
内部抛出的异常没有传播到DLQ。缺少代码行:
try {
completableFuture.join();
} catch (Exception e) {
log.error("Error while doing join()", e);
}
然后我在中添加了这些代码行,发现它在等待
completableFuture
完成运行时开始阻止主线程的执行。
简而言之,使用
CompletableFuture
的行为似乎与我在初始Runnable
中发现的行为相同。
最后,我在Runnable
中处理了这个错误,方法是手动将错误发布到DLQ,而不是依赖Spring Cloud Stream来为我处理这个问题。虽然这显然不是一个理想的解决方案,但最终的结果是有效的。
我有下面的课程。两个注释(AnnotA和AnnotB),一个类的子类。java'(带@AnnotA)及其“父”基。java'(带有@AnnotB)。 编译子对象时。java,我的注释处理器报告AnnotA,但它不报告在Base中找到的注释(AnnotB)。JAVA 安诺塔。JAVA 阿诺特。JAVA 基础JAVA 小孩JAVA MyProc。JAVA 这是编译过程及其输出,正如您所看到的,没有关于
问题内容: 我正在运行以下简单代码: 但是当我运行它时,它会打印 实际上python线程会忽略我的+键盘中断而无法打印。为什么?此代码有什么问题? 问题答案: 尝试 没有对的调用,主要过程是过早地跳出该块,因此不会被捕获。我的第一个想法是使用,但这似乎阻塞了主进程(忽略KeyboardInterrupt),直到完成。 导致线程在主进程结束时终止。
我目前正在使用Android Room在Android上存储一些小数据。理论上,我使用allowMainThreadQueries()应该没有问题,因为我的所有数据都很小,但为了将来证明我的程序,我尝试将所有调用移动到一个单独的线程中。到目前为止,我在AppConfig中有一个静态处理程序,该类在应用程序初始启动时初始化,在应用程序范围内可见: DbHandler是一个扩展Handler的自定义类
问题内容: 我更喜欢将异常处理逻辑放在main方法附近的调用堆栈中。我喜欢这种方法…但是,我创建了一个线程,该线程的run()内部的某些方法调用可能会引发异常。我真的很想看看是否有一种方法可以将这些异常返回到父线程?我能想到的最好的办法是在实现的对象内部设置一个变量。该变量是一个包含错误消息的字符串,该错误消息随后使用类加载器在父线程中正确地重新创建相同的异常。 我想知道的是,在这里得到想要的东西
我试图使用Jackson注释来重新命名序列化过程中产生的一些json标签。所有注释都编译得很好,当我运行时,除了所有Jackson注释之外,Jackson序列化工作完全被忽略。即使像@jsonignore或@jsonproperty这样的基本命令对json响应也没有影响。构建路径中的库有: 下面是我需要序列化的一个类的代码示例:
有什么想法为什么@primary在这里没有被考虑在内吗?