我们有一个包含事务的非键控数据流和一个包含规则的广播流。事实上,我们希望根据上次看到的规则处理事务。如果我们最后看到的规则是每日,我们必须将当前事务添加到每日事务列表中。此外,如果dailyTrnsList的大小大于阈值,则必须清除列表并将事务写入数据库。如果最后看到的规则是temp,我们也会做同样的事情。
代码如下:
public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();
private final static int threshold = 100;
private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");
if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
collector.collect(s);
}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}
我们的问题是编写一种容错方法。我们不知道如何使用ListState来解决我们的问题。到目前为止,我们找到的唯一解决方案是实现CheckpointedFunction接口,该接口正在与Flink文档中的State部分一起工作。
private ListState<String> dailyTrns;
private ListState<String> tempTrns;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}
请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案是正确的,那么对于没有从日常trnslist和testrnslist传输到日常trns和testrns的元素会发生什么情况?
任何帮助都将不胜感激。
提前谢谢你。
您可以简化实现,以便不必担心这一点。您可以执行以下操作:
(1) 简化BroadcastProcessFunction,使其只需将传入流拆分为两个流:每日事务流和临时事务流。它通过根据最新规则从两个侧输出中选择一个来实现这一点。
(2)使用创建批次并将它们写入数据库的计数窗口遵循BroadcastProcessFunction。
或者不使用侧输出,BroadcastProcessFunction可以写出(规则、事务)的元组,然后您可以根据规则键入流。无论哪种方式,想法都是让窗口API为您管理容错列表。
如何在Flink SQL查询中使用SQL客户端进行窗口连接。窗口设置方式与下面链接中提到的方式相同https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html 需要窗口的示例查询:选择sourceKafka.*从sourceKafka内部连接SourceKafca上的bad
我正在构建一个有以下要求的应用程序,我刚刚开始使用Flink。 null null 谢谢并感激你的帮助。
我有一个这样的数据集 我想选择第3列和第4列作为我的键和值,我如何在Apache Flink中执行平均操作。 我最多能做到“按键分组”。但是我无法对每个键的值执行平均运算。
我有一个
在一个flink项目中,我使用一个case类click。 这个类填充了数据集,并且在日期为Java8的情况下可以很好地工作。在Java7环境中切换到org.joda(Version2.9)之后,对数据集中的click对象的调用不像以前那样执行。对click对象的date字段的某些函数的访问引发。这些函数的例子有等。我能够确保click类的日期字段不为空。我怀疑joda时间库与kryo序列化的交互不
我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。 我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。