当前位置: 首页 > 知识库问答 >
问题:

当我们将Flink应用程序部署到Kinesis Data Analytics时,不会触发窗口

岳浩宕
2023-03-14

我们有一个Apache Flink POC应用程序,它在本地运行良好,但部署到Kinesis Data Analytics(KDA)后,它不会将记录发送到接收器。

  • 来源:Kafka 2.7
    • 1经纪人
    • 1个主题,分区为1,复制因子为1
    • 来源:亚马逊MSK Kafka 2.8
      • 3个经纪人(但我们正在连接一个)
      • 1个主题,分区为1,复制因子为3
      • 平行度:2
      1. FlinkKafkaConsumer从主题中读取json格式的消息
      2. JSON映射到域对象,称为遥测
      private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
          Properties kafkaProperties = new Properties();
          kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
          kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
      
          FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
      
          consumer.setStartFromEarliest(); //Just for repeatable testing
      
          return environment
                  .addSource(consumer)
                  .map(new MapJsonToTelemetry());
      }
      
      private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
          WatermarkStrategy<Telemetry> wmStrategy =
                  WatermarkStrategy
                          .<Telemetry>forMonotonousTimestamps()
                          .withTimestampAssigner((event, timestamp) -> event.TimeStamp);
      
          return telemetries
                  .assignTimestampsAndWatermarks(wmStrategy)
                  .keyBy(t -> t.StateIso)
                  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                  .process(new WindowCountFunction());
      }
      
      private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
          List<HttpHost> httpHosts = new ArrayList<>();
          httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
      
          ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
                  httpHosts,
                  (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                      Map<String, Object> record = new HashMap<>();
      
                      record.put("stateIso", element.StateIso);
                      record.put("healthy", element.Flawless);
                      record.put("unhealthy", element.Faulty);
                      ...
      
                      LOG.info("Telemetry has been added to the buffer");
                      indexer.add(Requests.indexRequest()
                              .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                              .source(record, XContentType.JSON));
                  }
          );
      
          //Using low values to make sure that the Flush will happen
          esSinkBuilder.setBulkFlushMaxActions(25);
          esSinkBuilder.setBulkFlushInterval(1000);
          esSinkBuilder.setBulkFlushMaxSizeMb(1);
          esSinkBuilder.setBulkFlushBackoff(true);
          esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
      
          LOG.info("Sink has been attached to the DataStream");
          telemetries.addSink(esSinkBuilder.build());
      }
      
      • 我们设法将Kafka、KDA和ElasticSearch放在相同的VPC和相同的子网下,以避免对每个请求进行签名
      • 从日志中我们可以看到Flink可以到达ES集群。
        请求
      {
          "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
          "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
          "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
          "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
          "applicationARN": "arn:aws:kinesisanalytics:...",
          "applicationVersionId": "39",
          "messageSchemaVersion": "1",
          "messageType": "INFO"
      }
      

      回应

      json prettyprint-override">{
          "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
          "logger": "org.elasticsearch.client.RestClient",
          "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
          "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
          "applicationARN": "arn:aws:kinesisanalytics:...",
          "applicationVersionId": "39",
          "messageSchemaVersion": "1",
          "messageType": "DEBUG"
      }
      
      • 我们还可以通过查看Flink Dashboard来验证消息是否已从Kafka主题中读取并发送以供处理https://i.stack.imgur.com/aUZBm.png“alt=“任务之间发送和接收数据”>
      • 我们实现了一个RichParallelSourceFunction,它发出1\u 000\u 000条消息,然后退出
        • 这在当地环境中效果很好
        • 作业在AWS环境中完成,但接收器端没有数据
        • 基本上,我们有两个循环a,而(true)外部循环和内部循环
        • 但没有区别
        • 但没有区别
        • 每当我们从IDEA运行应用程序时,这些日志都在控制台上
        • 每当我们使用KDA运行应用程序时,CloudWatch中就没有这样的日志
          • 添加到主日志中的那些日志确实会出现在CloudWatch日志中

          我们假设在接收器端看不到数据,因为没有触发窗口处理逻辑。这就是为什么在CloudWatch中看不到处理日志的原因。

          欢迎任何帮助!

          更新#1

          • 我们试图将Flink版本从1.12.1降级到1.11.1
          • 它甚至在本地环境中不起作用

          更新#2

          平均消息大小约为4kb。以下是示例消息的摘录:

          {
            "affiliateCode": "...",
            "appVersion": "1.1.14229",
            "clientId": "guid",
            "clientIpAddr": "...",
            "clientOriginated": true,
            "connectionType": "Cable/DSL",
            "countryCode": "US",
            "design": "...",
            "device": "...",
            ...
            "deviceSerialNumber": "...",
            "dma": "UNKNOWN",
            "eventSource": "...",
            "firstRunTimestamp": 1609091112818,
            "friendlyDeviceName": "Comcast",
            "fullDevice": "Comcast ...",
            "geoInfo": {
              "continent": {
                "code": "NA",
                "geoname_id": 120
              },
              "country": {
                "geoname_id": 123,
                "iso_code": "US"
              },
              "location": {
                "accuracy_radius": 100,
                "latitude": 37.751,
                "longitude": -97.822,
                "time_zone": "America/Chicago"
              },
              "registered_country": {
                "geoname_id": 123,
                "iso_code": "US"
              }
            },
            "height": 720,
            "httpUserAgent": "Mozilla/...",
            "isLoggedIn": true,
            "launchCount": 19,
            "model": "...",
            "os": "Comcast...",
            "osVersion": "...",
            ...
            "platformTenantCode": "...",
            "productCode": "...",
            "requestOrigin": "https://....com",
            "serverTimeUtc": 1617809474787,
            "serviceCode": "...",
            "serviceOriginated": false,
            "sessionId": "guid",
            "sessionSequence": 2,
            "subtype": "...",
            "tEventId": "...",
            ...
            "tRegion": "us-east-1",
            "timeZoneOffset": 5,
            "timestamp": 1617809473305,
            "traits": {
              "isp": "Comcast Cable",
              "organization": "..."
            },
            "type": "...",
            "userId": "guid",
            "version": "v1",
            "width": 1280,
            "xb3traceId": "guid"
          }
          

          我们正在使用ObjectMapper来仅解析json的一些字段。以下是遥测类的样子:

          public class Telemetry {
              public String AppVersion;
              public String CountryCode;
              public String ClientId;
              public String DeviceSerialNumber;
              public String EventSource;
              public String SessionId;
              public TelemetrySubTypes SubType; //enum
              public String TRegion;
              public Long TimeStamp;
              public TelemetryTypes Type; //enum
              public String StateIso;
              
              ...
          }
          

          更新#3

          无数据


