当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka根据消息的值对窗口消息进行排序

张通
2023-03-14

我的Kafka publisher发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}

例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

另外,我们为每个消息添加一些消息键,将它们发送到相应的分区。

Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream("events");
 KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.

    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
                () -> "",  // initial value
                (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
                TimeWindows.of(1000), // intervals in milliseconds
                Serdes.String(), // serde for aggregated value
                "test-store"
        );*/

我如何在1分钟窗口中重新排序消息并将它们发送到另一个主题?

共有1个答案

夹谷硕
2023-03-14

这里有一个提纲:

创建一个处理器实现,该处理器实现:

>

  • inprocess()方法,对于每个消息:

      null
      null

  •  类似资料:
    • 在前两章,程序使用了同一个函数MessageBox来向使用者输出文字。MessageBox函数会建立一个「窗口」。在Windows中,「窗口」一词有确切的含义。一个窗口就是屏幕上的一个矩形区域,它接收使用者的输入并以文字或图形的格式显示输出内容。 MessageBox函数建立一个窗口,但这只是一个功能有限的特殊窗口。消息窗口有一个带关闭按钮的标题列、一个选项图标、一行或多行文字,以及最多四个按钮。

    • 本文向大家介绍C#向无窗口的进程发送消息,包括了C#向无窗口的进程发送消息的使用技巧和注意事项,需要的朋友参考一下 注:本文适用.net2.0+的winform程序 一个winform程序,我希望它不能多开,那么在用户启动第二个实例的时候,作为第二个实例来说,大概可以有这么几种做法: 1.弹个窗告知用户【程序已运行】之类,用户点击弹窗后,退出自身 2.什么都不做,默默退出自身 3.让已运行的第一个

    • 问题内容: 我想在ubuntu上运行的python脚本中显示一个信息窗口。我正在使用以下代码: 这可以工作,但是会显示一个空窗口,顶部是消息框。我如何摆脱窗口而只将消息框居中放在屏幕上(窗口管理器是gnome 2)? 这只是为了显示命令行脚本中的一些信息(密码,这就是为什么我不想仅将其回显到控制台)。 问题答案: Tkinter必须具有根窗口。如果您不创建一个,则会为您创建一个。如果您不需要此根窗

    • 问题内容: 我有一个Java程序,其中包含一个从JFrame继承的 Application 类。 我想显示一条消息,询问用户单击窗口右上方的X按钮是否要退出程序。 到目前为止,这是我的代码: 我从网上找到的教程中获得了此代码。我自己编写了WindowClosing事件处理程序的代码。但是,我在注册窗口侦听器(addWindowListener)时遇到麻烦。它告诉我WindowAdapter是抽象的

    • 我有一个Java程序,其中包含一个从JFrame继承的类Application。 我想显示一条消息,询问用户单击窗口右上角的X按钮是否要退出程序。 这是我目前的代码: 我从网上找到的教程中得到了这段代码。我自己编写了WindowClosing事件处理程序。但是,我在注册窗口侦听器(addWindowListener)时遇到问题。它告诉我WindowAdapter是抽象的,不能实例化。 请问我怎样才