Question:

org.apache.kafka.common.errors.SerializationException: Unknown magic byte

鲍驰
2023-03-14

I am trying to stream messages from MQTT to Kafka using the lenses.io Stream Reactor (latest version of Stream Reactor).

Kafka/Confluent version

sh-4.4$ kafka-topics --version
7.1.0-ccs (Commit:c86722379ab997cc)
kafka-connect-mqtt-3.0.1-2.5.0-all.jar

Expected behavior: the Avro topic should be printed on the console.

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Additional details

 kafka-connect:
    image: kafka-connect
    build:
      context: .
    hostname: kafka-connect
    container_name: kafka-connect
    depends_on:
      - zookeeper-1
      - kafka-broker-1
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: SSL://kafka-broker-1:19093
      CONNECT_GROUP_ID: 'kafka-connect'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_TOPIC: 'connect-config-storage'
      CONNECT_OFFSET_STORAGE_TOPIC: 'connect-offset-storage'
      CONNECT_STATUS_STORAGE_TOPIC: 'connect-status-storage'
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" 
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /etc/kafka/secrets/plugins
      CONNECT_SECURITY_PROTOCOL: 'SSL'
      CONNECT_SSL_KEY_PASSWORD: confluent
      CONNECT_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_KAFKASTORE_SECURITY_PROTOCOL: 'SSL'
      CONNECT_KAFKASTORE_SSL_KEY_PASSWORD: confluent
      CONNECT_KAFKASTORE_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_KAFKASTORE_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_PRODUCER_SECURITY_PROTOCOL: 'SSL'
      CONNECT_PRODUCER_SSL_KEY_PASSWORD: confluent
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: confluent
      CONNECT_CONSUMER_SECURITY_PROTOCOL: 'SSL'
      CONNECT_CONSUMER_SSL_KEY_PASSWORD: confluent
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: confluent
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: confluent
    volumes:
      - ${KAFKA_SSL_SECRETS_DIR}/connects:/etc/kafka/secrets
    networks:
      - kafka-cluster-network

Connector configuration (my connector properties)

 curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
          "name": "mqtt-source",
            "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
            "tasks.max": "1",
            "topics": "mqtt",
            "connect.mqtt.connection.clean": "true",
            "connect.mqtt.connection.timeout": "1000",
            "connect.mqtt.kcql": "INSERT INTO mqtt SELECT * FROM /ais",
            "connect.mqtt.connection.keep.alive": "1000",
            "connect.mqtt.source.converters": "/ais=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
            "connect.source.converter.avro.schemas": "/ais=/etc/kafka/secrets/plugins/classAPositionReportSchema.json",
            "connect.mqtt.client.id": "dm_source_id",
            "connect.mqtt.converter.throw.on.error": "true",
            "connect.mqtt.hosts": "tcp://mqtt:1883",
            "connect.mqtt.service.quality": "1"
        }'  http://localhost:8083/connectors/mqtt-source/config | jq .

Avro JSON schema file

{
    "type": "record",
    "name": "aisClassAPositionReport",
    "namespace": "com.landoop.ais",
    "doc": "Schema for AIS Class A Position Reports.",
    "fields": [
      {
        "name": "Type",
        "type": "int",
        "doc": "The type of the AIS Message. 1/2/3 are Class A position reports."
      },
      {
        "name": "Repeat",
        "type":"int",
        "doc": "Repeat Indicator"
      },
      {
        "name": "MMSI",
        "type": "long",
        "doc": "User ID (MMSI)"
      },
      {
        "name": "Speed",
        "type": "float",
        "doc": "Speed over Ground (SOG)"
      },
      {
        "name": "Accuracy",
        "type": "boolean",
        "doc": "Position Accuracy"
      },
      {
        "name": "Longitude",
        "type": "double",
        "doc": "Longitude"
      },
      {
        "name": "Latitude",
        "type": "double",
        "doc": "Latitude"
      },
      {
        "name": "Course",
        "type": "float",
        "doc": "Course over Ground (COG)"
      },
      {
        "name": "Heading",
        "type": "int",
        "doc": "True Heading (HDG)"
      },
      {
        "name": "Second",
        "type": "int",
        "doc": "Time Stamp"
      },
      {
        "name": "RAIM",
        "type": "boolean",
        "doc": "RAIM flag"
      },
      {
        "name": "Radio",
        "type": "long",
        "doc": "Radio Status"
      },
      {
        "name": "Status",
        "type": "int",
        "doc": "Navigation Status (enumerated type)"
      },
      {
        "name": "Turn",
        "type": "float",
        "doc": "Rate of Turn (ROT)"
      },
      {
        "name": "Maneuver",
        "type": "int",
        "doc": "Manuever Indicator (enumerated type)"
      },
      {
        "name": "Timestamp",
        "type": "long",
        "doc": "Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering."
      }
    ]
  }

MQTT broker message

mosquitto_pub \
    -m "{\"Type\": 384558914, \"Repeat\": 1429873353, \"MMSI\": 1421443607430111832, \"Speed\": 0.32155126, \"Accuracy\": true, \"Longitude\": 0.3627212439937161, \"Latitude\": 0.2725890739370421, \"Course\": 0.99500954, \"Heading\": -2064209033, \"Second\": -1096102271, \"RAIM\": true, \"Radio\": -189624595456590919, \"Status\": -139830130, \"Turn\": 0.035991907, \"Maneuver\": 1595359693, \"Timestamp\": -932628952948741103}" \
    -d -r -t /ais

Full log

sh-4.4$ kafka-avro-console-consumer --bootstrap-server kafka-broker-1:19093 \
    --topic mqtt \
    --from-beginning \
    --max-messages 10 \
    --consumer.config /etc/kafka/secrets/host.consumer.ssl.config \
    --property schema.registry.url=http://0.0.0.0:8081
[2022-04-13 05:49:22,333] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-13 05:49:23,082] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka-broker-1:19093]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = console-consumer
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-27706
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm =
        ssl.engine.factory.class = null
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = /etc/kafka/secrets/kafka.consumer.keystore.jks
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = /etc/kafka/secrets/kafka.consumer.truststore.jks
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-04-13 05:49:23,269] INFO Kafka version: 7.1.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka commitId: 5c05312ab63acecf (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka startTimeMs: 1649828963261 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,274] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Subscribed to topic(s): mqtt (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:24,085] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting the last seen epoch of partition mqtt-0 to 0 since the associated topicId changed from null to eLc9qW-WTemQ53DDH9JgzA (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,093] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Cluster ID: 7mB45_SgTXSxROWQruwrRQ (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,095] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Discovered group coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,099] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,167] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: need to re-join with the given member-id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,168] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,174] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully joined group with generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,179] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Finished assignment for group at generation 1: {console-consumer-35015e12-2725-473a-b7b1-70cce478ed76=Assignment(partitions=[mqtt-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,202] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully synced group in generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,203] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Notifying assignor about the new Assignment(partitions=[mqtt-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,210] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Adding newly assigned partitions: mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,234] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Found no committed offset for partition mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,370] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Seeking to offset 1 for partition mqtt-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Revoke previously assigned partitions mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Member console-consumer-35015e12-2725-473a-b7b1-70cce478ed76 sending LeaveGroup request to coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,374] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,375] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,383] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,396] INFO App info kafka.consumer for console-consumer unregistered (org.apache.kafka.common.utils.AppInfoParser)
Processed a total of 1 messages
[2022-04-13 05:49:27,400] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:87)
        at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
        at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

I also registered the schema in the Schema Registry

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "{\"type\": \"record\",    \"name\": \"aisClassAPositionReport\",    \"namespace\": \"com.landoop.ais\",    \"doc\": \"Schema for AIS Class A Position Reports.\",    \"fields\": [{\"name\": \"Type\",\"type\": \"int\",\"doc\": \"The type of the AIS Message. 1/2/3 are Class A position reports.\"},{\"name\": \"Repeat\",\"type\":\"int\",\"doc\": \"Repeat Indicator\"},{\"name\": \"MMSI\",\"type\": \"long\",\"doc\": \"User ID (MMSI)\"},{\"name\": \"Speed\",\"type\": \"float\",\"doc\": \"Speed over Ground (SOG)\"},{\"name\": \"Accuracy\",\"type\": \"boolean\",\"doc\": \"Position Accuracy\"},{\"name\": \"Longitude\",\"type\": \"double\",\"doc\": \"Longitude\"},{\"name\": \"Latitude\",\"type\": \"double\",\"doc\": \"Latitude\"},{\"name\": \"Course\",\"type\": \"float\",\"doc\": \"Course over Ground (COG)\"},{\"name\": \"Heading\",\"type\": \"int\",\"doc\": \"True Heading (HDG)\"},{\"name\": \"Second\",\"type\": \"int\",\"doc\": \"Time Stamp\"},{\"name\": \"RAIM\",\"type\": \"boolean\",\"doc\": \"RAIM flag\"},{\"name\": \"Radio\",\"type\": \"long\",\"doc\": \"Radio Status\"},{\"name\": \"Status\",\"type\": \"int\",\"doc\": \"Navigation Status (enumerated type)\"},{\"name\": \"Turn\",\"type\": \"float\",\"doc\": \"Rate of Turn (ROT)\"},{\"name\": \"Maneuver\",\"type\": \"int\",\"doc\": \"Manuever Indicator (enumerated type)\"},{\"name\": \"Timestamp\",\"type\": \"long\",\"doc\": \"Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering.\"}]}"}' http://0.0.0.0:8081/subjects/mqtt-value/versions

What am I doing wrong?

1 answer

乌鸿宝
2023-03-14

According to CONNECT_VALUE_CONVERTER, you are producing JSON, not Avro. That means you cannot use kafka-avro-console-consumer to read the data, and your Avro schema is not being used either.
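
To see why the consumer complains about a magic byte, here is a minimal Python sketch of the framing check, assuming the standard Confluent wire format (one zero byte, then a 4-byte big-endian schema ID, then the Avro payload). The helper name parse_confluent_header and both sample payloads are made up for illustration; they are not taken from your topic.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def parse_confluent_header(payload: bytes):
    """Return the schema ID if payload uses Confluent framing, else None."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        return None
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


# A value written by JsonConverter is plain UTF-8 JSON, so it starts with "{" (0x7B).
json_value = json.dumps({"Type": 1, "MMSI": 123456789}).encode("utf-8")
# A hypothetical Confluent-framed value: magic byte, schema ID 1, then Avro bytes.
framed_value = b"\x00" + struct.pack(">I", 1) + b"\x02"

print(parse_confluent_header(json_value))    # None: no magic byte, so the Avro deserializer rejects it
print(parse_confluent_header(framed_value))  # 1
```

A JSON value begins with "{" rather than a zero byte, which is the condition the Avro deserializer reports as "Unknown magic byte!".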

I'm not sure how connect.mqtt.source.converters works, but you have not configured it to use any registry, which kafka-avro-console-consumer requires in order to work.
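
One possible fix, as a sketch only: switch the worker's value converter to Confluent's AvroConverter and point it at the Schema Registry. The hostname schema-registry is an assumption (the question only shows http://0.0.0.0:8081), and this assumes the Avro converter is available in your custom kafka-connect image.

```yaml
    environment:
      # Serialize record values as Confluent-framed Avro instead of JSON
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      # Assumed address; replace with wherever your Schema Registry is reachable
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
```

These two entries would replace the existing CONNECT_VALUE_CONVERTER and CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE lines. Records already written to the mqtt topic as JSON would still fail in the Avro consumer, so read from a fresh topic or skip the old offsets.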
