Can someone tell me what is wrong in my code? The retry mechanism is not working as I expected: my understanding is that every retry attempt should invoke the processRecord method again.
@EventListener(ApplicationReadyEvent.class)
public void processEvent() {
    log.info("processEvent:: started..");
    Scheduler scheduler = Schedulers.newBoundedElastic(threadPoolConfig.getThreadCap(),
            threadPoolConfig.getQueuedTaskCap(), threadPoolConfig.getThreadPrefix(), threadPoolConfig.getTtlSeconds());
    Flux<ReceiverRecord<String, String>> receiverRecordFlux = Flux.defer(requestReactiveKafkaConsumerTemplate::receive);
    receiverRecordFlux.groupBy(m -> m.receiverOffset().topicPartition())
            .doOnNext(partitionFlux -> log.info("processEvent:: topicPartition {}", partitionFlux.key()))
            .flatMap(partitionFlux -> partitionFlux.subscribeOn(scheduler)
                    .doOnNext(r -> log.info("processEvent:: Record received from offset {} from topicPartition {} with message key {}",
                            r.receiverOffset().offset(), r.receiverOffset().topicPartition(), r.key()))
                    .flatMap(this::processRecord)
                    .doOnNext(receiverRecordInfo -> log.info("processEvent:: Record processed from offset {} from topicPartition {} with message key {}",
                            receiverRecordInfo.receiverOffset().offset(), receiverRecordInfo.receiverOffset().topicPartition(), receiverRecordInfo.key()))
                    .retryWhen(Retry.backoff(3, Duration.ofMillis(200))
                            .jitter(0d)
                            .doAfterRetry(retrySignal -> log.error("Retried {}", retrySignal.totalRetries()))
                            .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> new KafkaRetryExhaustException()))
                    .doOnError(KafkaRetryExhaustException.class, msg -> log.error("exception... ", msg))
                    .onErrorResume(throwable -> Mono.empty()))
            .subscribe(
                    key -> log.info("Successfully consumed messages, key {}", key),
                    error -> log.error("Error while consuming messages ", error));
}
Logs
2022-06-06 13:50:33,867 INFO [reactive-kafka-consumergroupId-1] reactor.util.Loggers$Slf4JLogger: processEvent:: topicPartition test-topic-0
2022-06-06 13:50:33,879 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processEvent:: Record received from offset test-topic-0 from topicPartition 937 with message key 45
2022-06-06 13:50:33,883 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processRecord:: processing started
2022-06-06 13:50:33,918 INFO [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: processRecord:: is httpRequest null? false
2022-06-06 13:50:34,180 ERROR [reactor-http-nio-3] reactor.util.Loggers$Slf4JLogger: processRecord:: exception..
org.springframework.web.reactive.function.client.WebClientResponseException$NotFound: 404 Not Found from POST http://localhost:8080/caps-app/capservices/fulfillment/accesspoints/v3/slots
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:202)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ 404 from POST http://localhost:8080/caps-app/capservices/fulfillment/accesspoints/v3/slots [DefaultWebClient]
Original Stack Trace:
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:202)
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:207)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:703)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
2022-06-06 13:50:34,391 ERROR [parallel-2] reactor.util.Loggers$Slf4JLogger: Retried 0
2022-06-06 13:50:34,797 ERROR [parallel-3] reactor.util.Loggers$Slf4JLogger: Retried 1
2022-06-06 13:50:35,611 ERROR [parallel-4] reactor.util.Loggers$Slf4JLogger: Retried 2
2022-06-06 13:50:35,613 ERROR [CasprConsumer-1] reactor.util.Loggers$Slf4JLogger: exception...
com.walmart.caspr.exception.KafkaRetryExhaustException: null
at com.walmart.caspr.service.ReactiveConsumerService.lambda$processEvent$5(ReactiveConsumerService.java:71)
at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:376)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:451)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onError(FluxSubscribeOn.java:157)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:721)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
The post below resolved the issue: the retried work was being evaluated eagerly rather than lazily, so re-subscribing on retry did not re-invoke processRecord.
How to retry failed records of a consumer in reactor-kafka
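To make the eager-vs-lazy point concrete: Reactor's retryWhen works by re-subscribing to its upstream, so the failing work only runs again on retry if it is produced lazily at subscription time (e.g. wrapped in Mono.defer / Flux.defer); a value or Mono that was built eagerly just replays the same cached outcome. The sketch below models this in plain Java (no Reactor dependency, names are illustrative): a retry loop that re-invokes a Supplier on each attempt, the analogue of re-subscribing to a deferred source.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyRetryDemo {

    // Retry by re-invoking the supplier on every attempt. This models
    // retryWhen over a *deferred* publisher: each re-subscription runs
    // the source (and thus processRecord) again. If we had captured
    // work.get() once up front, every "retry" would just see the same
    // first failure -- the eager behavior described in the linked post.
    static <T> T retryLazily(Supplier<T> work, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return work.get(); // work is re-executed per attempt
            } catch (RuntimeException e) {
                last = e; // remember the failure, then retry
            }
        }
        throw last; // retries exhausted
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds: a correct lazy retry invokes it 3 times.
        Supplier<String> flaky = () -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("boom");
            return "ok";
        };
        String result = retryLazily(flaky, 3);
        System.out.println(result + " after " + calls.get() + " attempts"); // prints "ok after 3 attempts"
    }
}
```

In Reactor terms, one way to apply the fix from the linked post (sketched, not the exact code from that answer) is to retry a deferred per-record Mono inside the flatMap, e.g. Mono.defer(() -> processRecord(r)).retryWhen(...), so each retry re-invokes processRecord for just the failing record instead of re-subscribing the whole partition flux.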