当前位置: 首页 > 知识库问答 >
问题:

通过Kafka连接器保存的Kafka主题未正确保存的消息

朱天逸
2023-03-14

所以我设置了一个汇流Kafka JDBC连接器。首先,我启动一个模式注册表,如

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

这是schema-registery.properties文件

listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false

接下来,我启动一个像这样的独立连接器

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties

connect-avro-standalone.properties是

bootstrap.servers=kafkahost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

jdbc-source.properties是

name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'

我使用的查询只是为了测试的目的,我要使用的真正查询将实现增量模式,并且将不包含where子句。

我想做的是通过pyspark代码消费数据。下面是我如何操作的代码

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

我还使用kafka-avro-console-consumer使用以下命令来使用数据

./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic

这两个都给了我奇怪的结果

共有1个答案

呼延原
2023-03-14

如果您正在使用来自Spark的Avro,则需要使用正确的反序列化器。

您可以从控制台看到Avro数据中的字节,然后就可以处理小数/数字了,详见下文。

您可以在这里阅读更多关于Avro的Kafka连接和序列化替代方案(包括JSON)的信息。

 类似资料:
  • 我使用的是0.9.0.0版本的Kafka,我想在不使用管理脚本Kafka-console-consumer.sh的情况下计算主题中的消息数。 我已经尝试了答案Java“How to get number of messages in a topic in apache kafka”中的所有命令,但都没有结果。有人能帮我吗?

  • 我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果: 在下面的服务中运行 getTopics() 方法时,我们会得到 配置: 服务: Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。

  • 本文向大家介绍kafka的消息存储?相关面试题,主要包含被问及kafka的消息存储?时的应答技巧和注意事项,需要的朋友参考一下 kafka的消息存储在磁盘上,一个kafka topic分为一个或多个partition,每个partition单独存储自己的消息数据 partition将数据记录到.log文件中,为了避免文件过大影响查询效率,将文件分段处理 记录消息到.log文件中的同时,会记录消息o

  • 这是我的kafka连接器属性 这是我用来创建Elasticsearch水槽的POST主体 我遇到的问题是,有时这个接收器会工作并将数据发送到Elasticsearch并显示 〔2020-09-15 20:27:05904〕INFO WorkerLinkTask{id=test-distributed-connector-0}使用序列号1异步提交偏移。。。。。。。 但大多数时候,它只会卡住并重复这一

  • 我正在使用实现带有自定义确认机制的Kafka消费者。 但我面临一个问题:即使确认没有发送给Kafka,消费者仍然知道下一条消息的偏移量,并且继续阅读新消息,尽管偏移量主题中的偏移量值保持不变。 如何使用户在某些故障的情况下重读消息?

  • 我有个问题,希望你能帮助我。我的控制器中有一个创建CodeIgniter会话的方法。为确保创建正确,我在创建会话后执行函数。然后,在我的Ajax请求中,我放置了一个以刷新页面。 这就是我的方法: 这种方法称为槽式Ajax,如; 最后,在Ajax请求中,我做了,但是当页面刷新时,这将把我重定向到,并且当我想检查包含