当前位置: 首页 > 知识库问答 >
问题:

使用Spring Cloud Stream和Kafka进行重复消息处理

柳鸿博
2023-03-14

我正在使用Kafka活页夹的Spring Cloud Stream。它工作得很好,但客户端接收到重复的消息。已经尝试了所有Kafka消费属性,但没有结果。

在我的应用程序示例中检查2个类-Aggregate Application和EventFilterApplication。如果我运行EventFilterApplication-只有1条消息,如果是Aggregate Application-2条相同的消息。

下面是我的代码:

1) 聚合器

import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

2) 事件过滤器应用程序

@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {

            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));

        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    interface LiveProcessor extends Source {

        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

3) 应用程序。yml公司

spring:
cloud:
    stream:
        kafka:
          binder:
              brokers: kafka-broker.example.com:9092
              defaultBrokerPort: 9092
              defaultZkPort: 2181
              zkNodes: kafka-zookeeper.example.com
        type: kafka
        bindings:
            liveSource:
                binder: kafka
                consumer:
                    headerMode: raw
                    autoCommitOffset: true
                destination: topic_example_name

build.gradle

buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }

        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")

        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }

        compile("org.springframework.cloud:spring-cloud-stream:")

        compile("org.springframework.cloud:spring-cloud-starter-sleuth")

        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")

        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}

共有1个答案

牛骞仕
2023-03-14

复制是由EventFilterApplication作为父根引起的:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

这很可能会创建两个订阅。您可以简单地执行以下操作,而不是将EventFilterApplication添加为root用户:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(args)
            .from(EventFilterApplication.class)
            // rest of the pipeline
            .run(args);
    }
}

如果不需要创建聚合,这应该足够了:

public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
}

编辑:添加了一个额外的例子和澄清的答案。

 类似资料:
  • 我对JSF2.2、Omnifaces 2.6.3和PrimeFaces 6.1的客户端验证有一个问题,这导致来自Bean验证的验证消息显示两次。 如果在我使用的某个按钮中 验证有效,但每次验证都会产生两条消息: 我将问题追溯到类javax.faces.validator.BeanValidator,它正在与Omnifaces org.omnifaces.taghandler.ValidateBea

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

  • 我们有一些消息需要保持序列。我们已经决定将所有消息从一个特定的源发送到一个分区,这样就可以维护消息序列(多个源可以产生到同一个分区,但一个源不能产生到多个分区),并且我们将能够用它们的密钥标识每个源。 现在,我们需要使用这些消息并进行一些处理。我们对已消费的消息执行多个独立操作(例如,将它们存储在数据库中,转发它们等)。现在,我一直在考虑是使用Kafka Streams API还是消费者API来实

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 我很难在Kafka主题的消费者中找到处理异常的简单模式。场景如下:在消费者中,我调用一个外部服务。如果服务不可用,我想重试几次,然后停止消费。 最简单的模式似乎是一种处理它的阻塞同步方式,在Java中如下所示: 但是,我觉得必须有一种更简单的方法(不使用第三方库),并且避免阻塞线程。 这似乎是我们想要的一种常见的东西,但我找不到一个简单的例子来说明这种模式。