我有两个服务应该通过Kafka
进行通信。让我们调用第一个服务WriteService和第二个服务QueryService。
在WriteService方面,我为生产者提供了以下配置。
@Configuration
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.groupid}")
private String serviceGroupId;
@Value("${spring.kafka.consumer.trusted-packages}")
private String trustedPackage;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
侦听器有以下定义。有效负载类具有完全限定名-com.example.project.clientQueryView.module.routes.messaging.kafka.routedto
@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
当发送消息时,我得到以下错误
由:org.springframework.messaging.converter.MessageConversionException引起:无法解析类名。类未找到[com.example.project.web.routes.dto.routedto];嵌套异常是java.lang.ClassNotFoundException:com.example.project.web.routes.dto.routedTo
如何在不手动配置映射器的情况下解决这个问题?
如果您使用的是Spring-Kafka-2.2.x,则可以通过JSONDeserializer
文档的重载构造函数禁用默认头
从版本2.2开始,您可以显式地配置反序列化器,使用提供的目标类型,并通过使用一个具有布尔useHeadersIfPresent(默认为true)的重载构造函数忽略标头中的类型信息。下面的示例演示如何这样做:
DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
如果使用较低版本,则使用MessageConverter
(您可能会在Spring-Kafka-2.1.x及更高版本中看到此问题)
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(RouteDto dto) {
...
}
注意:只有在方法级别声明@Kafkalistener注释时,才能实现这种类型推断。对于类级别的@KafKalistener,有效负载类型用于选择调用哪个@KafKaHandler方法,因此在选择该方法之前必须已经转换过它。
当我通过命令行或Jenkins运行Groovy脚本时,会出现“无法解析类”错误。 我在C中的同一文件夹中有以下两个groovy文件:\Users\myuser\git\productname\mycompany build\src\main\groovy\com\mycompany\build 富。groovy公司 酒吧groovy公司 我经营Foo。groovy使用命令行。 运行Groovy时,
问题内容: 这是我得到的错误: 这是我的代码: 另外,我在一个单独的类中声明了这一点: 是什么导致此错误,我该如何解决? 编辑:缩进,空格,命名约定和可读性问题已得到解决。 问题答案: 问题在于,这似乎是一个内部类(很难说,您的缩进是不好的),并且由于它不是静态的,因此您无法单独实例化它。 您要么需要自己创建一个类,要么使其成为一个类并将其实例化为(或者无论父类是什么,您的缩进在这里实际上都没有帮
问题内容: 尝试遵循本教程:教程 尝试像这样使用 JSONParser : 但是Eclipse给了我。 该怎么办? 问题答案: JSONParser类在本教程的下方,它看起来像这样…… 我会考虑看看杰克逊图书馆通过http://jackson.codehaus.org/
问题内容: 我从git下载了Android的Browser项目并将其导入到Eclipse。但是由于以下错误,我无法构建它: 无法解析类型java.lang.Enum。从所需的.class文件间接引用它 现在…这是一个开发环境,对于其他项目来说效果很好: Windows 7 Ultimate 64位。 JDK 64位(已安装jdk-6u23-windows-x64.exe。) Eclipse Cla
我刚开始使用Android Studio一周,它对我来说非常好,但当我今天开始使用Android Studio时,我得到了一个错误:“错误:重复类:mypackage”。R’。我以前在使用Eclipse时看到过这个错误,所以我尝试过几次重建项目并重新启动Android Studio,但都没有帮助。 在阅读了一些Stackoverflow问题后,我尝试删除R.java并再次重建,现在我在重建时没有收
我正在尝试将主题添加到我的项目中。它需要appcompat v7。 我按照Android开发者网站中提到的步骤添加了支持库。经过大量搜索,我知道支持库v4将与支持库v7冲突。 所以我从我的项目中删除了android库v4。因此,现在我得到了t错误()。 我必须做什么?请有人帮我解决这个问题。预先感谢
在我的testrunner类中,当使用时,我收到一条错误消息,说“cucumberoptions不能解析为类型”。我使用的是最新版本的eclipse cucumber依赖项。通过安装和卸载eclipse,通过降低cucumber依赖项版本,我已经尝试了所有可能的方法。 我的跑者班是:
我创建了一个RunTest类来使用Cucumber和JUnit运行测试场景。要运行之前的测试,我需要将RunWith类(@RunWith)导入到我的项目中,并将Cucumber.class作为参数传递。然后,类的RunWith参数识别更多传递给它的参数,不。eclipse将显示以下消息: 无法将此行类中的多个标记解析为类型。-Cucumber无法解析为类型。-注释@runwith必须定义属性值 我