当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect Sink-来自:Avro Topic,到:Json->Redis

姬雪松
2023-03-14

我有一个环境,我使用一个Kafka Connect Worker,它使用Oracle数据库中的一些数据,然后将其推送到Avro格式的Kafka主题中。

现在,我需要创建一个Kafka连接接收器来使用这个AVRO消息,将其转换为Json,然后将其写入Redis数据库。

到目前为止,我只能在Redis上写我在topic中使用的同样的AVRO消息。我曾尝试使用转换器,但可能误解了其用法。

吼我的工人和水槽的配置。

{
    "name": "SOURCE",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "createKey, extractStr",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "ID",
        "transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractStr.field": "ID",
        "connection.url": "<>",
        "connection.user": "<>",
        "connection.password": "<>",
        "table.whitelist": "V_TEST_C",
        "schema.pattern": "<>",
        "numeric.mapping": "best_fit",
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "CID",
        "timestamp.column.name": "TS_ULT_ALT",
        "validate.non.null": "false",
        "table.types": "VIEW",
        "retention.ms":12000,
        "poll.interval.ms": "30000",
        "topic.prefix": "TEST.",
        "value.converter.schema.registry.url": "<>"
    }
}

水槽

{
  "name": "SINK",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "V_TEST_C",
    "redis.hosts": "redis:6379",
    "schema.registry.url": "<>",
    "value.converter.schema.registry.url": "<>",
    "value.converter.schemas.enable":"false",
    "key.converter.schemas.enable":"false",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "quote.sql.identifier": "never"
  }
}

共有1个答案

楚俊迈
2023-03-14

在Avro格式的Kafka主题中推出

那不是你贴的

"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

在任何情况下,都不能使用Redis连接器存储除字符串或字节之外的任何内容

来自文档(强调添加

此连接器期望来自Kafka的记录具有存储为字节或字符串的键和值。如果您的数据已经以您希望在Redis中使用的格式在Kafka中,请考虑为此连接器使用ByteArrayConverter或StringConverter

因此,使用AvroConverter是不可能的,因为它生成的是结构化对象,而不是字符串或字节的模式。

注意:禁用模式的JSONConverter实际上与处理JSON数据的字符串转换器的行为相同。然而,如果将StringConverter与Avro数据一起使用,最终可能会得到类似于Struct{foo=bar}的数据,但您仍然需要在某种程度上运行Avro反序列化器,因此我不确定您是否真的可以将Json或StringConverter与Avro主题数据一起使用。

 类似资料:
  • 我正在从Ajax JSON创建一个DataTable。 将创建DataTables,但它显示一个错误: DataTables警告:表ID=CHANGETABLE-为行0,列0请求未知参数“0”。有关此错误的详细信息,请参阅http://datatables.net/TN/4 我的JSON看起来如下所示 在我的Java控制器中创建了这样的JSON对象: 我不知道如何将此信息与 属性一起使用,以使其适

  • 在过去的几天里,我一直在努力解决这个问题,我想我尝试了在互联网上找到的每一个例子。我正在尝试登录我的REST,并获取持续通信的授权码。它在使用邮递员时有效 ------取自邮递员生成代码片段---- 它回来了 我的CodenameOne项目中的代码是: 当使用CodenameOne模拟器网络监视器时,会产生以下结果

  • 我和这里有同样的问题。成功了。但为了完成,我需要一个稍微不同的输出:而不是只有 我想要: 以下是当前代码: 使用的CSV是: 目前的产出是:

  • 问题内容: 我有一种感觉,我在这里做错了什么,但是我不确定是错过了一步,还是遇到编码问题或其他问题。这是我的代码: 当我打印内容时,我会得到一大堆的装饰物和特殊字符,基本上是乱码。我会在这里复制并粘贴它,但这不起作用。我究竟做错了什么? 问题答案: 在这种情况下,这不是字符编码问题,而是内容编码问题。您正在等待文本,但是服务器使用压缩来节省带宽。如果在获取该URL时查看标题,则可以看到您连接的服务

  • 问题内容: 我必须向Google地图添加多个标记,但数据位于extern json文件中。 目前我正在这样运行 现在,我试图将Json文件排除到另一个文件中,但是Sadyl我无法使它工作;( 码 foo.txt 谢谢你的帮助 问题答案: 您的代码中有两个问题。您的json文件在开头和结尾都缺少。您的JavaScript也是错误的,您想对的回调中的json进行操作。您的问题的代码是: 编辑: 这是一

  • 我有Java POJO课是这样的: 我有一个像这样的Kotlin数据课 如何向的任何变量提供,如Java变量中的注释?