当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中测试keyedbroadcastprocessfunction?

岳昊空
2023-03-14

我是刚接触flink的,我正在尝试编写junit测试用例来测试KeyedBroadcastProcessFunction。下面是我的代码,我当前正在调用TestUtils类中的getDataStreamOutput方法,并在输入数据根据模式规则列表求值后将inputdata和patternrules传递给方法,如果输入数据满足条件,我将获得信号并调用sink函数,并在getDataStreamOutput方法中以字符串形式返回输出数据

 @Test
    public void testCompareInputAndOutputDataForInputSignal() throws Exception {
        Assertions.assertEquals(sampleInputSignal,
                TestUtils.getDataStreamOutput(
                        inputSignal,
                        patternRules));
    }



public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {

            env.setParallelism(1);

            DataStream<JSONObject> inputSignal = env.fromElements(input);

            DataStream<Map<String, String>> rawPatternStream =
                    env.fromElements(patternRules);

            //Generate a key,value pair of set of patterns where key is pattern name and value is pattern condition
            DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
                    rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                            Tuple2<String, Map<String, String>>>() {
                        @Override
                        public void flatMap(Map<String, String> patternRules,
                                            Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                            for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                                JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                                Map<String, String> map = new HashMap<>();
                                for (String key : jsonObject.keySet()) {
                                    String value = jsonObject.get(key).toString();
                                    map.put(key, value);
                                }
                                out.collect(new Tuple2<>(stringEntry.getKey(), map));
                            }
                        }
                    });

            BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
                    patternStream.broadcast(patternRuleDescriptor);


            DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal.map(new MapFunction<JSONObject,
                    Tuple2<String, JSONObject>>() {
                @Override
                public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                    String source =
                            inputSignal.getSource();
                    return new Tuple2<>(source, inputSignal);
                }
            }).keyBy(0).connect(patternRuleBroadcast).process(new MyKeyedBroadCastProcessFunction());
            
            
             validSignal.map(new MapFunction<Tuple2<String, JSONObject>,
                    JSONObject>() {
                @Override
                public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
                    return inputSignal.f1;
                }
            }).addSink(new getDataStreamOutput());

            env.execute("TestFlink");
        }
        return (getDataStreamOutput.dataStreamOutput);
    }


    @SuppressWarnings("serial")
    public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
        public static String dataStreamOutput;

        public void invoke(JSONObject inputSignal) throws Exception {
            dataStreamOutput = inputSignal.toString();
        }
    }

我需要用相同的广播规则测试不同的输入,但每次我调用这个函数时,它一遍又一遍地做过程从一开始输入信号广播数据,有没有一种方法,我可以广播一次,并继续将输入发送到我探索的方法,我可以使用类似下面的CoFlatMapFunction的东西来组合数据流,并在方法运行时继续发送输入规则,但对于这个数据流必须继续从kafka主题获取数据,它将重压方法,以加载kafka实用程序和服务器

 DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);

    DataStream<org.json.JSONObject> inputSignalFromMethod = env.fromElements(inputSignal));
    
    DataStream<JSONObject> inputSignal = inputSignalFromMethod.connect(inputSignalFromKafka)
                .flatMap(new SignalCoFlatMapper());


   public static class SignalCoFlatMapper
            implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

        @Override
        public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
            out.collect(inputValue);

        }

        @Override
        public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
            out.collect(kafkaValue);

        }
    }

我在stackoverflow中找到了一个链接,当processElement依赖于广播数据时,如何在flink中单元测试BroadcastProcessFunction,但这让我很困惑

在测试用例中,我只能在Before方法中广播一次,并不断向我的广播函数发送不同类型的数据

共有1个答案

慕容聪
2023-03-14

您可以使用KeyedTwoInputStreamOperatorTestHarness来实现这一点。例如,我们假设您有以下KeyedBroadcastProcessFunction,其中您为两个Datastream通道定义了一些业务逻辑

