{
"name": "restored-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"value.converter.schemas.enable": "true",
"name": "restored-exchange-rate-log",
"topic": "restored-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "restored-exchange-rate-log",
"task": 0
}
],
"type": "source"
}
这里是源连接器状态的输出:
{
"name": "restored-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "source"
}
这里是接收器连接器配置的输出:
{
"name": "bkp-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"topics": "exchange-rate-log",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"value.converter.schemas.enable": "true",
"name": "bkp-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "bkp-exchange-rate-log",
"task": 0
}
],
"type": "sink"
}
这里是接收器连接器状态的输出:
{
"name": "bkp-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "sink"
}
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"topics": "exchange-rate-log",
"format": "binary",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "bkp-exchange-rate-log"
}
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"format": "binary",
"topic": "bin-test-exchange-rate-log",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "restore-exchange-rate-log"
}
Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
我能够使用kafka-avro-console-consumer生成主题的“转储”。我们使用的是SSL+模式注册表。
下面是能够生成主题转储的命令行:
tpc=exchange-rate-log
SCHEMA_REGISTRY_OPTS="-Djavax.net.ssl.keyStore=. -Djavax.net.ssl.trustStore=. -Djavax.net.ssl.keyStorePassword=. -Djavax.net.ssl.trustStorePassword=." \
kafka-avro-console-consumer \
--from-beginning --bootstrap-server $CONNECT_BOOTSTRAP_SERVERS \
--property schema.registry.url=$CONNECT_SCHEMA_REGISTRY_URL \
--topic $tpc --consumer-property security.protocol=SSL \
--consumer-property ssl.truststore.location=/etc/ssl/kafkaproducer.truststore.jks \
--consumer-property ssl.truststore.password=$MYPASS \
--consumer-property ssl.keystore.location=/etc/ssl/kafkaproducer.keystore.jks \
--consumer-property ssl.keystore.password=$MYPASS \
--consumer-property ssl.key.password=$MYPASS \
--property "key.separator=::-::" \
--property "schema.id.separator=::_::" \
--property print.schema.ids=true \
--timeout-ms 15000 \
--property "print.key=true" \
--key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" > $tpc.dump
但是我没有找到如何使用kafka-avro-console-producer将它导入回来的方法,因为它不能用于非avro密钥。使用这个转储文件,我可以编写一个python生成器来读取该文件并恢复主题。
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
我已经在本地安装了kafka(目前没有集群/模式注册),并尝试生成一个Avro主题,下面是与该主题相关的模式。 我想创建一个简单的来根据上述模式创建一些数据并将其发布到kafka。考虑创建转换为的示例数据,然后将其更改为然后发布。 然后,如下所示: 现在可以< code >手动将模式附加到此avro主题。 这可以通过使用而不是使用来实现吗?这是用于批处理,而不是。
我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码
从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种
我只是在探索,目前我正在使用One
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R