当前位置: 首页 > 知识库问答 >
问题:

Kafka自定义分区-每个分区用户数据自定义的使用者分配器

程和煦
2023-03-14

我正在实现一个自定义消费者的主题/分区分配在Kafka。为此,我将重写AbstractPartitionAssignor抽象类,该类又实现ConsumerPartitionAssignor接口。

作为自定义赋值器的一部分,我希望发送一个关于消费者订阅的每个主题的每个分区的单个(浮动)信息。

我知道可以通过重写ConsumerPartitionAssignor接口的默认方法ByteBuffer subscriptionUserData(Set topics) 向赋值器发送自定义数据

但是,问题是,从上面的方法签名中,我无法获得为使用者注册的每个主题分配给带下划线使用者的分区列表。

public static final class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    private final List<TopicPartition> ownedPartitions;
    private Optional<String> groupInstanceId;
    .....

谢谢你。

共有1个答案

汝跃
2023-03-14

要解决上述问题,只需使用赋值器回调函数onassignment,如下所示

...
private List<TopicPartition> memberAssignment = null;
     @Override
        public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
            memberAssignment = assignment.partitions();
            this.generation = metadata.generationId();
        }

并在函数subscriptionUserData中使用MemberAssignment(或Assignor类中的任何位置)获取当前分配给每个使用者的分区列表。

public ByteBuffer subscriptionUserData(Set<String> topics) 
 类似资料:
  • 我试图处理的用例如下: 我们有一个来自Kafka的数据流 所以举个例子: 让我们假设所有消息都是表示编码数据的字节数组 编码数据中具有特定值的所有消息都应由一个操作员处理 这样,当我们接收到与相同值对应的特定消息时,这些消息可以作为状态存储在操作符上(在分区器之后),并可用于丰富后续消息 问题: 自定义分区程序会对此有所帮助吗 如果不是,那么什么是一个好的解决方案 有人可以在Flink中为数据流共

  • 我使用confluent的kafka connect将数据传输到s3桶中。基于键进行理想的分区。因为现有的FieldPartitioner只适用于Avro模式记录,而不适用于一般的字符串化JSON文本。我想我应该写我自己的连接器。 课堂是这样的: 当我构建它并尝试运行kafka connect时,我得到了一个错误 从查看打包一个自定义Java'partitioner.class'插件为Kafka连

  • 我试图使用spring cloud stream绑定实现一个自定义的Kafka分区器。我只想对用户主题进行自定义分区,而不对公司主题进行任何操作(在本例中,Kafka将使用DefaultPartitioner)。 我的绑定配置: 我使用以下方式将消息发送到流中: 我的UserPartitioner类: 我最终收到以下异常: 编辑:根据文档,还尝试了以下步骤: User-Out:Destinatio

  • 我有自己的分区技术为数据流元组生成键,这些键的范围等于集群中的节点数,如果我将并行度设置为4,则生成的键将是0、1、2和3,以此类推,然后每个键都应该被分区到同一个节点,以便使用键状态进行更多的键化处理。 发生了什么:我使用keyBy实现了我的逻辑,所以我可以使用键控状态,但是它有很大的偏斜性,有些节点没有接收到记录,而其他节点接收到的记录不止一个。我尝试过使用自定义分区,它按照我想要的方式进行物

  • 我需要在std::字符串对象中使用一个已经分配的char*缓冲区(带有字符串内容)。经过一些研究,我发现这几乎是不可能的,std::字符串总是会有自己的私有数据副本。我能想到的唯一剩下的方法是使用一个自定义分配器,它将返回已经分配的char缓冲区的地址。要做到这一点,std::字符串应该只使用分配器来分配内存来保存它的字符串数据,而不是别的。是这样吗?

  • 我是测微计新手。我想记录诸如用户注册、用户登录等事件。我可以用测微计和Spring靴来完成这项工作,并在普罗米修斯/格拉法纳中显示数据吗?