我们计划使用Flink处理来自kafka主题的数据流(Json格式的日志)。
但是对于这种处理,我们需要使用每天都在变化的输入文件,其中的信息可以完全改变(不是格式,而是内容)。
每当这些输入文件中的一个发生变化时,我们必须将这些文件重新加载到程序中,并保持流处理继续进行。
重新加载数据的方式与现在相同:
DataSet<String> globalData = env.readTextFile("file:///path/to/file");
但是到目前为止,我还没有找到例子,也没有想出一种方法来触发流处理作业中的重新加载。
作为额外的信息,我们不会使用HDFS,而是在每个节点上使用本地文件系统,因此必须在每个节点上从本地文件重新加载。这是因为我们需要HDFS的唯一原因是这些输入文件总共只有100 mb,使用HDFS是一种过度使用。
编辑:
在阅读了更多内容后,我在几个地方发现这是一种方法:DataArtisans示例。
我试图创建一个简单的代码,从一个控制流中对一个流进行简单的更改,我得到了以下代码:
public class RichCoFlatMapExample extends EventTimeJoinHelper {
private String config_source_path = "NOT_INITIALIZED";
@Override
public void open(Configuration conf) {
config_source_path = "first_file_path";
}
public abstract void processElement1(String one, String two, Collector<String> out) {
config_source_path = one;
}
public abstract void processElement2(String one, String two, Collector<String> out) {
String three = two + config_source_path;
out.collect(three);
}
}
Flink可以监视一个目录,并在文件被移动到该目录时摄取它们;也许这就是你要找的。请参见文档中readfile的process_continureal选项。
但是,如果数据是在Kafka中,那么使用Flink的Kafka消费者将数据直接流到Flink中会更加自然。还有关于使用Kafka连接器的文档。Flink训练包括使用Kafka和Flink的练习。
我的问题是,如果我们有两个原始事件流,即烟雾和温度,并且我们想通过将运算符应用于原始流来找出复杂事件(即火灾)是否发生,我们可以在Flink中做到这一点吗? 我问这个问题是因为到目前为止,我所看到的Flink CEP的所有示例都只包括一个输入流。如果我错了,请纠正我。
null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?
一段时间以来,我一直在寻找解决spring batch问题的方法。我应该使用spring batch从csv文件复制/创建新的csv文件。这里有一个例子: 下面是一个创建输出编号1的示例,例如File1: 其他输出文件也是一样的,但是你可以看到一些输出使用相同的源,事实上,我不能读取相同的数据两次来重新生成一个新的输出,所以我尝试将它们作为资源存储在地图上(资源将被使用不止一个),也就是说,我将
我将一些事件转发给Kafka并启动了我的Kafka流程序。我的程序开始处理事件并完成。一段时间后,我停止了我的Kafka流应用程序并重新开始。观察到我的Kafka流程序正在处理已经处理过的先前事件。 根据我的理解,Kafka流在内部维护每个应用程序id的输入主题本身的偏移量。但在这里重新处理已经处理的事件。 如何验证Kafka流处理的偏移量?Kafka流是如何保存这些书签的?根据什么 如果Kafk
我目前正在尝试开发一个不和谐的机器人使用Java在Replit,我想保持我的机器人的令牌在.env文件,所以只有我可以访问它。但是,我不知道如何访问Main.java文件中的变量。有办法吗?
问题内容: 在i386 linux上。如果可能,最好在c /(c / posix std libs)/ proc中。如果没有,那么任何程序集或第三方库都可以做到这一点? 编辑:我正在尝试开发测试内核模块是否清除缓存行或整个处理器(与wbinvd())。程序以root身份运行,但我希望尽可能保留在用户空间中。 问题答案: 高速缓存一致性系统会尽最大努力向您隐藏此类信息。我认为您将不得不通过使用性能计