我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示:
public class StreamData implements Serializable {
private transient StreamExecutionEnvironment env;
private DataStream<byte[]> data ;
...
将数据写入特定Kafka主题的方法如下:
public void writeDataIntoESB(String id) throws Exception {
FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
"localhost:9092",
id,
new KeyedSerializationSchema<byte[]>() {
@Override
public byte[] serializeKey(byte[] bytes) {
return bytes;
}
@Override
public byte[] serializeValue(byte[] bytes) {
return bytes;
}
@Override
public String getTargetTopic(byte[] bytes) {
return null;
}
});
data.addSink(producer);
}
我有另一种方法可以从Kafka主题获取对象的data
字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误:
org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable
主要代码:
StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");
Java似乎试图序列化对象,而不仅仅是字段数据
!使用Flink Kafka连接器向Kafka生成数据的代码已经过测试,并且可以正常使用(我的意思是不使用类,而是在main中编写所有代码)
如何消除错误?
使数据
属性静态化,解决了问题。有人能详细说明这一点吗?这是一个好的解决方案吗?
private static DataStream<byte[]> data ;
您也可以像这样在Flink中进行序列化
dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic, new
CustomSerializerSchema(),properties));
public class CustomSerializerSchema implements SerializationSchema<MyUser> {
private static final long serialVersionUID = 1L;
@Override
public byte[] serialize(MyUser element) {
byte[] b = null;
try {
b= new ObjectMapper().writeValueAsBytes(element);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return b;
}
}
我认为问题的原因是您的代码正在执行以下操作:
new KeyedSerializationSchema<byte[]>() {...}
这段代码所做的是创建一个匿名的KeyedSeriazationSchema子类,作为定义类(StreamData)的内部类。每个内部类都持有对外部类实例的隐式引用,因此使用默认Java序列化规则序列化它也会传递地尝试序列化外部对象(StreamData)。解决此问题的最好方法是将您的KeyedSeriazationSchema子类声明为:
我认为最后一种方法是这样的:
public class StreamData {
static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
...
};
...
public void writeDataIntoESB(String id) throws Exception {
FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);
data.addSink(producer);
}
}
我有一个看起来很常见的问题,但到目前为止,我还没有找到一个适合我的解决方案。我想我只是错过了一些小事情,但我已经崩溃了,请求帮助。我正在尝试使用flask和pymongo获得json输出。 以下是控制台中使用print(结果)的对象: 当我试图返回时,我得到了错误: TypeError: ObjectId类型的对象不是JSON可序列化的 类联系人(资源): 我试过bson。json_util建议。
我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构 https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。 我遵循同样的方案,但为了进行局部计算,我需要对每个训
我正在尝试将文件从mongoDB读取到本地。 我的代码如下:STRING=“myLocalPath”PATH=STRING.json 我得到错误-对象类型'ObjectId'是不是JSON序列化...请建议。
我有一个api,我正在尝试使用Flask Pymongo存储/发布一个用户对象。 但是,我得到了以下错误 文件“/home/kay/.local/share/virtualenvs/server-iT4jZt3h/lib/python3.7/site-packages/flask/json/i│ 尼特。py”,第321行,在jsonify中 │ 转储(数据,缩进=缩进,分隔符=分隔符)“\n”,│
我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化
我正在试验Stanford CoreNLP库,我想序列化主要的StanfordCoreNLP管道对象,尽管它抛出了一个java.io.NotSerializableException。 完整故事:每当我运行我的实现时,将管道注释器和分类器加载到内存中大约需要15秒。最终进程的内存约为600MB(很容易小到可以存储在我的机箱中)。我想在第一次创建管道后保存它,这样我就可以在以后将其读入内存。 然而,