当前位置: 首页 > 知识库问答 >
问题:

如何使用带有特定分区器的Apache Flink将数据作为键/值发送给Kafka

慕烨烁
2023-03-14

我在Flink有一个载荷,如下所示;

{
    "memberId": 4
    "total": 5
}

我想用指定的分区器将数据作为键值格式发送给kafka。对于分区器,我将使用模分区器。

模分配器示例;

分区ID=值%num分区

让我们假设参数为3。如果我们可以使用上面定义的有效负载的memberId,那么partitionId应该是4%3

根据上面的分区器,我想将具有相同分区ID的数据发送到相同的Kafka主题。另一个例子;

如果(假设num分区=3);

memberId: 3 => (3 % 3) => partitionId = 0 => kafka partition 1
memberId: 8 => (8 % 3) => partitionId = 2 => kafka partition 2
memberId: 2 => (2 % 3) => partitionId = 2 => kafka partition 2
memberId: 6 => (6 % 3) => partitionId = 0 => kafka partition 1
memberId: 7 => (7 % 3) => partitionId = 1 => kafka partition 2

如果我没说错的话,如果我们不能指定任何键和分区函数,FlinkKafka制作人就会使用FlinkFixedPartitioner。如果我们将配分函数设置为null,flink kafka生产者将使用循环分布。但我不知道如何将数据作为键/值格式发送给kafka,如何按模对其进行分区。我如何才能做到这一点?

共有1个答案

劳彦
2023-03-14

如果使用Kafka序列化模式,则可以创建Kafka生产记录,并设置Kafka键(和值)。您还可以在生产记录中设置分区。

 类似资料:
  • 如何将带有'\'符号的值作为get参数传递给控制器? 我期待:马自达\6

  • 通过这样创建xhr post请求: 这个post请求按预期发送到服务器,但是当我想访问QueryDict时,我不能访问通过以下方式传递的数据: 总是,如下所示。

  • 我对Kafka是新的,所以道歉,如果我听起来很愚蠢,但我目前所理解的是…消息流可以定义为主题,就像类别一样。并且每个主题被分成一个或多个分区(每个分区可以有多个副本)。所以它们是平行的 他们说Kafka的主要网站 生成器能够选择将哪个消息分配给主题中的哪个分区。这可以通过循环的方式简单地平衡负载,也可以根据某个语义分区函数(例如基于消息中的某个键)来完成。 在0.8 beta版中创建produce

  • 我相信,当当前用户访问经过身份验证的URL时,可以获取他们的信息,但是为每个登录用户存储所有接收器的方法是什么呢? 我很感激。

  • 我如何发送表单数据作为‘文本’通过放心,请参阅截图。当我使用request.multipart(“key”.“value”)时,请求是作为文件发送的(参考屏幕截图)。蒂娅。

  • 问题内容: 我想知道是否有可能使用AngularStrap的datepicker而不保留用户的语言环境的时区信息。在我们的应用程序中,我们要处理具有到期日期的Contract对象。 添加或编辑合同对象时,有一个日期选择器字段用于选择日期。发生以下情况: 用户选择日期(例如2013-10-24) Angular将javascript日期对象绑定到ng-model字段 绑定的日期对象位于用户的时区(例