在尝试使用Pulsar-Flink-Connector的相关API时,出现了问题。相关代码很简单:
package org.happy.test;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import java.util.Optional;
import java.util.Properties;
public class PulsarPerson {
public static void main(String[] args) {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "persistent://public/default/source-person");
props.setProperty("partition.discovery.interval-millis", "5000");
FlinkPulsarSource<Person> source = new FlinkPulsarSource<>(
Constants.serviceUrl,
Constants.adminUrl,
PulsarDeserializationSchema.valueOnly(JsonDeser.of(Person.class)),
props
);
source.setStartFromLatest();
DataStream<Person> sourceStream = see.addSource(source);
sourceStream.print();
PulsarSerializationSchema<Person> schema = new
PulsarSerializationSchemaWrapper.Builder<>(JsonSer.of(Person.class))
.usePojoMode(Person.class, RecordSchemaType.JSON)
.setTopicExtractor(str -> null)
.build();
FlinkPulsarSink<Person> sink = new FlinkPulsarSink(
Constants.serviceUrl,
Constants.adminUrl,
Optional.of("persistent://public/default/sink-person"),
props,
schema
);
sourceStream.addSink(sink);
try {
see.execute("test-pulsar-source-sink-schema");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Person的代码是:
public class Person {
public String name;
public String country;
Person(String name, String country) {
this.name = name;
this.country = country;
}
@Override
public String toString() {
return "[" + name + "," + country + "]";
}
}
Flink端Stdout的输出是
Exception in thread "AsyncHttpClient-5-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 13 more
Exception in thread "AsyncHttpClient-5-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
看这个问题啥也没看出来,于是便看了一下flink的Log,里边提到了如下内容:
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.shade.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.happy.test.Person` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
大概意思就是没提供默认构造函数,于是给Person加上默认构造函数之后,代码运行成功。
public class Person {
public String name;
public String country;
// you must provide the default constructor
Person() {}
Person(String name, String country) {
this.name = name;
this.country = country;
}
@Override
public String toString() {
return "[" + name + "," + country + "]";
}
}
对象Person在网络传输涉及两个过程:
1.Json序列化;
2. Json反序列化;(错误应该是在该步骤发生的)
问题
:明明提供了构造函数,但是为啥还需提供默认构造函数。
猜想
: 可能是因为:在反序列化的过程中,需要调用默认构造函数创建对象,之后调用反射初始化其内部字段。