当前位置: 首页 > 知识库问答 >
问题:

通过spring cloud stream活页夹kafka或spring kafka,可以只交付一次,使用哪一种

池兴邦
2023-03-14

我试图在Spring Boot应用程序中使用spring-cloud d-stream-binder-kafka实现一次交付。我使用的版本是:

  • spring-cloud-stream-binder-kafka-core-1.2.1。发布
  • spring-cloud-stream-binder-kafka-1.2.1。发布
  • spring-cloud-stream-codec-1.2.2。释放spring-kafka-1.1.6。释放
  • Spring-集成-kafka-2.1.0。发布
  • sping-集成-core-4.3.10。发布
  • 动物园管理员-3.4.8
  • Kafka版本:0.10.1.1

这是我的配置(云配置):

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events

我有两个主要类:FeedSink接口:

package au.com.xyz.proxy.interfaces;
import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.messaging.MessageChannel;

public interface FeedSink {

String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";

@Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
MessageChannel feedlatformEventsInput();
} 

事件消费者

package au.com.xyz.proxy.consumer;

@Slf4j
@EnableBinding(FeedSink.class)
public class EventConsumer {

    public static final String SUCCESS_MESSAGE =
            "SEND-SUCCESS : Successfully sent message to platform.";
    public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
    public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
    public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
            "EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";

    @Autowired
    private CapPointService service;

    @StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
    /**
     * method associated with stream to process message.
     */
    public void message(final @Payload EventNotification eventNotification,
                        final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

        String caseMilestone = "UNKNOWN";
        if (!ObjectUtils.isEmpty(eventNotification)) {
            SysMessage sysMessage = processPayload(eventNotification);
            caseMilestone = sysMessage.getCaseMilestone();
            try {
                ClientResponse response = service.sendPayload(sysMessage);
                if (response.hasFault()) {
                    Fault faultDetails = response.getFaultDetails();
                    log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
                } else {
                    log.info(SUCCESS_MESSAGE);
                }
                acknowledgment.acknowledge();
            } catch (Exception e) {
                log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
            }
        } else {
            log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
            acknowledgment.acknowledge();
        }
    }



    private SysMessage processPayload(final EventNotification eventNotification) {
        Gson gson = new Gson();
        String jsonString =  gson.toJson(eventNotification.getData());
        log.info("Consumed message for platform events with payload : {} ", jsonString);
        SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
        return sysMessage;
    }
    }

我已将Kafka和spring容器的autocommit属性设置为false。如果您看到在EventConsumer类中,我在我提供服务的情况下使用了Acknowledge。sendPayload成功,没有例外。我希望容器移动偏移量并轮询下一个记录。我观察到的是:

>

场景2——如果发生异常,然后将后续消息推送到Kafka。我看到新消息被处理,偏移量被移动。这意味着我丢失了未被确认的信息。所以问题是我是否处理过这个问题。我如何控制从上次提交中读取信息,而不仅仅是最新的信息并对其进行处理。我假设在内部进行了一次民意调查,但没有考虑到或不知道最后一条信息没有得到确认。我不认为有多条线索在读Kafka。我不知道@Input和@StreamListener注释是如何控制的。我假设线程由属性使用者控制。控制线程的并发性,默认设置为1。

所以我做了研究,发现了很多链接,但不幸的是,它们都没有回答我的具体问题。我看了(https://github.com/spring-cloud/spring-cloud-stream/issues/575),里面有马吕斯的评论(https://stackoverflow.com/users/809122/marius-bogoevici):

请注意,Kafka不提供单独的消息确认,这意味着确认转换为将最新消耗的偏移量更新为已确认消息的偏移量(每个主题/分区)。这意味着,如果您无序地确认来自同一主题分区的消息,那么一条消息可以“确认”它之前的所有消息。

当有一个线程时,不确定是否是订单问题。

很抱歉发了这么长的帖子,但我想提供足够的信息。最重要的是,我正在努力避免在使用Kafka时丢失消息,我正在尝试看看spring cloud stream binder kafka是否可以完成这项工作,或者我必须寻找替代方案。

2018年7月6日更新

我看到了这篇帖子https://github.com/spring-projects/spring-kafka/issues/431这是解决我问题的更好方法吗?我可以试试最新版本的《SpringKafka》

@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
                containerGroup = "quxGroup")
