当前位置: 首页 > 知识库问答 >
问题:

spring kafka-无法生成消息,事务回滚不起作用-org。阿帕奇。Kafka。常见的错误。ProducerFencedException产品

米迪
2023-03-14

spring版本:2.1.4。发布spring kafka:2.2.5 apache kafka:1.1.0 zoopkeepr:3.4.9

framewrok:Spring防尘套

我正在为我的应用程序使用ChainedKafkaTransactionManager。我有时会遇到ProducerFencedException(10次中有5次),而且数据库中也没有发生回滚(MySQL、spring JPA)。我尝试更改producer属性,还使用了链式事务管理器,但仍然没有成功。

我正在使用@Transactional注释

在一个事务中,我正在执行几个jpa操作(保存/更新),并使用KafkaTemplate向Kafka发送几个(1到8)消息。如果在发送消息期间发生错误,事务将不会回滚。

这是我的配置和日志:

@Configuration
@EnableKafka
public class KafkaSenderConfig {

    @Value("${kafka.servers}")
    private String kafkaServers;

    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(
            JpaTransactionManager jpaTransactionManager, DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<String, String>(kafkaStringTransactionManager(),
                jpaTransactionManager, dsTransactionManager);
    }

    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> ktm = new KafkaTransactionManager<String, String>(
                stringProducerFactory());
        ktm.setNestedTransactionAllowed(true);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return ktm;
    }

    @Bean
    public DataSourceTransactionManager dsTransactionManager(@Qualifier("datasource") DataSource ds) {
        return new DataSourceTransactionManager(ds);
    }

    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory);
        return transactionManager;
    }

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return jpaTransactionManager(entityManagerFactory);
    }

    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> config = new ConcurrentHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(
                config);
        defaultKafkaProducerFactory.setTransactionIdPrefix("demo-transaction-");
        return defaultKafkaProducerFactory;
    }

    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        return new KafkaTemplate<>(stringProducerFactory(), true);
    }

}

2019-05-07 01:05:56.236 ERROR [demo-account-service,,,] 3805 --- [ad | producer-3] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"source":"AGENT","objectId":48,"action":"INSERT","userId":33,"messageType":"DEBUG","message":"Creat...' to topic audit:

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

2019-05-07 01:05:56.236 DEBUG [demo-account-service,8ab0419e02ba71ae,8ab0419e02ba71ae,false] 3805 --- [nio-8080-exec-6] c.k.s.c.service.impl.CommonServiceImpl   : Successfully sent Message to Process Audit Topic
2019-05-07 01:05:56.239 ERROR [demo-account-service,8ab0419e02ba71ae,8ab0419e02ba71ae,false] 3805 --- [nio-8080-exec-6] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@28a3bc81, txId=demo-transaction-2]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:221)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:659)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:497)
        at brave.kafka.clients.TracingProducer.commitTransaction(TracingProducer.java:54)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:49)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:174)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
        at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
        at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:533)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
        at com.demo.service.common.service.impl.CommonServiceImpl$$EnhancerBySpringCGLIB$$472a8a0b.sendAudit(<generated>)
        at com.demo.service.common.service.impl.UserServiceImpl.createAudit(UserServiceImpl.java:1364)
        at com.demo.service.common.service.impl.UserServiceImpl.create(UserServiceImpl.java:447)
        at com.demo.service.common.service.impl.UserServiceImpl$$FastClassBySpringCGLIB$$4debe8ca.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
        at com.demo.service.common.service.impl.UserServiceImpl$$EnhancerBySpringCGLIB$$104cac6b.create(<generated>)
        at com.demo.business.account.controller.AgentController.add(AgentController.java:158)
        at com.demo.business.account.controller.AgentController$$FastClassBySpringCGLIB$$c9940e69.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:69)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
        at com.demo.business.account.controller.AgentController$$EnhancerBySpringCGLIB$$19ef74cb.add(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
        at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:494)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
        at com.demo.business.account.controller.AgentController$$EnhancerBySpringCGLIB$$3b8236ae.add(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:660)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:90)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.oauth2.provider.authentication.OAuth2AuthenticationProcessingFilter.doFilter(OAuth2AuthenticationProcessingFilter.java:176)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:74)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
        at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
        at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
        at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
        at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.cloud.sleuth.instrument.web.ExceptionLoggingFilter.doFilter(ExceptionLoggingFilter.java:48)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at brave.servlet.TracingFilter.doFilter(TracingFilter.java:86)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

'

共有1个答案

卫博雅
2023-03-14

不要在多个地方问同一个问题。

这是浪费你和我们的时间。此外,我想你没有阅读问题模板,其中明确指出问题是针对错误和新功能请求的;不是为了提问。

这个例外来自Kafka,与Spring无关。

阅读java文档以了解ProducerFencedException。

/**
 * This fatal exception indicates that another producer with the same <code>transactional.id</code> has been
 * started. It is only possible to have one producer instance with a <code>transactional.id</code> at any
 * given time, and the latest one to be started "fences" the previous instances so that they can no longer
 * make transactional requests. When you encounter this exception, you must close the producer instance.
 */
public class ProducerFencedException extends ApiException {

    public ProducerFencedException(String msg) {
        super(msg);
    }
}
 类似资料:
  • 我有两个代理1.0.0Kafka集群,我正在针对这个Kafka运行1.0.0Kafka流API应用程序。我增加了制片人的要求。暂停。毫秒到5分钟来修复生产者超时异常。 目前,在运行一段时间后,我发现以下两种类型的异常。我试图按照ApacheKafka中的建议修复这些异常:TimeoutException,然后什么都不起作用‏ 但不完整的解决方案就在这里。建议使用此解决方案(减少生产批量)。请帮忙。

  • 我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认

  • 我试图从JMS源读取数据,并将它们推送到KAFKA主题中,几个小时后,我观察到推送到KAFKA主题的频率几乎为零,经过一些初步分析,我在FLUME日志中发现以下异常。 my flume显示max.request的当前设置值(在日志中)。尺寸为1048576,明显小于1399305,增加了此最大要求。大小可能会消除这些异常,但我无法找到更新该值的正确位置。 我的水槽。配置, 任何帮助都将不胜感激!!

  • 我已经配置了一个路由来从交易所中提取一些数据并聚合它们;这是简单的总结: 问题是聚合完成永远不起作用,例如,这是我的测试示例: ReflelctionTestUtils.setField;ReflectionTestUtils.setFiled;producerTemplate.send(FingerprintHistoryRouteBuilder.FINGERPRINT_HISTORY_ENDP

  • 我正在尝试使用Spring Cloud Stream框架构建一个简单的Kafka Streams应用程序。我可以连接到流以推送原始数据进行处理。但是当我尝试按键处理流进行事件计数时,我得到了未找到的运行应用程序时异常。我检查了我的项目包含的库,我可以找到类,它没有丢失。我不确定为什么在运行时它没有被加载! 下面是我的源文件。 <代码>com。pgp。学Kafka。分析。分析应用程序 <代码>com

  • 我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