我使用confluent的kafka connect将数据传输到s3桶中。基于键进行理想的分区。因为现有的FieldPartitioner只适用于Avro模式记录,而不适用于一般的字符串化JSON文本。我想我应该写我自己的连接器。
课堂是这样的:
package com.package.kafka.connect;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.FieldPartitioner;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class JsonFieldPartitioner<T> extends DefaultPartitioner<T> {
private static final Logger log = LoggerFactory.getLogger(FieldPartitioner.class);
private List<String> fieldNames;
private List<String> keys;
public void configure(Map<String, Object> config){
fieldNames = (List<String>) config.get("partition.field.name");
String field = fieldNames.get(0);
keys = new ArrayList<String>(Arrays.asList(field.split(".")));
}
public String encodePartition(SinkRecord sinkRecord){
String value = sinkRecord.value().toString();
JsonElement rootElement = new JsonParser().parse(value);
JsonElement element = rootElement;
for(String key : keys){
log.info("key: "+ key);
try{
element = element.getAsJsonObject().get(key);
}catch(Exception e){
log.error("encountered error getting key: " + key);
throw new ConfigException("Key element not found" + key);
}
}
String fieldValue = "";
try{
fieldValue = element.getAsString();
}catch(Exception e){
log.error("encountered error getting key value ");
throw new ConfigException("Key element not found");
}
return fieldValue;
}
public List<T> partitionFields() {
if (partitionFields == null) {
partitionFields = newSchemaGenerator(config).newPartitionFields(
Utils.join(fieldNames, ",")
);
}
return partitionFields;
}
}
当我构建它并尝试运行kafka connect时,我得到了一个错误
java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:269)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:32)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(
从查看打包一个自定义Java'partitioner.class'插件为Kafka连接在汇流4.1 Kafka 1.1?我试图把由这个构建的jar文件放在kafka-连接-存储-共同
目录中$CONFLUENT_HOME但我仍然得到同样的错误。
构建jar的gradle文件在这里
id 'java'
}
group 'JsonFieldPartitioner'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile group: 'org.apache.kafka', name: 'connect-api', version: '2.3.0'
compile fileTree( dir:'/Users/myuser/confluent-5.3.0/share/java/kafka-connect-storage-common', include: ['*.jar'])
compile group: 'joda-time', name: 'joda-time', version: '2.10.3'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
在 s3 连接器属性文件中,我只是通过 com.package.kafka.connect.JsonFieldPartitioner
引用该类
如果有人成功地构建了自定义分区器,任何帮助都将不胜感激。
在准备kafka连接图像时:-我们可以做
confluent-hub install --no-prompt /tmp/connect-fieldandtime-partitioner-1.2.0.zip
这里的zip是自定义的分区器,那么为什么我们需要将JAR放在服务的lib文件夹中呢
将自定义jar文件复制到插件文件夹Example/usr/local/share/Kafka/plugins/confluentinc-Kafka-connect-S3/下
我试图使用spring cloud stream绑定实现一个自定义的Kafka分区器。我只想对用户主题进行自定义分区,而不对公司主题进行任何操作(在本例中,Kafka将使用DefaultPartitioner)。 我的绑定配置: 我使用以下方式将消息发送到流中: 我的UserPartitioner类: 我最终收到以下异常: 编辑:根据文档,还尝试了以下步骤: User-Out:Destinatio
我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。
目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我
我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“
我正在实现一个自定义消费者的主题/分区分配在Kafka。为此,我将重写抽象类,该类又实现接口。 作为自定义赋值器的一部分,我希望发送一个关于消费者订阅的每个主题的每个分区的单个(浮动)信息。 我知道可以通过重写接口的默认方法向赋值器发送自定义数据。 但是,问题是,从上面的方法签名中,我无法获得为使用者注册的每个主题分配给带下划线使用者的分区列表。 谢谢你。
本文向大家介绍Python自定义线程池实现方法分析,包括了Python自定义线程池实现方法分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python自定义线程池实现方法。分享给大家供大家参考,具体如下: 关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程。但多线程处理IO密集的任务效率还是可以杠杠的。 我实现的这个线程池其实是根据银角的