当前位置: 首页 > 知识库问答 >
问题:

会话窗口闪烁

嵇浩然
2023-03-14

有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的?

例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。

如果我说的不对,请纠正。

上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果?

DTO:

public class IncomingEvent{
    private String id;
    private String eventId;
    private Date timestamp;
    private String component;
    //getters and setters
}
public class FinalOutPutEvent{
    private String id;
    private long timeTaken;
    //getters and setters
}

==========================================================传入事件的反序列化:

公共类IncomingEventDeserializationScheme实现KafkaDeserializationSchema{

private ObjectMapper mapper;

public IncomingEventDeserializationScheme(ObjectMapper mapper) {
    this.mapper = mapper;
}

@Override
public TypeInformation<IncomingEvent> getProducedType() {
    return TypeInformation.of(IncomingEvent.class);
}

@Override
public boolean isEndOfStream(IncomingEvent nextElement) {
    return false;
}

@Override
public IncomingEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    if (record.value() == null) {
        return null;
    }
    try {
        IncomingEvent event = mapper.readValue(record.value(), IncomingEvent.class);
        if(event != null) {
            new SessionWindow(record.timestamp());
            event.setOffset(record.offset());
            event.setTopic(record.topic());
            event.setPartition(record.partition());
            event.setBrokerTimestamp(record.timestamp());
        }
        return event;
    } catch (Exception e) {
        return null;
    }
}
     public class MyEventJob {

private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MyEventJob eventJob = new MyEventJob();

    InputStream inStream = eventJob.getFileFromResources("myConfig.properties");
    ParameterTool parameter = ParameterTool.fromPropertiesFile(inStream);
    Properties properties = parameter.getProperties();
    Integer timePeriodBetweenEvents = 120;
    String outWardTopicHostedOnServer = localhost:9092";
    DataStreamSource<IncomingEvent> stream = env.addSource(new FlinkKafkaConsumer<>("my-input-topic", new IncomingEventDeserializationScheme(mapper), properties));
    SingleOutputStreamOperator<IncomingEvent> filteredStream = stream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<IncomingEvent>() {
            long eventTime;
            @Override
            public long extractTimestamp(IncomingEvent element, long previousElementTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(eventTime); 
            }
        })
        .map(e -> { e.setId(e.getEventId()); return e; });
    SingleOutputStreamOperator<FinalOutPutEvent> correlatedStream = filteredStream
        .keyBy(new KeySelector<IncomingEvent, String> (){
            @Override
            public String getKey(@Nonnull IncomingEvent input) throws Exception {
                return input.getId();
            }
        })
        .window(GlobalWindows.create()).allowedLateness(Time.seconds(defaultSliceTimePeriod))
        .trigger( new Trigger<IncomingEvent, Window> (){
            private final long sessionTimeOut;
            public SessionTrigger(long sessionTimeOut) {
                this.sessionTimeOut = sessionTimeOut;
            }
            @Override
            public TriggerResult onElement(IncomingEvent element, long timestamp, Window window, TriggerContext ctx)
                    throws Exception {
                ctx.registerProcessingTimeTimer(timestamp + sessionTimeOut); 
                return TriggerResult.CONTINUE;
            }
            @Override
            public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
                return TriggerResult.FIRE_AND_PURGE;
            }
            @Override
            public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
                    return TriggerResult.CONTINUE;
            }
            @Override
            public void clear(Window window, TriggerContext ctx) throws Exception {
                //check the clear method implementation
            }
        })
        .process(new ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>() {
        @Override
        public void process(String arg0,
                ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>.Context arg1,
                Iterable<IncomingEvent> input, Collector<FinalOutPutEvent> out) throws Exception {
            List<IncomingEvent> eventsIn = new ArrayList<>();
            input.forEach(eventsIn::add);
            if(eventsIn.size() == 1) {
                //Logic to handle incomplete request/response events
            } else if (eventsIn.size() == 2) {
                //Logic to handle the complete request/response and how much time it took
            }
        }
    } );
        FlinkKafkaProducer<FinalOutPutEvent> kafkaProducer = new FlinkKafkaProducer<>(
                outWardTopicHostedOnServer,            // broker list
                "target-topic",            // target topic
                new EventSerializationScheme(mapper));
    correlatedStream.addSink(kafkaProducer);
    env.execute("Streaming");
}

}

谢谢Vicky

共有1个答案

章锦
2023-03-14

从您的描述来看,我认为您希望编写一个自定义的ProcessFunction,它由session_id键控。您将有一个valueState,在其中存储请求事件的时间戳。当您获得相应的响应事件时,您会计算增量并发出该值(使用session_id)并清除出状态。

您可能还希望在获得请求事件时设置一个计时器,这样,如果您没有在安全/长时间内获得响应事件,则可以发出失败请求的侧输出。

 类似资料:
  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用API指定窗口的保留时间。流信息: 会话窗口(非活动时间)为1分钟,传递到的保留时间为2分钟。我们使用定制的来映射事件的时间。 示例: 事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天) 事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天) 第二个事件的到达时间是e1到达后

  • 我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

  • 作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数

  • 2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数

  • Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数 尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不