当前位置: 首页 > 知识库问答 >
问题:

有汇流的SpringCould SteamKafka与有汇流的SpringKafka不产生相同的信息

朱宇航
2023-03-14

我希望将Spring Cloud Stream Kafka用于我的Java/Spring服务,并且我需要生成汇流序列化消息,因为我有使用汇流API来使用我的消息的.NET和NodeJS客户端。

就我们所见,使用汇流序列化器的Spring Kafka正在为我们工作,而使用汇流序列化器的Spring Cloud Stream Kafka正在给我们带来问题。

为了演示这两种情况下的区别,我在GitHub上创建了两个示例存储库,其中只包含在这两种情况下生成简单消息所需的代码。

>

  • 使用Spring Kakfa和Confluent https://github.com/donalthurley/SpringKafkaavro

    使用Spring Cloud Stream Kafka和Confluent https://github.com/donalthurley/SpringCloudKafkaavro

    我认为我已经为Spring Cloud应用程序正确地配置了带有UseNativeEncode标志的配置设置和汇合序列化器配置,这些配置可以在源代码中看到,这里有https://github.com/donalthurley/springCloudKafKaavro/blob/master/src/main/resources/application.yaml#l8

          kafka:
            binder:
              useNativeEncoding: true
              brokers: 127.0.0.1:9092
            bindings:
              output:
                producer:
                  configuration:
                    schema.registry.url: http://127.0.0.1:8081
                    key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    

    日志显示,我从Spring Kafka应用程序和Spring Cloud Stream Kafka应用程序发送了相同的简单消息。

    Producing Kafka person event: {"lastName": "Doe", "firstName": "John"}
    

    当我从我的docker Kafka环境中使用Kafka Topics UI浏览器时,请参阅https://hub.docker.com/r/landoop/fast-data-dev/,并查看消息raw data。

    当浏览器识别并显示消息值内的字段时,Spring Kafka看起来更正确。

    [
      {
        "topic": "test_spring_kafka",
        "key": "3197449393600061094",
        "value": {
          "lastName": "Doe",
          "firstName": "John"
        },
        "partition": 0,
        "offset": 0
      }
    ]
    

    在Spring Cloud流Kafka原始数据中,浏览器无法识别字段,这表明消息不是相同的。

    [
      {
        "topic": "test_spring_cloud_kafka",
        "key": "-6214497758709596999",
        "value": "\u0006Doe\bJohn",
        "partition": 0,
        "offset": 0
      }
    ]
    

    我认为使用Spring Cloud Stream Kafka生成汇合消息可能存在问题,而且Spring Kafka实现正确地生成了这些消息,但也许我的实现中缺少了什么,有人可以帮助我解决这个问题?

  • 共有1个答案

    澹台承载
    2023-03-14

    问题在于配置usenativeencode的方式。它没有生效。此配置应起作用:

    spring:
      application:
        name: springCloudKafkaAvro
      cloud:
        stream:
          schemaRegistryClient:
            endpoint: http://127.0.0.1:8081
          kafka:
            binder:
                brokers: 127.0.0.1:9092
            bindings:
              output:
                producer:
                  configuration:
                    schema.registry.url: http://127.0.0.1:8081
                    key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          bindings:
            output:
              destination: test_spring_cloud_kafka
              producer:
                useNativeEncoding: true
    
    

    请注意usenativeencode是如何从原始配置中重新排列的。

     类似资料:
    • 例子 #include <stdio.h> int main(void) { int i; for (i = 0; i < 10; i++) printf("%d ", i); putchar ('\n'); return 0; } 技巧 使用-fverbose-asm选项就可以生成带有详细信息的汇编文件: $ gcc -S -fverbose-asm foo.c

    • 这可能吗?我所做的更改需要在服务器上被识别,以便更改不仅仅发生在我的计算机上。此外,我甚至不知道是否可以在不下载外部插件的情况下查看实际的源代码(由于某些原因,外部插件无法工作)。有人做过这样的事吗?

    • 我已经将Postgres连接器映射到Kafka Connect中(通过Compose中的),并且在创建新的源连接器时可以在CCC中看到它。 当我创建源连接器时,我可以看到日志消息,指示此连接器的主题已创建。我在CCC的Connect专区也看到了这个话题。我还可以看到Connect能够通过该连接器对Postgres进行身份验证。 当我对连接器中指定的表进行更改时,我看到Kafka(我有一个3的集群)

    • 问题内容: 我正在运行一个非常简单的实验,目的是转换列数组,在此示例中为[“ a”]: 这给了我: 显然,可以自己做: 发生这种错误的原因可能是什么?如何纠正? 问题答案: 那是因为您提供的是而不是in 。根据文档: 标量字符串或整数应在转换器期望X像一维数组(矢量)的情况下使用,否则会将二维数组传递给转换器。 现在,需要一个字符串迭代器作为输入(因此是一维字符串数组)。但是,由于您要以的形式发送

    • 我有一张班友名单 我想分组名单的foo根据他们的位置和总和,我如何实现它与Java流? 我所取得的成就: 我的代码:

    • CATALINA_OPTS=“-dcom.sun.management.jmxremote-dcom.sun.management.jmxremote.port=6969-dcom.sun.management.jmxremote.ssl=false-dcom.sun.management.jmxremote.authenticate=false${CATALINA_OPTS}” 并使用jcons