当前位置: 首页 > 知识库问答 >
问题:

在spring kafka中发布到主题和手动提交

霍建柏
2023-03-14
@Bean
public ProducerFactory producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
    defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
    //defaultKafkaProducerFactory.transactionCapable();
    return defaultKafkaProducerFactory;
    //return new DefaultKafkaProducerFactory<>(config);
}


@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}


@Bean
public KafkaTransactionManager<String, User> transactionManager() {
    KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
    transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    transactionManager.setNestedTransactionAllowed(true);
    return transactionManager;
}


/**
 * New configuration for the consumerFactory added
 *
 * @return
 */
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}


@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setTransactionManager(transactionManager());
    factory.setRetryTemplate(kafkaRetry());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(getErrorHandler());
    factory.setRecoveryCallback(retryContext -> {
        //implement the logic to decide the action after all retries are over.
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        System.out.println("Recovery is called for message  " + consumerRecord.value());
        return Optional.empty();
    });

    return factory;
}



public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(60 * 1000);
    backOffPolicy.setMultiplier(3);
    backOffPolicy.setMaxInterval(4 * 60 * 1000);       // original 25 * 60 * 1000
    retryTemplate.setBackOffPolicy(backOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}


public SeekToCurrentErrorHandler getErrorHandler() {
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {

        @Override
        public void handle(Exception thrownException,
                           List<ConsumerRecord<?, ?>> records,
                           Consumer<?, ?> consumer,
                           MessageListenerContainer container) {
            //super.handle(thrownException, records, consumer, container);
            if (!records.isEmpty()) {
                ConsumerRecord<?, ?> record = records.get(0);
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();

                if (thrownException instanceof DeserializationException) {
                    System.out.println("------1111------deserialization exception ");
                } else {
                    System.out.println("------xxxx------Record is empty ");
                    consumer.seek(new TopicPartition(topic, partition), offset);
                }
            } else {
                System.out.println("------4444------Record is empty ");
            }

        }
    };

    return errorHandler;
}
   @Autowired
KafkaTemplate<String, User> kafkaTemplate;


@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {

    /*System.out.println("get the message " +user.getFirstName());
    if (user.getFirstName().equalsIgnoreCase("Test")) {
        throw new RuntimeException("Incompatible message " + user.getFirstName());
    }
    */

    //postToSecondTopic(acknowledgment, user);

    System.out.println("NOT In transaction");
    kafkaTemplate.executeInTransaction(t -> {
        System.out.println("---------------------->");
        int number = (int) (Math.random() * 10);
        t.send("secondtopic", user);
        if (number % 5 == 0)
            throw new RuntimeException("fail");
        acknowledgment.acknowledge();
        return true;
    });


    System.out.println("*** exit ***");
}

日志中的错误

2020-05-28 15:52:53.597错误112469---[nio-8080-exec-1]O.a.C.C.C.[.[.[/].[dispatcherServlet]:servlet.Service()对于servlet[dispatcherServlet]在路径[]上下文中引发异常[请求处理失败;嵌套异常是java.lang.IllegalStateException:没有事务在处理中;可能的解决方案:在template.executeinTransaction()操作范围内运行模板操作,在调用模板方法之前使用@transactional启动事务,在使用记录时在侦听器容器启动的事务中运行

IllegalStateException:没有事务在处理中;可能的解决方案:在template.executeintransaction()操作范围内运行模板操作,在调用模板方法之前用@transactional启动事务,在使用org.springframework.util.assert.state(assert.java:73)~[spring-core-5.2.5.release.jar:5.2.5.release]在org.springframework.kafka.core.kafkatemplate.doSend(kafkatemplate.java:394)~[spring-kafka-2.3.7.release.jar:2.3.7.plate.java:216)~[spring-kafka-2.3.7.release.jar:2.3.7.release]在com.barade.sandesh.springkafka.userresource.postcomments(userresource.java:26)~[classes/:na]在sun.reflect.nativeMethodAccessorimpl.invoke0(原生方法)~[na:1.8.0_252]在在java.lang.reflect.method.invoke(method.java:498)~[NA:1.8.0_252]在org.springframework.web.method.support.invocablehandlermethod.doinvoke(invocablehandlermethod.java:190)~[spring-web-5.2.5.release.jar:5.2.5.release]在AbleHandlerMethod.invokeandHandland(servletInvocableHandlerMethod.java:105)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.mvc.method.annotation.requestMappingHandlerAdapter.invale(requestMappingHandlerAdapter.invale(requestMappingHandlerAdapter.java:879)~[spring-webmapping-5.2.5.release]在793)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]位于org.springframework.web.servlet.mvc.methOd.abstractHandlerMethodAdapter.handle(abstractHandlerMethodAdapter.java:87)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.dispatcherServlet.doDispatcherServlet.java:1040)~[spring-webmvc-5.2.5.release]在org.springframework.web.servlet.dispatcherServlet:5.2.5.release]在(frameworkservlet.java:1006)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.frameworkservlet.doPost(frameworkservlet.java:909)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]