public class SimpleKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {
    @Override
    public void processElement(String inputEntry,
                               ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
    //business logic for how you want to process your data stream records
    }

  @Override
    public void processBroadcastElement(String broadcastInput, Context
            context, Collector<String> collector) throws Exception {
   //process input from your broadcast channel
}

现在,假设您的process函数是有状态的,并且正在修改Flink内部状态,您必须在测试类中创建testharness,以确保在测试期间能够跟踪状态。

然后,我将使用以下方法创建一些单元测试:

public class SimpleKeyedBroadcastProcessFunctionTest {
    private SimpleKeyedBroadcastProcessFunction processFunction;
    private KeyedTwoInputStreamOperatorTestHarness<String, String, String, String> testHarness;

  @Before
  public void setup() throws Exception {
    processFunction =  new SimpleKeyedBroadcastProcessFunction();
    testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                new CoBroadcastWithKeyedOperator<>(processFunction, ImmutableList.of(BROADCAST_MAP_STATE_DESCRIPTOR)),
                (KeySelector<String, String>) string -> string ,
                (KeySelector<String, String>) string -> string,
                TypeInformation.of(String.class));
   testHarness.setup();
   testHarness.open();
  }

  @After
    public void cleanup() throws Exception {
        testHarness.close();
    }

  @Test
  public void testProcessRegularInput() throws Exception {
      //processElement1 send elements into your regular stream, second param will be the event time of the record
      testHarness.processElement1(new StreamRecord<>("Hello", 0));
      //Access records collected during processElement  
      List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
      assertEquals("Hello", records.get(0).getValue())
  }

    @Test
  public void testProcessBroadcastInput() throws Exception {
      //processElement2 send elements into your broadcast stream, second param will be the event time of the record
      testHarness.processElement2(new StreamRecord<>("Hello from Broadcast", 0));
      //Access records collected during processElement  
      List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
      assertEquals("Hello from Broadcast", records.get(0).getValue())
  }
}
 类似资料:
  • 我试图用Jasmine为Angularjs编写单元测试。这是我的控制器: 和测试 测试失败,即使我试图测试期望(true). toBe(true); 茉莉花,因果报应,棱角分明的嘲弄都在我的索引里。jasmine调试页面中的html,还有测试脚本。 我发现如果删除beforeach()块,expect(true)。托比(真的)通过了。 下面是一个错误:

  • 在Android Espresso测试中有什么好的方法来测试结果代码和数据吗?我在用浓缩咖啡2.0。 定义一个新方法,如并使用该方法以便可以截获,等等。 编写一个只用于测试的TestActivity,它将调用上的,并在中检查结果 试着思考这两个坏处中什么是较小的,或者是否有任何其他关于如何测试这一点的建议。有什么建议吗?谢了!

  • 我一直致力于扩展Apache Flink Python API,以更好地匹配Java API,但我在处理的数据类型方面遇到了奇怪的错误。是否有一种方法可以附加Java调试器(例如Intellij IDEA)来调试Flink本身?

  • 问题内容: 最近,我希望为golang编写单元测试。功能如下。 那么,如何测试“ func Display”的结果是“ hello world”呢? 问题答案: 您只需输入自己的值,然后测试写入其中的内容是否符合您的期望。这样做是一个不错的选择,因为它只是将输出存储在其缓冲区中。

  • 问题内容: 我有一个有时返回NoneType值的方法。那么我如何质疑一个无类型的变量呢?例如,我需要使用 if 方法 我知道这是错误的方式,希望您能理解我的意思。 问题答案: 那么我如何质疑一个无类型的变量呢? 像这样使用运算符 为什么这样有效? 由于是Python中唯一的单例对象,因此我们可以使用operator来检查其中是否包含变量。 引用文档, 运算符和对象标识测试:当且仅当和是相同对象时为

  • 我正在学习JUnit测试。我想在JUnit中测试getter和setter,但它不起作用。这是第一节课。 我需要如何更改以测试setter?我如何测试它?