当前位置: 首页 > 知识库问答 >
问题:

Kafka流--如何按两次分组?

濮阳功
2023-03-14

你是怎么做到的?

我在想Streams.GroupByKey()...然后按十六进制值groupBy但我需要将KTable转换为kstream...

更新

    null

无论我从哪一个主题阅读,我都会收到这样的消息{ImagePath:{HexCode:#FFF}}。图像路径是关键,hexCode是值。我可以有一个到多个imagePaths,所以我的想法是我的前端将有一个websocket来拾取它。它会显示一个图像,上面有一个条形图,上面有大量的像素颜色代码。例如有4#FFF、28#FEF等。

因此,我要按imagePath分组,然后计算imagePath的每个像素。

例如:

    null

共有1个答案

岳意蕴
2023-03-14

是否可以在分组前通过组合键进行选择?类似这样的事情:

SteamsBuilder topology = new StreamsBuilder();

topology.stream("input")
   .selectKey((k, v) -> k + v.hex)
   .groupByKey()
   .count()

这不是groupBy两次,而是得到你想要的效果。

评论后更新:

class Image {
    public String imagePath;
}

class ImageAggregation {
    public String imagePath;
    public int count;
}

class ImageSerde implements Serde<Image> {
    // implement
}

class ImageAggregationSerde implements Serde<ImageAggregation> {
    // implement   
}

KTable<String, ImageAggregation> table = topology
  .stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 agg.count = agg.count + 1;
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());
class ImageHex {
    public String imagePath;
    public String hex;
}

class ImageHexAggregation {
    public String imagePath;
    public Map<String, Integer> counts;
}

class ImageHexSerde implements Serde<ImageHex> {
    // implement
}

class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
    // implement   
}

KTable<String, ImageHexAggregation> table = topology
  .stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageHexAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
                 agg.counts.put(v.hex, currentCount + 1));
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());
 类似资料:
  • 我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner。 让我知道如何解决这个问题,以便只有一条消息作为加入操作的输出发出。 由于两个ktable(msg1和msg2)之间的连接,我收到两条相同的消息。

  • 我正在建立一个聊天网站,我有一个包含以下列的表,,,。 是聊天来源的用户的,而是聊天发送到的用户的。 现在我想获取所有聊天,但我也想分组,如果是我的id,它将按分组,如果是我的用户id,它将按分组,我尝试了这个分组,但它只是在和相同时才分组,我怎么也能让它反过来呢

  • 我正在用Scala编写一个Kafka Streams应用程序,我担心潜在的内存泄漏/总的资源使用。 是否有一种方法向Kafka发出信号,让它“关闭”分组/分支操作创建的特定子流,并释放相关资源? 为了演示潜在的问题,让我们考虑一个电子商务应用程序,它将订单状态更改事件推送到一个名为“my-super-input-topic”的Kafka主题。每个订单都由OrderId唯一标识,OrderId用作K

  • 问题内容: 昨天我发布了一个问题,关于必须按下两次按钮才能使其正常工作。我得到了很好的帮助,这是stackoverflow的标志,但是问题仍然存在。我将代码缩减到最低限度,问题仍然存在。我仔细阅读了BalusC的建议,希望能在表单内找到表单。我当然看不到任何东西,所以我将发布我的代码,以希望更多的眼睛看到一些东西。 我有一个模板,可以从“欢迎”(登录部分)中调用。这将转到具有命令按钮的userIn

  • 问题内容: 你如何两次读取同一输入流?是否可以某种方式复制它? 我需要从网络获取图像,将其保存在本地,然后返回保存的图像。我只是想,使用相同的流而不是为下载的内容启动新的流然后再次读取它会更快。 问题答案: 你可以用来将的内容复制到字节数组,然后使用从字节数组重复读取。例如:

  • 我试图使用kafka流库只使用一次kafka的功能。我只将proessing.guarantee配置为exactly_once。与此同时,需要将事务状态存储在内部主题(__transaction_state)中。 我的问题是,如何定制主题的名称?如果kafka集群由多个消费者共享,那么每个消费者是否需要不同的事务管理主题? 谢谢你,墨蒂