We have an Apache Flink POC application that works fine locally but, once deployed to Kinesis Data Analytics (KDA), does not send any records to the sink.
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment) {
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
    consumer.setStartFromEarliest(); // Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
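The WindowCountFunction itself is not shown in the post. For context, a minimal sketch of what it could look like follows; the StateAggregatedTelemetry field names (StateIso, Flawless, Faulty) are taken from the sink code below, and the isFaulty predicate is a hypothetical stand-in for whatever classification the real job performs:

// Hypothetical sketch only; the real WindowCountFunction is not shown in the post.
public class WindowCountFunction
        extends ProcessWindowFunction<Telemetry, StateAggregatedTelemetry, String, TimeWindow> {
    @Override
    public void process(String stateIso,
                        Context context,
                        Iterable<Telemetry> elements,
                        Collector<StateAggregatedTelemetry> out) {
        long flawless = 0;
        long faulty = 0;
        for (Telemetry t : elements) {
            if (isFaulty(t)) {
                faulty++;
            } else {
                flawless++;
            }
        }
        // Field types are assumed; only the names appear in the sink code.
        StateAggregatedTelemetry result = new StateAggregatedTelemetry();
        result.StateIso = stateIso;
        result.Flawless = flawless;
        result.Faulty = faulty;
        out.collect(result);
    }

    private boolean isFaulty(Telemetry t) {
        return false; // placeholder; the real classification logic is unknown
    }
}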
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();
                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...
                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-" + from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    // Using low values to make sure that the flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}
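For completeness, the three methods above would presumably be wired together roughly as follows; this is a sketch only, since the actual main() is not shown in the post and the job name is an assumption:

// Hypothetical wiring of the three setup methods; not part of the original post.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Telemetry> telemetries = SetupKafkaSource(environment);
    SingleOutputStreamOperator<StateAggregatedTelemetry> aggregated = SetupProcessing(telemetries);
    SetupElasticSearchSink(aggregated);

    environment.execute("telemetry-poc"); // job name is an assumption
}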
From the KDA (CloudWatch) logs we can see that the sink does reach the Elasticsearch cluster; the ping and its response are logged:

{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
Response:

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}
We also experimented with while(true) outer and inner loops, but it made no difference.
- Whenever we run the application from IDEA, these logs appear on the console.
- Whenever we run the application on KDA, there are no such logs in CloudWatch.
- Logs added to the main method do show up in the CloudWatch logs.
We assume the data never shows up on the sink side because the window processing logic is not triggered, which would also explain why the processing logs are missing from CloudWatch.
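One way to verify this hypothesis would be a watermark probe along the following lines; this is a sketch that was not part of our job, and it assumes access to the stream right after assignTimestampsAndWatermarks inside SetupProcessing plus the LOG instance used elsewhere:

// Hypothetical debugging probe: logs the watermark each subtask currently sees,
// so we can check whether it ever advances beyond Long.MIN_VALUE on KDA.
DataStream<Telemetry> withWatermarks = telemetries.assignTimestampsAndWatermarks(wmStrategy);
withWatermarks
        .process(new ProcessFunction<Telemetry, Telemetry>() {
            @Override
            public void processElement(Telemetry value, Context ctx, Collector<Telemetry> out) {
                LOG.info("Current watermark: {}", ctx.timerService().currentWatermark());
                out.collect(value);
            }
        })
        .print(); // or continue with keyBy/window as before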
Any help would be welcome!
Update #1
- We tried downgrading the Flink version from 1.12.1 to 1.11.1.
- With 1.11.1 it did not work even in the local environment.
Update #2
The average message size is around 4 KB. Here is an excerpt from a sample message:
{
    "affiliateCode": "...",
    "appVersion": "1.1.14229",
    "clientId": "guid",
    "clientIpAddr": "...",
    "clientOriginated": true,
    "connectionType": "Cable/DSL",
    "countryCode": "US",
    "design": "...",
    "device": "...",
    ...
    "deviceSerialNumber": "...",
    "dma": "UNKNOWN",
    "eventSource": "...",
    "firstRunTimestamp": 1609091112818,
    "friendlyDeviceName": "Comcast",
    "fullDevice": "Comcast ...",
    "geoInfo": {
        "continent": {
            "code": "NA",
            "geoname_id": 120
        },
        "country": {
            "geoname_id": 123,
            "iso_code": "US"
        },
        "location": {
            "accuracy_radius": 100,
            "latitude": 37.751,
            "longitude": -97.822,
            "time_zone": "America/Chicago"
        },
        "registered_country": {
            "geoname_id": 123,
            "iso_code": "US"
        }
    },
    "height": 720,
    "httpUserAgent": "Mozilla/...",
    "isLoggedIn": true,
    "launchCount": 19,
    "model": "...",
    "os": "Comcast...",
    "osVersion": "...",
    ...
    "platformTenantCode": "...",
    "productCode": "...",
    "requestOrigin": "https://....com",
    "serverTimeUtc": 1617809474787,
    "serviceCode": "...",
    "serviceOriginated": false,
    "sessionId": "guid",
    "sessionSequence": 2,
    "subtype": "...",
    "tEventId": "...",
    ...
    "tRegion": "us-east-1",
    "timeZoneOffset": 5,
    "timestamp": 1617809473305,
    "traits": {
        "isp": "Comcast Cable",
        "organization": "..."
    },
    "type": "...",
    "userId": "guid",
    "version": "v1",
    "width": 1280,
    "xb3traceId": "guid"
}
We are using ObjectMapper to parse only some fields of the JSON. Here is what the Telemetry class looks like:
public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; // enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; // enum
    public String StateIso;
    ...
}
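For reference, a minimal sketch of what the MapJsonToTelemetry function could look like with Jackson; the real implementation is not shown in the post, and the two configure calls are assumptions that follow from mapping only a subset of the camelCase JSON onto the PascalCase fields above:

// Hypothetical sketch; the actual MapJsonToTelemetry is not shown in the post.
public class MapJsonToTelemetry implements MapFunction<String, Telemetry> {
    private static final ObjectMapper MAPPER = new ObjectMapper()
            // only a few fields of the ~4 KB message are mapped, so unknown fields must be ignored
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            // JSON keys are camelCase while the Telemetry fields are PascalCase
            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);

    @Override
    public Telemetry map(String json) throws Exception {
        return MAPPER.readValue(json, Telemetry.class);
    }
}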
Update #3
[screenshot: "No data"]
After a support session with the AWS folks, it turned out that we had not set the time characteristic on the streaming environment. The Flink documentation states: "In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don't need to call this method for enabling event-time support anymore." Nevertheless, after we set EventTime explicitly, it started to generate watermarks like a charm:
streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Based on the comments and the additional information you provided, it seems that the problem is that two Flink consumers cannot consume from the same partition. So in your case, only one parallel instance of the operator will consume from the Kafka partition and the other one will be idle.

In general, a Flink operator selects MIN([all_upstream_parallel_watermarks]), so in your case one Kafka consumer will produce normal watermarks and the other will never produce anything (Flink assumes Long.MIN_VALUE in that case), so Flink will select the lower one, which is Long.MIN_VALUE. Thus, the window will never be fired, because while the data is flowing, one of the watermarks is never generated. The good practice is to use the same parallelism as the number of Kafka partitions when working with Kafka.
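A sketch of the two usual fixes follows; the parallelism value and the idleness timeout are assumptions, while withIdleness is the standard Flink API for sources with idle splits:

// Option 1: match the source parallelism to the Kafka partition count
// (here assumed to be 1 for THE_TOPIC).
environment
        .addSource(consumer)
        .setParallelism(1)
        .map(new MapJsonToTelemetry());

// Option 2: mark subtasks idle after a timeout so an empty partition
// cannot hold the operator-wide MIN() watermark back at Long.MIN_VALUE.
WatermarkStrategy<Telemetry> wmStrategy =
        WatermarkStrategy
                .<Telemetry>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(30)) // 30 s is an arbitrary choice
                .withTimestampAssigner((event, timestamp) -> event.TimeStamp);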