public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
  • 这是否有助于控制将偏移量设置为上次成功处理记录的位置?我怎样才能从listen方法中做到这一点。消费者seekToEnd();然后如何重置监听方法以获取该记录
  • 将消费者放在签名中是否能为与消费者打交道提供支持?或者我还需要做什么
  • 我应该使用Acknowledge还是consumer。commitSyncy()
  • 集装箱工厂的意义是什么。我必须把它定义为豆子吗
  • 我需要@EnableKafka和@Configuration才能让上述方法发挥作用吗?请记住,该应用程序是一个Spring Boot应用程序
  • 通过添加Consumer to listen方法,我不需要实现ConsumerAware接口

最后但并非最不重要的一点是,如果上述方法可行,是否可以提供一些例子。

2018年7月12日更新

谢谢加里(https://stackoverflow.com/users/1240763/gary-russell)提供使用maxAttempts的提示。我用过这种方法。我能够做到一次送达,并保持信息的秩序。

我更新的云配置:

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events
              consumer:
                maxAttempts: 2147483647
                backOffInitialInterval: 1000
                backOffMaxInterval: 300000
                backOffMultiplier: 2.0

事件使用者与我最初的实现保持一致。除了重新引发错误以使容器知道处理已失败之外。如果您只是捕获了它,那么容器就不可能知道消息处理失败了。通过承认。确认您只是在控制偏移量提交。为了重试,必须抛出异常。不要忘记将kafka客户机autocommit属性和spring(容器级别)autocommitOffset属性设置为false。就这样。

共有1个答案

邴宏大
2023-03-14

正如马吕斯所解释的,Kafka在日志中只保留一个偏移量。如果处理下一条消息,并更新偏移量;失败的消息丢失。

您可以将失败的消息发送到死信主题(将enableDlq设置为true)。

Spring Kafka(2.1.x)的最新版本具有特殊的错误处理程序Containers ToppingErrorHandler,当发生异常时,它会停止容器,而seekToCurInterrorHandler会导致失败的消息重新传递。

 类似资料:
  • 我试图用事务性生产者/消费者来准确地理解Kafka。 我遇到了下面的例子。但是,我还是很难准确地理解一次。这个代码正确吗? 制作人sendOffsetsToTransaction-此代码的作用是什么?这是否应该针对同一个目标主题? 什么是消费者之前的系统崩溃。commitSync();//将再次读取相同的消息并生成重复消息?

  • 我的目标是从主题A消费,做一些处理和生产到主题B,作为单个原子动作。要做到这一点,我有两种选择: null 我已成功验证选项#1。所谓成功,是指如果我的处理失败(抛出IllegalArgumentException),来自主题A的已消费消息将继续被KafKalistener消费。这是我所期望的,因为没有提交偏移量,而使用了DefaultAfterRollbackProcessor。 我希望看到相同

  • 使用主题交换,我希望有一个具有以下特性的发布/订阅消息传递模式: 是否实现了“发布者确认”。 让使用者在处理完每条消息后也确认该消息。 使用路由密钥将邮件路由到一个或多个使用者。 具有持久的使用者队列,因此如果使用者应用程序暂时关闭,它可以在重新启动时从其队列中拾取消息。 所以我创建了2个控制台应用程序(发送和接收)来测试上面的内容。 } 接收 问题是我的Send程序中的OnBasicAcks只会

  • 关于合流博客 只有一次语义是可能的:Kafka就是这样做的 精确一次语义学:即使生产者重试发送消息,它也会导致消息仅一次传递给最终消费者。精确一次语义学是最理想的保证,但也是一个很少被理解的保证。这是因为它需要消息传递系统本身与生成和消费消息的应用程序之间的合作。例如,如果在成功消费消息后,您将Kafka消费者倒带到上一个偏移量,您将再次收到从该偏移量到最新偏移量的所有消息。这说明了为什么消息传递

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 问题内容: 我有一个要添加“立即付款”按钮的产品列表,这样我就可以允许我的客户通过Paypal付款。 我已经阅读了文档,找不到如何执行此操作。我可以添加多个项目,但这不会很方便,因为我已经有要处理的项目列表。我还需要结帐流程来逐项列出订单,因此以1个价格“立即购买”也不是一件好事。 任何帮助表示赞赏的人,我都尝试过(没有运气): 问题答案: 请参阅此示例,并相应地进行更改。基本上将下划线添加到项目