我试图使用flink从kafka中读取数据,执行一些函数,并将结果返回到不同的kafka主题,但出现以下错误`组织。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:MapFunction的实现不可序列化。对象可能包含或引用不可序列化的字段。
我收到了来自kafka的消息-对其进行了一些操作,并返回了一个对象列表,我想发送到不同的主题。
class Wrapper implements Serializable{
@JsonProperty("viewBuilderRequests")
private ArrayList<ViewBuilderRequest> viewBuilderRequests;
public Wrapper(){}
public Wrapper(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
this.viewBuilderRequests = viewBuilderRequests;
}
public List<ViewBuilderRequest> getViewBuilderRequests() {
return viewBuilderRequests;
}
public void setViewBuilderRequests(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
this.viewBuilderRequests = viewBuilderRequests;
}
}
public class ViewBuilderRequest implements Serializable {
private CdmId cdmId
private ViewBuilderOperation operation
private List<ViewUserSystemIdentifier> viewUserSystemIdentifiers
public ViewBuilderRequest(){
}
public CdmId getCdmId() {
return cdmId;
}
public void setCdmId(CdmId cdmId) {
this.cdmId = cdmId;
}
public ViewBuilderOperation getOperation() {
return operation;
}
public void setOperation(ViewBuilderOperation operation) {
this.operation = operation;
}
public List<ViewUserSystemIdentifier> getViewUserSystemIdentifiers() {
return viewUserSystemIdentifiers;
}
public void setViewUserSystemIdentifiers(List<ViewUserSystemIdentifier> viewUserSystemIdentifiers) {
this.viewUserSystemIdentifiers = viewUserSystemIdentifiers;
}
public enum ViewBuilderOperation implements Serializable{
Create, Update,Delete
}
private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
return wrapper;
};
内部类也实现了可序列化
此代码引发异常:
dataStream。映射(parseAndSendToGraphProcessing)。addSink(新Flinkkapkaproducer
我还对这两个对象进行了反序列化。
public class WrapperSchema implements DeserializationSchema<Wrapper>, SerializationSchema<Wrapper> {
// private final static ObjectMapper objectMapper = new ObjectMapper().configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
static ObjectMapper objectMapper = new ObjectMapper();
@Override
public Wrapper deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, Wrapper.class);
}
@Override
public boolean isEndOfStream(Wrapper nextElement) {
return false;
}
@Override
public byte[] serialize(Wrapper element) {
// return element.toString().getBytes();
if(objectMapper == null) {
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper = new ObjectMapper();
}
try {
String json = objectMapper.writeValueAsString(element);
return json.getBytes();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public TypeInformation<Wrapper> getProducedType() {
return TypeInformation.of(Wrapper.class);
}
}
要使flink工作,您的消息和地图函数都必须是可序列化的。
据我所知,你的消息看起来是可序列化的。
但是你的map函数不是。有时候很难让lambda序列化。我认为在您的案例中,问题在于parseAndSendToGraphProcessing
使用的是objectMapper
和janusGraphDataProcessing
,它们必须是可序列化的。
我的猜测是janusGraphDataProcessing
是不可序列化的(如果您使用的是jackson 2.1或更高版本,则OjbectMapper是)。
如果是这种情况,那么一种解决方法是编写一个自定义RichMapFunction类,该类将把janusGraphDataProcessing
存储为瞬态变量,并在其open
函数中对其进行初始化。
private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
return wrapper;
};
我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示: 将数据写入特定Kafka主题的方法如下: 我有另一种方法可以从Kafka主题获取对象的字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误: 主要代码: Java似乎试图序列化对象,而不仅仅是字段
我正在使用Apache Flink对流数据执行分析。 我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。 如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。 目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerat
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:
我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!
我的Flink管道目前使用一个Pojo,它包含一些列表和(字符串的)映射,如下所示
我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一