我试图用这个版本来反对Cloudera安装:
Apache Kafka版本3.0.0-1.3.0.0.p0.40版本0.11.0+Kafka 3.0.0+50的Cloudera发行版
我的KafkaProducerConfig类非常简单:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
我已经从生产者应用程序端尝试了以下几点:
我希望我错过了一些明显的东西。是否有任何额外的头,我需要打开/关闭,以帮助解决魔术v0的问题,任何人都知道?
但它们是手工制作必要的Spring bean的较老的应用程序。
在org.apache.kafka.common.record.fileRecords。下转换(FileRecords.java:245)
我不熟悉kafka broker内部,但“听起来”这些主题是用一个旧的broker创建的,它们的格式不支持header,而不支持broker版本本身(提示:downConvert)。
jsonSerializer.add_type_info_headers设置为false。
这应该防止框架设置任何标题;您需要显示您的生产者代码(和所有配置)。
您还可以将ProducerInterceptor
添加到生产者配置中,并检查onsend()
方法中的ProducerRecord
headers
属性,以确定输出消息中的设置标头是什么。
我正在使用python向elasticsearch插入一些数据,elasticsearch版本是 从日期时间导入 日期时间 从 Elasticsearch 导入 Elasticsearch es = Elasticsearch( “localhost:9200” ) 从 Elasticsearch 导入 传输错误数据 = { “http_code” : “404”, “计数” : “10” } t
我正在使用此docker compose设置在本地设置Kafka:https://github.com/wurstmeister/kafka-docker/ < code>docker-compose up运行良好,通过shell创建主题运行良好。 现在我尝试通过连接到Kafka 启动Spring应用程序时,它会打印正确版本的Kafka: 我试着传达这样一个信息 客户端发送失败,原因是 在服务器控
问题: [org.springframework.kafka.KafkaListenerEndpointContainer#3-0-kafka-consumer-1]WARN o.a.k.c.consumer.internals公司。Fetcher-获取主题分区的数据时出现未知错误 环境设置: < li >Kafka版本:kafka_2.11-1.0.0 < li >制作者详细信息:< br> S
我发布了这个问题,我可以通过删除WebLogic startups参数来解决这个问题: 有人能解释一下吗?
我试图建立一个具有联合数据类型支持成员记录类型的AVRO复杂记录。 我在尝试读取这种模式时出错。 我想知道-是否有可能声明这样的AVRO模式-其中一个字段类型是复杂用户定义消息结构的联合。 如果可能的话,你能让我知道我做错了什么吗?或者用union类型字段的类型定义举例说明这种结构吗? 我想使用AVRO的动态架构用法 - 因此请指定此架构文件运行时并将传入缓冲区解析为“request”/“resp
将CloudFormation模板迁移到AWS SAM方法后,在部署使用创建的模板时,在CloudFormation中我得到错误消息 遇到不支持的属性CodeUri 在模板中包含的所有Lambda函数上。 经过调查,很明显,CodeUri属性没有从打包的模板中删除,AWS::Lambda::Function type不支持CodeUri属性,尽管适当的资源作为打包过程的一部分上载到了S3(因此打包