当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中使用ListState进行BroadcastProcessFunction

凌蕴藉
2023-03-14

我们有一个包含事务的非键控数据流和一个包含规则的广播流。事实上,我们希望根据上次看到的规则处理事务。如果我们最后看到的规则是每日,我们必须将当前事务添加到每日事务列表中。此外,如果dailyTrnsList的大小大于阈值,则必须清除列表并将事务写入数据库。如果最后看到的规则是temp,我们也会做同样的事情。

代码如下:

public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();

private final static int threshold = 100;

private final MapStateDescriptor<String, String> ruleStateDesc =
        new MapStateDescriptor<>(
                "ControlMapState",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

  @Override
  public void processElement(String s,
                           ReadOnlyContext readOnlyContext,
                           Collector<Transaction> collector) throws Exception
 {
    String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

    if(ruleName.equals("daily"))
        {
            dailyTrnsList.add(s);
            if(dailyTrnsList.size()>=threshold)
                {
                    List<String> buffer = dailyTrnsList;
                    dailyTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"daily");
                }
        }
    else if(ruleName.equals("temp"))
        {
            tempTrnsList.add(s);
            if(tempTrnsList.size()>=threshold)
                {
                    List<String> buffer = tempTrnsList;
                    tempTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"temp");
                }
        }

    collector.collect(s);

   }
  @Override
  public void processBroadcastElement(String s,
                                    Context context,
                                    Collector<CardTransaction> collector) throws Exception
  {
    if (s.equals("temp"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "temp");
    List<String> buffer = dailyTrnsList;
        dailyTrnsList = new ArrayList<>();
        insert_to_db(buffer,"daily");
    }
    else if (s.equals("daily"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "daily");
        List<String> buffer = tempTrnsList;
        tempTrnsList = new ArrayList<>();
        insert_to_db(buffer,"temp");
      }
    }
  }

我们的问题是编写一种容错方法。我们不知道如何使用ListState来解决我们的问题。到目前为止,我们找到的唯一解决方案是实现CheckpointedFunction接口,该接口正在与Flink文档中的State部分一起工作。

private ListState<String> dailyTrns;
private ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    dailyTrns.clear();
    tempTrns.clear();
    for (String[] element : dailyTrnsList)
        dailyTrns.add(element);
    for (String[] element : tempTrnsList)
        tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
    tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
    if (context.isRestored()) {
        for (String[] element : dailyTrns.get())
            dailyTrnsList.add(element);
        for (String[] element : tempTrns.get())
            tempTrnsList.add(element);
    }
}

请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案是正确的,那么对于没有从日常trnslist和testrnslist传输到日常trns和testrns的元素会发生什么情况?

任何帮助都将不胜感激。

提前谢谢你。

共有2个答案

孟翰海
2023-03-14
  1. 您不需要每日trns列表和试探列表,因为您已经有每日trns和试探列表。只需使用这些ListState变量来记录事务
  2. 我不知道你为什么有一个映射状态
姜鹏程
2023-03-14

您可以简化实现,以便不必担心这一点。您可以执行以下操作:

(1) 简化BroadcastProcessFunction,使其只需将传入流拆分为两个流:每日事务流和临时事务流。它通过根据最新规则从两个侧输出中选择一个来实现这一点。

(2)使用创建批次并将它们写入数据库的计数窗口遵循BroadcastProcessFunction。

或者不使用侧输出,BroadcastProcessFunction可以写出(规则、事务)的元组,然后您可以根据规则键入流。无论哪种方式,想法都是让窗口API为您管理容错列表。

 类似资料:
  • 如何在Flink SQL查询中使用SQL客户端进行窗口连接。窗口设置方式与下面链接中提到的方式相同https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html 需要窗口的示例查询:选择sourceKafka.*从sourceKafka内部连接SourceKafca上的bad

  • 我正在构建一个有以下要求的应用程序,我刚刚开始使用Flink。 null null 谢谢并感激你的帮助。

  • 我有一个这样的数据集 我想选择第3列和第4列作为我的键和值,我如何在Apache Flink中执行平均操作。 我最多能做到“按键分组”。但是我无法对每个键的值执行平均运算。

  • 在一个flink项目中,我使用一个case类click。 这个类填充了数据集,并且在日期为Java8的情况下可以很好地工作。在Java7环境中切换到org.joda(Version2.9)之后,对数据集中的click对象的调用不像以前那样执行。对click对象的date字段的某些函数的访问引发。这些函数的例子有等。我能够确保click类的日期字段不为空。我怀疑joda时间库与kryo序列化的交互不

  • 我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。 我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。