我正在使用Kafka活页夹的Spring Cloud Stream。它工作得很好,但客户端接收到重复的消息。已经尝试了所有Kafka消费属性,但没有结果。
在我的应用程序示例中检查2个类-Aggregate Application和EventFilterApplication。如果我运行EventFilterApplication-只有1条消息,如果是Aggregate Application-2条相同的消息。
下面是我的代码:
1) 聚合器
import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
@SpringBootApplication
public class AggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
.from(EventFilterApplication.class)
.run(args);
}
}
2) 事件过滤器应用程序
@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {
@Autowired
LiveProcessor source;
@StreamListener(LiveProcessor.INPUT)
public void handle(byte[] event) {
try {
System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));
} catch (Exception e) {
System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
}
}
public static void main(String[] args) {
SpringApplication.run(EventFilterApplication.class, args);
}
interface LiveProcessor extends Source {
String INPUT = "liveSource";
@Input(INPUT)
SubscribableChannel input();
}
}
3) 应用程序。yml公司
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka-broker.example.com:9092
defaultBrokerPort: 9092
defaultZkPort: 2181
zkNodes: kafka-zookeeper.example.com
type: kafka
bindings:
liveSource:
binder: kafka
consumer:
headerMode: raw
autoCommitOffset: true
destination: topic_example_name
build.gradle
buildscript {
ext { springBootVersion = '1.4.2.RELEASE' }
repositories {
jcenter()
maven { url 'http://repo.spring.io/plugins-release' }
}
dependencies {
classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
}
}
ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'
subprojects {
apply plugin: 'java'
apply plugin: 'propdeps'
apply plugin: 'propdeps-idea'
apply plugin: "io.spring.dependency-management"
sourceCompatibility = 1.8
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
}
}
dependencies {
compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
exclude module: "spring-boot-starter-tomcat"
exclude group: 'log4j'
}
compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")
compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
exclude group: "org.slf4j"
}
compile("org.springframework.cloud:spring-cloud-stream:")
compile("org.springframework.cloud:spring-cloud-starter-sleuth")
compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")
testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
exclude group: "org.slf4j"
}
}
}
复制是由EventFilterApplication
作为父根引起的:
public class AggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
.from(EventFilterApplication.class)
.run(args);
}
}
这很可能会创建两个订阅。您可以简单地执行以下操作,而不是将EventFilterApplication添加为root用户:
public class AggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(args)
.from(EventFilterApplication.class)
// rest of the pipeline
.run(args);
}
}
如果不需要创建聚合,这应该足够了:
public static void main(String[] args) {
SpringApplication.run(EventFilterApplication.class, args);
}
编辑:添加了一个额外的例子和澄清的答案。
我对JSF2.2、Omnifaces 2.6.3和PrimeFaces 6.1的客户端验证有一个问题,这导致来自Bean验证的验证消息显示两次。 如果在我使用的某个按钮中 验证有效,但每次验证都会产生两条消息: 我将问题追溯到类javax.faces.validator.BeanValidator,它正在与Omnifaces org.omnifaces.taghandler.ValidateBea
使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?
在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre
我们有一些消息需要保持序列。我们已经决定将所有消息从一个特定的源发送到一个分区,这样就可以维护消息序列(多个源可以产生到同一个分区,但一个源不能产生到多个分区),并且我们将能够用它们的密钥标识每个源。 现在,我们需要使用这些消息并进行一些处理。我们对已消费的消息执行多个独立操作(例如,将它们存储在数据库中,转发它们等)。现在,我一直在考虑是使用Kafka Streams API还是消费者API来实
我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。
本文向大家介绍Kafka中的消息是否会丢失和重复消费?相关面试题,主要包含被问及Kafka中的消息是否会丢失和重复消费?时的应答技巧和注意事项,需要的朋友参考一下 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。 1、消息发送 0---表示不进行消息接收是否成功的确认; 1---表示当Leader接收成功时确认; -1---表示Leader和Follower都接收成功