当前位置: 首页 > 工具软件 > datastream.io > 使用案例 >

PulsaFlinkConnector java.lang.NoClassDefFoundEror org/apache/pulsar/shade/io/netty/buffer/PoolArena

韦高格
2023-12-01

问题描述

在尝试使用Pulsar-Flink-Connector的相关API时,出现了问题。相关代码很简单:

相关代码

package org.happy.test;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;

import java.util.Optional;
import java.util.Properties;

public class PulsarPerson {

    public static void main(String[] args) {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("topic", "persistent://public/default/source-person");
        props.setProperty("partition.discovery.interval-millis", "5000");


        FlinkPulsarSource<Person> source = new FlinkPulsarSource<>(
                Constants.serviceUrl,
                Constants.adminUrl,
                PulsarDeserializationSchema.valueOnly(JsonDeser.of(Person.class)),
                props
        );



        source.setStartFromLatest();
        DataStream<Person> sourceStream = see.addSource(source);

        sourceStream.print();


        PulsarSerializationSchema<Person> schema = new
                PulsarSerializationSchemaWrapper.Builder<>(JsonSer.of(Person.class))
                .usePojoMode(Person.class, RecordSchemaType.JSON)
                .setTopicExtractor(str -> null)
                .build();


        FlinkPulsarSink<Person> sink = new FlinkPulsarSink(
                Constants.serviceUrl,
                Constants.adminUrl,
                Optional.of("persistent://public/default/sink-person"),
                props,
                schema
        );

        sourceStream.addSink(sink);

        try {
            see.execute("test-pulsar-source-sink-schema");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Person的代码是:

public class Person {

    public String name;
    public String country;

    Person(String name, String country) {
        this.name = name;
        this.country = country;
    }

    @Override
    public String toString() {
        return "[" + name + "," + country + "]";
    }

}

错误输出

Flink端Stdout的输出是

Exception in thread "AsyncHttpClient-5-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
	at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 13 more
Exception in thread "AsyncHttpClient-5-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
	at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

看这个问题啥也没看出来,于是便看了一下flink的Log,里边提到了如下内容:

Caused by: org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.shade.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.happy.test.Person` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

大概意思就是没提供默认构造函数,于是给Person加上默认构造函数之后,代码运行成功。

public class Person {

    public String name;
    public String country;

    // you must provide the default constructor
    Person() {}

    Person(String name, String country) {
        this.name = name;
        this.country = country;
    }

    @Override
    public String toString() {
        return "[" + name + "," + country + "]";
    }

}

猜想

对象Person在网络传输涉及两个过程:
1.Json序列化;
2. Json反序列化;(错误应该是在该步骤发生的)

问题:明明提供了构造函数,但是为啥还需提供默认构造函数。
猜想: 可能是因为:在反序列化的过程中,需要调用默认构造函数创建对象,之后调用反射初始化其内部字段。

 类似资料: