当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka Producer未发送到Kafka 1.0.0(Magic v1不支持记录头)

双恩
2023-03-14

我正在使用此docker compose设置在本地设置Kafka:https://github.com/wurstmeister/kafka-docker/

< code>docker-compose up运行良好,通过shell创建主题运行良好。

现在我尝试通过sping-kafka:2.1.0.RELEASE连接到Kafka

启动Spring应用程序时,它会打印正确版本的Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

我试着传达这样一个信息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

客户端发送失败,原因是

UnknownServerException: The server experienced an unexpected error when processing the request

在服务器控制台中,我收到消息Magic v1不支持记录头

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

谷歌提示存在版本冲突,但版本似乎合适(org.apache.kafka:kafka-clients:1.0.0位于类路径中)。

有什么线索吗?谢谢!

编辑:我缩小了问题的来源。发送普通字符串有效,但通过JsonSerializer发送Json会导致给定的问题。这是我的生产者配置的内容:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

共有3个答案

毋宏茂
2023-03-14

我刚刚对docker图像进行了测试,没有任何问题...

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

编辑

仍然为我工作...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz
冉永宁
2023-03-14

已解决。问题既不是代理,也不是某些docker缓存,也不是Spring应用程序。

问题是我并行用于调试的控制台使用者。这是一个“老”消费者,从 kafka-console-consumer.sh 开始 --主题=主题 --动物园管理员=...

它实际上在启动时会打印一个警告:对旧消费者使用ConsoleConsumer已被弃用,并将在未来的主要版本中删除。考虑通过传递[bootstrap server]而不是[zookeeper]来使用新消费者。

应使用带有< code> - bootstrap-server选项的“新”消费者(尤其是在使用Kafka 1.0和JsonSerializer时)。注意:这里用一个老消费者确实能影响到生产者。

司空高义
2023-03-14

我也有类似的问题。如果我们对值使用JsonSerializerJsonSerde

如果您可以使用默认的json序列化,那么使用以下方法(这里的关键点是ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

但是,如果您需要一个带有特定< code>ObjectMapper的自定义< code>JsonSerializer(如< code>PropertyNamingStrategy。SNAKE_CASE),您应该禁用在< code>JsonSerializer上显式添加信息头,因为spring kafka忽略< code > DefaultKafkaProducerFactory 的属性< code > ADD _ TYPE _ INFO _ HEADERS (对我来说,这是spring kafka的一个糟糕设计)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

或者如果我们使用< code>JsonSerde,那么:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
 类似资料:
  • 我对Kafka是完全陌生的,我在使用Kafka制作人时遇到了一些麻烦。生成器的send方法恰好阻塞1min,然后应用程序无一例外地继续。这显然是一些超时,但没有抛出异常。 我在原木上也看不出什么真正的东西。 kafka.properties文件中的属性: 因此,producer.send阻塞1分钟,然后继续。在结尾,Kafka没有储存任何东西,但新的话题被创造出来了。谢谢你的帮助!

  • 我发布了这个问题,我可以通过删除WebLogic startups参数来解决这个问题: 有人能解释一下吗?

  • 下面的代码有时失败,有时工作。我使用的是Java8。是服务器端的问题吗? 线程“main”javax.net.ssl.sslException中的异常:不支持的记录版本未知-0.0。

  • 我试图用这个版本来反对Cloudera安装: 我的KafkaProducerConfig类非常简单: 我已经从生产者应用程序端尝试了以下几点: 降级到Spring Kafka 2.0.4。我希望通过Kafka的0.11.0版本来解决这个问题,但没有效果。 验证节点都是同一个版本。根据我的管理员,他们是。 与我的管理员验证,我们没有混合安装。我再次被告知我们没有。 基于类似的堆栈溢出问题,我返回到2

  • 我试图连接到mysql与JDBC。我在Windows 10上生成的密钥如下: 我的JDBC代码如下所示: 我得到以下错误: 原因:javax。网ssl。SSLexException:sun上不支持的记录版本未知-0.0。安全ssl。InputRecord。在sun上检查RecordVersion(InputRecord.java:552)。安全ssl。InputRecord。sun上的readV3