我有一个流应用程序,它从Kafka主题读取数据,从文件读取数据,聚合数据并创建结果。
每5分钟,我想得到多少记录被消耗和记录从文件中读取的计数,并将其发送到另一个流。
我该怎么做?
您可以使用侧面输出
您还可以生成任意数量的附加侧输出结果流。结果流中的数据类型不必与主流中的数据类型匹配,不同侧输出的类型也可能不同。当您想要拆分数据流时,此操作可能很有用,因为您通常必须复制数据流,然后从每个流中过滤掉您不想拥有的数据。
因为侧输出需要扩展ProcessFunction
或KeyedProcessFunction
,所以您可以利用它来使用onTimer()。这是一个例子。
ctx - 一个 ProcessFunction.OnTimerContext,它允许查询触发计时器的时间戳,查询触发计时器的 TimeDomain,并获取用于注册计时器和查询时间的计时器服务。上下文仅在调用此方法期间有效,不要存储它。
问题的关键是在processElement()
方法中使用out.collect(…)
,以获取所消耗的每个数据。并使用onTimer()
方法中的侧输出,每5分钟发射一次第二个流。
public class SideOutputWithTimer extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
final OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side-output") {
};
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
Tuple2<String, String> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
// emit data without transforming it
out.collect(Tuple2.of(value.f0, 1L));
}
@Override
public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
// USE 5 MINITES INSTEAD OF 60000 milliseconds
if (timestamp == result.lastModified + 60000) {
// emit data to side output on timeout and after aggregating it
ctx.output(outputTag, Tuple2.of(result.key, result.count));
}
}
}
class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
我在一个类中有一个imageView,在单击imageView时,会出现一个对话框,它有两个选项,可以从相机中获取图像,也可以打开设备的图像库。我想将图像从一个类发送到另一个类,这样它就可以出现在ImageView中。我搜索了很多小时,但我只得到关于从一个类到另一个类的文本数据发送。谁能告诉我从一个类到另一个类的图像发送? 这是来自sender类的代码,它将获取图像。 谢谢你的帮助
问题内容: 问:我怎样才能从读到的一切入的方式是不是一个手工制作的循环用我自己的字节的缓冲区? 问题答案: 编写一个方法来执行此操作,然后从需要该功能的任何地方调用它。番石榴已经在中提供了代码。我敢肯定,几乎所有其他具有“通用” IO功能的库也都有它,但是Guava是我第一个“入门”库。它震撼了:)
我有一个事件数据流和另一个模式数据流。模式是由用户在运行时提供的,它们需要通过一个Kafka主题来提供。我需要使用flink-cep在事件流上应用每个模式。在我事先不知道模式的情况下,有没有办法从数据流中获取PatternStream?
问题内容: 我有一个JSP文件为 jsp 1.jsp ,另一个JSP文件为 jsp 2.jsp 我已经包括 JSP 2.jsp 在 JSP 1.jsp页面 使用 现在,我需要某些元素上的click事件。在那件事上,我想将一个字符串变量传递给包含的jsp。 假设我有一个列表,单击它后,我想将该列表的名称转移到另一个JSP, 在另一个JSP中,我试图使用该字符串执行某些任务。 我在没有任何servle
我有一个自定义的,我希望将作为从一传递到二(这是模态)。问题是,当我使用在之间切换时,函数不会被触发,因为处理我假设的转换。 那我该怎么做呢?如何将从一个传递到下一个。
我有一个Web服务,它使用Spring Rest Controller(使用Netty而不是Apache Tomcat)处理GET/POST/PUT HTTP请求。我希望过滤我服务中的所有请求,当请求配置了特定的标头时,我希望将此特定请求发送到一个完全不同的URL,同时将响应返回到发送原始请求的相同实体。 这是我的代码: 在这个实现中,请求只是传递给我的普通rest控制器,而不会到达其他服务。我错