我正在尝试使用镜头从MQTT到Kafka的消息。ioReactor。流式反应器的最新版本
Kafka/融合版
sh-4.4$ kafka-topics --version
7.1.0-ccs (Commit:c86722379ab997cc)
kafka-connect-mqtt-3.0.1-2.5.0-all.jar
预期行为:avro主题应打印在控制台上
org.apache.kafka.common.errors。序列化异常:未知的魔法字节!
附加细节
kafka-connect:
image: kafka-connect
build:
context: .
hostname: kafka-connect
container_name: kafka-connect
depends_on:
- zookeeper-1
- kafka-broker-1
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: SSL://kafka-broker-1:19093
CONNECT_GROUP_ID: 'kafka-connect'
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_TOPIC: 'connect-config-storage'
CONNECT_OFFSET_STORAGE_TOPIC: 'connect-offset-storage'
CONNECT_STATUS_STORAGE_TOPIC: 'connect-status-storage'
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_PLUGIN_PATH: /etc/kafka/secrets/plugins
CONNECT_SECURITY_PROTOCOL: 'SSL'
CONNECT_SSL_KEY_PASSWORD: confluent
CONNECT_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_KAFKASTORE_SECURITY_PROTOCOL: 'SSL'
CONNECT_KAFKASTORE_SSL_KEY_PASSWORD: confluent
CONNECT_KAFKASTORE_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_KAFKASTORE_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_PRODUCER_SECURITY_PROTOCOL: 'SSL'
CONNECT_PRODUCER_SSL_KEY_PASSWORD: confluent
CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_CONSUMER_SECURITY_PROTOCOL: 'SSL'
CONNECT_CONSUMER_SSL_KEY_PASSWORD: confluent
CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: confluent
volumes:
- ${KAFKA_SSL_SECRETS_DIR}/connects:/etc/kafka/secrets
networks:
- kafka-cluster-network
连接器属性配置(我的连接器属性)
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"name": "mqtt-source",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"tasks.max": "1",
"topics": "mqtt",
"connect.mqtt.connection.clean": "true",
"connect.mqtt.connection.timeout": "1000",
"connect.mqtt.kcql": "INSERT INTO mqtt SELECT * FROM /ais",
"connect.mqtt.connection.keep.alive": "1000",
"connect.mqtt.source.converters": "/ais=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
"connect.source.converter.avro.schemas": "/ais=/etc/kafka/secrets/plugins/classAPositionReportSchema.json",
"connect.mqtt.client.id": "dm_source_id",
"connect.mqtt.converter.throw.on.error": "true",
"connect.mqtt.hosts": "tcp://mqtt:1883",
"connect.mqtt.service.quality": "1"
}' http://localhost:8083/connectors/mqtt-source/config | jq .
avro json文件
{
"type": "record",
"name": "aisClassAPositionReport",
"namespace": "com.landoop.ais",
"doc": "Schema for AIS Class A Position Reports.",
"fields": [
{
"name": "Type",
"type": "int",
"doc": "The type of the AIS Message. 1/2/3 are Class A position reports."
},
{
"name": "Repeat",
"type":"int",
"doc": "Repeat Indicator"
},
{
"name": "MMSI",
"type": "long",
"doc": "User ID (MMSI)"
},
{
"name": "Speed",
"type": "float",
"doc": "Speed over Ground (SOG)"
},
{
"name": "Accuracy",
"type": "boolean",
"doc": "Position Accuracy"
},
{
"name": "Longitude",
"type": "double",
"doc": "Longitude"
},
{
"name": "Latitude",
"type": "double",
"doc": "Latitude"
},
{
"name": "Course",
"type": "float",
"doc": "Course over Ground (COG)"
},
{
"name": "Heading",
"type": "int",
"doc": "True Heading (HDG)"
},
{
"name": "Second",
"type": "int",
"doc": "Time Stamp"
},
{
"name": "RAIM",
"type": "boolean",
"doc": "RAIM flag"
},
{
"name": "Radio",
"type": "long",
"doc": "Radio Status"
},
{
"name": "Status",
"type": "int",
"doc": "Navigation Status (enumerated type)"
},
{
"name": "Turn",
"type": "float",
"doc": "Rate of Turn (ROT)"
},
{
"name": "Maneuver",
"type": "int",
"doc": "Manuever Indicator (enumerated type)"
},
{
"name": "Timestamp",
"type": "long",
"doc": "Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering."
}
]
}
MQTT代理消息
mosquitto_pub \
-m "{\"Type\": 384558914, \"Repeat\": 1429873353, \"MMSI\": 1421443607430111832, \"Speed\": 0.32155126, \"Accuracy\": true, \"Longitude\": 0.3627212439937161, \"Latitude\": 0.2725890739370421, \"Course\": 0.99500954, \"Heading\": -2064209033, \"Second\": -1096102271, \"RAIM\": true, \"Radio\": -189624595456590919, \"Status\": -139830130, \"Turn\": 0.035991907, \"Maneuver\": 1595359693, \"Timestamp\": -932628952948741103}" \
-d -r -t /ais
完整日志
sh-4.4$ kafka-avro-console-consumer --bootstrap-server kafka-broker-1:19093 --topic mqtt --from-beginning --max-messages 10 --consumer.config /etc/kafka/secrets/host.consumer.ssl.config --property schema.registry.url=http://0.0.0.0:8081
[2022-04-13 05:49:22,333] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-13 05:49:23,082] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-broker-1:19093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = console-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-27706
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/kafka/secrets/kafka.consumer.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/kafka/secrets/kafka.consumer.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-04-13 05:49:23,269] INFO Kafka version: 7.1.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka commitId: 5c05312ab63acecf (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka startTimeMs: 1649828963261 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,274] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Subscribed to topic(s): mqtt (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:24,085] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting the last seen epoch of partition mqtt-0 to 0 since the associated topicId changed from null to eLc9qW-WTemQ53DDH9JgzA (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,093] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Cluster ID: 7mB45_SgTXSxROWQruwrRQ (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,095] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Discovered group coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,099] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,167] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: need to re-join with the given member-id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,168] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,174] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully joined group with generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,179] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Finished assignment for group at generation 1: {console-consumer-35015e12-2725-473a-b7b1-70cce478ed76=Assignment(partitions=[mqtt-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,202] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully synced group in generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,203] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Notifying assignor about the new Assignment(partitions=[mqtt-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,210] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Adding newly assigned partitions: mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,234] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Found no committed offset for partition mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,370] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Seeking to offset 1 for partition mqtt-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Revoke previously assigned partitions mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Member console-consumer-35015e12-2725-473a-b7b1-70cce478ed76 sending LeaveGroup request to coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,374] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,375] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,383] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,396] INFO App info kafka.consumer for console-consumer unregistered (org.apache.kafka.common.utils.AppInfoParser)
Processed a total of 1 messages
[2022-04-13 05:49:27,400] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:87)
at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
还在模式注册表上注册了模式
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "{\"type\": \"record\", \"name\": \"aisClassAPositionReport\", \"namespace\": \"com.landoop.ais\", \"doc\": \"Schema for AIS Class A Position Reports.\", \"fields\": [{\"name\": \"Type\",\"type\": \"int\",\"doc\": \"The type of the AIS Message. 1/2/3 are Class A position reports.\"},{\"name\": \"Repeat\",\"type\":\"int\",\"doc\": \"Repeat Indicator\"},{\"name\": \"MMSI\",\"type\": \"long\",\"doc\": \"User ID (MMSI)\"},{\"name\": \"Speed\",\"type\": \"float\",\"doc\": \"Speed over Ground (SOG)\"},{\"name\": \"Accuracy\",\"type\": \"boolean\",\"doc\": \"Position Accuracy\"},{\"name\": \"Longitude\",\"type\": \"double\",\"doc\": \"Longitude\"},{\"name\": \"Latitude\",\"type\": \"double\",\"doc\": \"Latitude\"},{\"name\": \"Course\",\"type\": \"float\",\"doc\": \"Course over Ground (COG)\"},{\"name\": \"Heading\",\"type\": \"int\",\"doc\": \"True Heading (HDG)\"},{\"name\": \"Second\",\"type\": \"int\",\"doc\": \"Time Stamp\"},{\"name\": \"RAIM\",\"type\": \"boolean\",\"doc\": \"RAIM flag\"},{\"name\": \"Radio\",\"type\": \"long\",\"doc\": \"Radio Status\"},{\"name\": \"Status\",\"type\": \"int\",\"doc\": \"Navigation Status (enumerated type)\"},{\"name\": \"Turn\",\"type\": \"float\",\"doc\": \"Rate of Turn (ROT)\"},{\"name\": \"Maneuver\",\"type\": \"int\",\"doc\": \"Manuever Indicator (enumerated type)\"},{\"name\": \"Timestamp\",\"type\": \"long\",\"doc\": \"Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering.\"}]}"}' http://0.0.0.0:8081/subjects/mqtt-value/versions
我会做错什么?
根据CONNECT\u VALUE\u CONVERTER,您正在生成JSON,而不是Avro,因此您不能使用kafka Avro控制台消费者来读取JSON数据,并且您的Avro模式也没有被使用
我不确定如何连接.mqtt.source。转换器可以工作,但您尚未将其配置为使用任何注册表,这是Kafkaavro控制台消费者工作的必要条件
我一直在尝试将kafka-avro-console-consumer从Confluent连接到我们的遗留Kafka集群,该集群是在没有Confluent模式注册表的情况下部署的。我使用以下属性显式提供了模式: 但我得到的是‘未知的魔法字节!’出错 是否可以使用Confluent kafka-avro-console-consumer(未使用Confluent的AvroSerializer序列化)和
主要内容:__LINE__,实例,__FILE__,实例,__DIR__,实例,__FUNCTION__,实例,__CLASS__,实例,__TRAIT__,实例,__METHOD__,实例,__NAMESPACE__,实例PHP 向它运行的任何脚本提供了大量的预定义常量。 不过很多常量都是由不同的扩展库定义的,只有在加载了这些扩展库时才会出现,或者动态加载后,或者在编译时已经包括进去了。 有八个魔术常量它们的值随着它们在代码中的位置改变而改变。 例如 __LINE__ 的值就依赖于它在脚本中所
我有一个问题反序列化来自Kafka主题的消息。这些消息已经使用spring-cloud-stream和Apache Avro序列化。我正在用斯普林斯·Kafka阅读它们,并试图反序列化它们。如果我使用spring-cloud来生成和使用消息,那么我就可以很好地反序列化消息。问题是当我用Spring Kafka消费它们,然后试图反序列化。 我正在使用一个模式注册表(用于开发的spring-boot模
试图在Java中使用protobuf反序列化消息,并得到以下异常。 原因:com.google.protobuf.InvalidProtocolBufferException:在解析协议消息时,输入意外地在字段中间结束。这可能意味着输入被截断,或者嵌入的消息错误报告了自己的长度。在com.google.protobuf.InvalidProtocolBufferException.Truncate
假设我有一组a、B、C类: 公开A类:整数; 公共B类:整数;字符串地址; 公共类C:int orderNumber; 如何反序列化仅包含这些类但顺序未知的Json字符串(在Java中使用Gson)?例如: 非常感谢你!
有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。