共有2个答案

史烈
2023-03-14

在与AWS人员进行了一次支持会议之后,我们发现我们没有在流媒体环境中设置时间特性。

  • 在1.11.1中,时间特性的默认值是摄取时间

在Flink 1.12中,默认的流时间特性已更改为EventTime,因此您不再需要调用此方法来启用事件时间支持。

因此,在我们明确设置了EventTime之后,它开始像魅力一样生成水印:

streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
长孙阳泽
2023-03-14

根据您提供的评论和更多信息,问题似乎是两个Flink消费者不能从同一个分区消费。因此,在您的情况下,只有一个运算符的并行实例将从kafka分区消费,而另一个将处于空闲状态。

通常Flink运算符会选择MIN([all_downstream_parallel_watermarks]),所以在您的情况下,一个Kafka消费者会产生正常的水印,而另一个永远不会产生任何东西(在这种情况下,flink假设Long. Min),所以Flink会选择较低的一个,即Long. Min。所以,窗口永远不会被触发,因为当数据流动时,其中一个水印永远不会生成。良好的做法是在使用Kafka时使用与Kafka分区数量相同的并行性。

 类似资料:
  • 每当我想在heroku服务器上部署我的node js应用程序时,我都会遇到以下错误。我在互联网上到处寻找解决方案,但一个也没有。我也按照这里的建议增加了git缓冲区,但仍然得到了相同的错误

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?

  • 问题内容: 我正在尝试在Heroku上使用Flask开发我的第一个“大型”应用程序,并尝试将此处的基本教程与以下说明结合:https : //devcenter.heroku.com/articles/python与以下说明:http:// flask.pocoo.org/docs/patterns/packages/#larger- applications。它在本地与“先行启动”一起工作,但是

  • 我正试图将我的spring应用程序部署到heroku,但我相信maven插件中存在一些错误。尝试了所有可能的版本组合,在本地工作,但在部署时不工作。 当我跑步的时候: git push heroku master 以下是错误: 4.0.0 org.springframework.Boot Spring-Boot-starter-parent 2.3.3.发布com.project techupda

  • 我正在尝试将我的应用程序部署到Heroku,但似乎存在一些问题。每次我尝试: 我的heroku日志告诉我,我的应用程序崩溃了,我错过了“调试器”宝石。我在Heroku上找到了这条线索和这一页。当我尝试安装“byebug”gem时,bundle告诉我它不会安装在Ruby 1.9.3上,当我按照Heroku的建议将“debugger”gem放在gem文件的开发组中时,我仍然会收到相同的错误。 我也更新

  • 我有一个vuejs应用程序,它是用vue cli设置的,我正在尝试将我的应用程序部署到Heroku。 这是我的服务器: 我从gitignore中删除dist, 我在package.json中添加了一个类似“start”的起始点:“node server.js” 以下是我在控制台看到的内容: 加载资源失败:服务器响应,状态为503(服务不可用)/Favicon.ico:1 以下是heroku日志: