使用fromElements函数创建数据流时出错
下面是探险-
原因:java.io.IOException:无法从源反序列化元素。如果您使用的是用户定义的序列化(值和可写类型),请检查序列化函数。序列化程序是org.apache.flink.api.java.typeutils.runtime.kryo.kryoSerializer@599fcdda在org.apache.flink.streaming.api.functions.source.fromelementsfunction.run(fromelementsfunction.java:121)在org.apache.flink.streaming.api.operators.streamsource.run(streamsource.java:58)在
为什么要处理InputStreamReader
元组?我猜这里有些理解不到位。泛型类型指定要处理的数据的类型。例如:
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
生成包含5个integer
元组的有限数据流。我假设您实际上希望使用InputStreamReader
来生成实际的元组。
如果要通过HTTPURLConnection
进行读取,可以实现自己的SourceFunction
(或RichSourceFunction
),如下所示(将Out
替换为要使用的实际数据类型--还可以考虑FlinksTuple0
到Tuple25
类型):
env.addSource(new SourceFunction<OUT> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OUT> ctx) {
InputStreamReader isr = null;
try {
URL url = new URL("ex.in/res");
HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
if (httpconn.getResponseCode() != 200)
throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
isr = new InputStreamReader((httpconn.getInputStream()));
} catch (Exception e) {
// clean up; log error
return;
}
while(isRunning) {
OUT tuple = ... // get data from isr
ctx.collect(tuple);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
});
我想创建一个简单的容器,其中包含一个带有初始化数据库的MySQL服务器。我的Dockerfile目前如下所示: 但是,当我通过我得到以下错误: 当我注释要创建数据库的行时(
请帮我解决这个问题。抱歉,我无法格式化StackTrace。
我正在尝试创建一个新的JavaDB。我已经将Java DB驱动程序添加到库中,但在服务下创建新数据库时,它仍然会抛出一个错误。 我下载并定义了db-derby-10.15.2.0-bin 我在这里定义了drover文件 我在这里单击了创建数据库 填了这张表 点击om后显示如下 谁来帮帮我
创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息
我有许多未分区的大型BigQuery表和文件,我希望以各种方式对它们进行分区。因此,我决定尝试编写一个数据流作业来实现这一点。我认为这工作很简单。我尝试使用泛型编写,以便轻松地应用TextIO和BigQueryIO源代码。它在小型表上工作得很好,但在大型表上运行时,我总是得到。 在我的主类中,我要么读取一个带有目标键的文件(由另一个DF作业生成),要么对一个BigQuery表运行一个查询,以获得要
我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。 我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段, 创建的流如下所示。 流连接使用等于的连接执行,如下所示。 如下所述, 有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