当前位置: 首页 > 知识库问答 >
问题:

在Apache Flink中创建数据流时出错

赵才俊
2023-03-14

使用fromElements函数创建数据流时出错

下面是探险-

原因:java.io.IOException:无法从源反序列化元素。如果您使用的是用户定义的序列化(值和可写类型),请检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.kryoSerializer@599fcdda在org.apache.flink.streaming.api.functions.source.fromelementsfunction.run(fromelementsfunction.java:121)在org.apache.flink.streaming.api.operators.streamsource.run(streamsource.java:58)在

共有1个答案

乐正心水
2023-03-14

为什么要处理InputStreamReader元组?我猜这里有些理解不到位。泛型类型指定要处理的数据的类型。例如:

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

生成包含5个integer元组的有限数据流。我假设您实际上希望使用InputStreamReader来生成实际的元组。

如果要通过HTTPURLConnection进行读取,可以实现自己的SourceFunction(或RichSourceFunction),如下所示(将Out替换为要使用的实际数据类型--还可以考虑FlinksTuple0Tuple25类型):

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<OUT> ctx) {
        InputStreamReader isr = null;
        try {
            URL url = new URL("ex.in/res");
            HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
            if (httpconn.getResponseCode() != 200)
                throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
            isr = new InputStreamReader((httpconn.getInputStream()));
        } catch (Exception e) {
            // clean up; log error
            return;
        }

        while(isRunning) {
            OUT tuple = ... // get data from isr
            ctx.collect(tuple);
        }
    }

    @Override
    public void cancel() {
         this.isRunning = false;
    }
});
 类似资料:
  • 我想创建一个简单的容器,其中包含一个带有初始化数据库的MySQL服务器。我的Dockerfile目前如下所示: 但是,当我通过我得到以下错误: 当我注释要创建数据库的行时(

  • 请帮我解决这个问题。抱歉,我无法格式化StackTrace。

  • 我正在尝试创建一个新的JavaDB。我已经将Java DB驱动程序添加到库中,但在服务下创建新数据库时,它仍然会抛出一个错误。 我下载并定义了db-derby-10.15.2.0-bin 我在这里定义了drover文件 我在这里单击了创建数据库 填了这张表 点击om后显示如下 谁来帮帮我

  • 创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息

  • 我有许多未分区的大型BigQuery表和文件,我希望以各种方式对它们进行分区。因此,我决定尝试编写一个数据流作业来实现这一点。我认为这工作很简单。我尝试使用泛型编写,以便轻松地应用TextIO和BigQueryIO源代码。它在小型表上工作得很好,但在大型表上运行时,我总是得到。 在我的主类中,我要么读取一个带有目标键的文件(由另一个DF作业生成),要么对一个BigQuery表运行一个查询,以获得要

  • 我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。 我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段, 创建的流如下所示。 流连接使用等于的连接执行,如下所示。 如下所述, 有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