我有一个环境,我使用一个Kafka Connect Worker,它使用Oracle数据库中的一些数据,然后将其推送到Avro格式的Kafka主题中。
现在,我需要创建一个Kafka连接接收器来使用这个AVRO消息,将其转换为Json,然后将其写入Redis数据库。
到目前为止,我只能在Redis上写我在topic中使用的同样的AVRO消息。我曾尝试使用转换器,但可能误解了其用法。
吼我的工人和水槽的配置。
{
"name": "SOURCE",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "createKey, extractStr",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractStr.field": "ID",
"connection.url": "<>",
"connection.user": "<>",
"connection.password": "<>",
"table.whitelist": "V_TEST_C",
"schema.pattern": "<>",
"numeric.mapping": "best_fit",
"mode": "timestamp+incrementing",
"incrementing.column.name": "CID",
"timestamp.column.name": "TS_ULT_ALT",
"validate.non.null": "false",
"table.types": "VIEW",
"retention.ms":12000,
"poll.interval.ms": "30000",
"topic.prefix": "TEST.",
"value.converter.schema.registry.url": "<>"
}
}
水槽
{
"name": "SINK",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "V_TEST_C",
"redis.hosts": "redis:6379",
"schema.registry.url": "<>",
"value.converter.schema.registry.url": "<>",
"value.converter.schemas.enable":"false",
"key.converter.schemas.enable":"false",
"insert.mode": "UPSERT",
"delete.enabled": "false",
"quote.sql.identifier": "never"
}
}
在Avro格式的Kafka主题中推出
那不是你贴的
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
在任何情况下,都不能使用Redis连接器存储除字符串或字节之外的任何内容
来自文档(强调添加)
此连接器期望来自Kafka的记录具有存储为字节或字符串的键和值。如果您的数据已经以您希望在Redis中使用的格式在Kafka中,请考虑为此连接器使用ByteArrayConverter或StringConverter
因此,使用AvroConverter是不可能的,因为它生成的是结构化对象,而不是字符串或字节的模式。
注意:禁用模式的JSONConverter实际上与处理JSON数据的字符串转换器的行为相同。然而,如果将StringConverter与Avro数据一起使用,最终可能会得到类似于Struct{foo=bar}
的数据,但您仍然需要在某种程度上运行Avro反序列化器,因此我不确定您是否真的可以将Json或StringConverter与Avro主题数据一起使用。
我正在从Ajax JSON创建一个DataTable。 将创建DataTables,但它显示一个错误: DataTables警告:表ID=CHANGETABLE-为行0,列0请求未知参数“0”。有关此错误的详细信息,请参阅http://datatables.net/TN/4 我的JSON看起来如下所示 在我的Java控制器中创建了这样的JSON对象: 我不知道如何将此信息与 属性一起使用,以使其适
在过去的几天里,我一直在努力解决这个问题,我想我尝试了在互联网上找到的每一个例子。我正在尝试登录我的REST,并获取持续通信的授权码。它在使用邮递员时有效 ------取自邮递员生成代码片段---- 它回来了 我的CodenameOne项目中的代码是: 当使用CodenameOne模拟器网络监视器时,会产生以下结果
我和这里有同样的问题。成功了。但为了完成,我需要一个稍微不同的输出:而不是只有 我想要: 以下是当前代码: 使用的CSV是: 目前的产出是:
问题内容: 我有一种感觉,我在这里做错了什么,但是我不确定是错过了一步,还是遇到编码问题或其他问题。这是我的代码: 当我打印内容时,我会得到一大堆的装饰物和特殊字符,基本上是乱码。我会在这里复制并粘贴它,但这不起作用。我究竟做错了什么? 问题答案: 在这种情况下,这不是字符编码问题,而是内容编码问题。您正在等待文本,但是服务器使用压缩来节省带宽。如果在获取该URL时查看标题,则可以看到您连接的服务
问题内容: 我必须向Google地图添加多个标记,但数据位于extern json文件中。 目前我正在这样运行 现在,我试图将Json文件排除到另一个文件中,但是Sadyl我无法使它工作;( 码 foo.txt 谢谢你的帮助 问题答案: 您的代码中有两个问题。您的json文件在开头和结尾都缺少。您的JavaScript也是错误的,您想对的回调中的json进行操作。您的问题的代码是: 编辑: 这是一
我有Java POJO课是这样的: 我有一个像这样的Kotlin数据课 如何向的任何变量提供,如Java变量中的注释?