用户资源

@RestController
@RequestMapping("accounts")
public class UserResource {

    @Autowired
    KafkaTemplate <String, User> kafkaTemplate;


    @PostMapping("/users")
    public String postComments(@RequestParam ("firstName") final String firstName,
                                    @RequestParam ("lastName") final String lastName,
                                    @RequestParam ("userName") final String userName )  {

        List<String> accountTypes = new ArrayList<String>();
        kafkaTemplate.send("firstTopic", new User(firstName,lastName,userName));

        return "Message sent to the Error queue";
    }



}

共有1个答案

太叔天宇
2023-03-14

是;参见交易记录。

容器启动事务,监听器中的任何kafkatemplate发送操作都参与事务;容器将偏移量发送到事务并提交(如果监听器正常退出)。

不需要“手动”提交偏移量。

 类似资料:
  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

  • 我希望有一个简单的命令,就像我在bash中使用的一样,在AWS Lambda函数中发布一些内容到MQTT上的主题。按照:mosquitto_pub-h my.server.com-t“light/set”-m“on” 背景:我想和Alexa一起打开和关闭一盏灯。Alexa可以启动一个Lambda函数,在这个Lambda函数中,我想启动一个MQTT发布,因为lamp可以侦听MQTT主题并对那里的消息

  • 背景资料 AWS服务是区域性的(例如,),boto3库要求您在访问客户端或资源之前设置默认区域。但是,这里的留档显示您可以使用通配符替换区域的SNS主题ARN。留档说: 文档:Amazon简单通知服务(Amazon SNS) 语法: 示例: 密码 当我使用boto3的SNS资源/客户端发布到主题ARN(该区域具有通配符)时,我得到以下错误。当我没有该区域的通配符时(例如,我指定了),一切正常。我查

  • 我创建了一个开源项目,希望将其发布到maven central,以便用户只需在POM中引用它就可以使用该库。例如: 我找到了几个在线教程,但其中一些已经过时,一些建议自动化整个过程,从而公开复杂化它。 例如,一篇教程建议为github帐户创建SSH键,并让maven在推送到maven Central时自动创建一个git标记。虽然这是有用的,但没有必要开始。 另一个例子,试图直接通过maven发布它

  • 阅读:Kafka Connect FileStreamSource忽略附加行 看来Kafka现在支持这一观点,他说: https://docs.confluent.io/5.5.0/connect/management/configuring.html#Standalone-示例 是否声明该文件被监视: 开始独立连接 将文件的所有内容添加到主题中,将新行添加到中不会将这些行添加到主题中。是否需要配

  • 在本章中,让我们研究一下Themes and Layouts 。 Drupal将在安装过程中将Bartik主题安装为默认主题。 您可以从Drupal官方网站选择付费或免费主题。 通常,布局是文本和图形的排列。 选择主题是一个好主意,请记住在您的网站上使用不同的布局。 Step 1 - 访问Drupal官方网站 ,然后单击Get Started 。 Step 2 - 单击All Themes ,如以