我对Kafka很陌生。我想在我的spring项目中使用kafka生产者/消费者来替换activeMQ(jms)。我需要的是一个Kafka生产者将我的消息对象发布到一个主题,一个消费者从该主题订阅它。
首先是我的自定义编码器,解码器也是这样(对于我的消息类ConfigurationActionMsg):
@Component
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> {
public ConfigActionMessageEncoder() {
/* This constructor must be present for successful compile. */
}
public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(ConfigurationActionMsg actionMsg){
return SerializationUtils.serialize(actionMsg);
}}
下面是我对处理器和消费者的配置
@Configuration
@ComponentScan(basePackages = {"XXX"})
public class KafkaConfig {
@Bean
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder");
return new KafkaProducer<>(props);
}
@Bean
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
//We should only have one process running for consumer
props.put("group.id", "resolverActionTrigger");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder");
KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("configAction"));
return consumer;
}}
我不确定这是否是实例化生产者/消费者的适当方法。但这种方式行不通。因为我的kafkaProducer无法实例化。
一些调试信息:
您应该实现org.apache.kafka.common.serialization.serializer
而不是encoder
。
请参阅CustomSerializer示例。
假设我们有一个typeclass。使用将允许我们克隆一个数据类型,然后通过语言扩展自动派生实例(请参见如何编写可派生类?以及使用相同的内部表示和最小的样板处理多个类型?)。 问题:是否可以让ghc自动派生和,但在派生时使用我们自己指定的实现? 例如,以下代码(其中=,=,=)不能按预期工作: 我期望/想要的是在派生时调用的实例。 显然,以下程序可以工作,但它需要为显式实例化: 然后ghc抱怨: 奇
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
我有一个kafka streams应用程序 或 这是一个类,用于将消息分发到不同的分区,即使在kafka 2.4版本中使用相同的键 RoundRobinPartitioner具有以下实现: 我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是: 当我这样配置时,消息在两种实现中都被分发到不同的分区,但决不使用某些分区。 我有50个分区,而分区14和34从未收到消息。我的分区不是没有价值
我有一个主题中的多个事件,我试图在这些步骤中处理: 根据标题值过滤事件 应用反序列化程序 按键分组 聚合以生成新的KTable 新KTable将以流式传输方式传输到与具有新标题的新事件相同的主题 我可以使用transformValues访问标题,但不确定在执行toStream时如何注入新的标题值。 注意:我是KStream的新手。
主要内容:KafkaProducer API,生产者API,配置设置,SimpleProducer应用程序,简单的消费者实例,SimpleConsumer应用程序在这一节中将创建一个使用Java客户端发布和使用消息的应用程序。 Kafka生产者客户端由以下API组成。 KafkaProducer API 下面来了解Kafka生产者API。 KafkaProducer API的核心部分是类。 类提供了一个选项,用于将Kafka代理的构造函数与以下方法连接起来。 类提供方法来异步发送消息到主题。 的
本文向大家介绍Springboot 自定义校验代码实例,包括了Springboot 自定义校验代码实例的使用技巧和注意事项,需要的朋友参考一下 这篇文章主要介绍了Springboot 自定义校验代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 StartWithValidation.class StartWithValidator.clas