当前位置: 首页 > 知识库问答 >
问题:

Flink kafka - Flink 作业未向不同分区发送消息

国晟睿
2023-03-14

我有以下配置

  1. 一个具有 2 个分区的 kafka 主题
  2. 一个动物园管理员实例
  3. 一个 kafka 实例
  4. 具有相同组 ID 的两个使用者

Flink 作业片段:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new 
SimpleStringSchema(), props));

方案 1:

我在eclipse上写了一个flink job (Producer ),它从一个文件夹中读取一个文件,并在kafka主题上放置msgs。

所以当我使用eclipse运行这段代码时,它工作得很好。

例如:如果我放置一个有100条记录的文件,flink会向分区1发送一些msgs

场景2:当我创建上面代码的jar并在flink服务器上运行它时,flink将所有MSG发送到单个分区,因此只有一个使用者获得所有MSG。

我希望场景1使用场景2中创建的jar。

共有2个答案

弓晔
2023-03-14

如果您没有提供FlinkKafkaPartitioner或没有明确说明要使用Kafka的分区器,则将使用FlinkFixedPartitioner,这意味着来自一个任务的所有事件都将在同一分区中结束。

要使用Kafka的分区器,请使用此分区器:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), props), Optional.empty());

从IDE和eclipse运行之间的差异可能是因为Flink中并行性或分区的不同设置。

葛安和
2023-03-14

对于Flink-Kafka生成器,添加“null”作为最后一个参数。

speStream.addSink(new FlinkKafkaProducer011(
    kafkaTopicName,
    new SimpleStringSchema(),
    props,
    (FlinkKafkaPartitioner) null)
);

对此的简短解释是,这会关闭Flink使用默认分区程序Flink Fixed分区程序。作为默认值关闭这将允许Kafka在其认为合适的分区之间分发数据。如果未关闭此功能,则用于使用Flink KafkaProducer的接收器的每个并行性/任务槽将只写入每个并行性/任务槽中的一个分区。

 类似资料:
  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 问题内容: 我有一个JTextArea在我的主应用程序窗口中始终可见(如果需要,则为Log),并且我想使用它来显示系统中正在进行的活动(如您将对System.out.println( )(如果有条件或其他条件) 我指的是用户所做的高级操作(例如“成功加载文件”或“写入磁盘”,“完成”等) 这样的消息可以在我的系统中的任何地方生成,主要是在另一个包中,这些包的类处理数据和计算,并且它们不知道GUI。

  • 主要内容:1 invokeOneway单向发送,1.1 invokeOnewayImpl单向调用,2 sendMessageSync同步发送,2.1 invokeSync同步调用,3 sendMessageAsync异步发送消息,3.1 invokeAsync异步调用,3.2 onExceptionImpl异常处理,4 NettyClientHandler处理服务端消息,4.1 processResponseCommand处理响应,基于RocketMQ release-4.9.3,深入的介绍了P

  • 我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!

  • 我有一个有状态的应用程序,它维护与用户的会话。此应用程序有 5 个实例。 以下是主题: 所有主题都有5个分区。 Topic1和topic2分别用于建立州商店和全球故事。这两个主题都使用用户名作为消息键。这些主题中的数据由应用程序实例本身生成。 现在,另一个应用程序使用与消息键相同的用户名向topic3生成数据。 我的期望是它将进入同一个分区,该分区由在其本地状态存储中拥有该用户的实例使用。这是对的

  • 我想使用firebase云消息将通知从我的应用程序发送到另一个应用程序。所以我使用这个方法retrieveFCMToken(forSenderID:senderid)来处理这个过程。我将以下代码添加到我的应用程序委托中: 这是我的应用程序代理: 我遵循这个场景:我有两个应用程序,分别是“A”和“B”。我想将通知从应用程序“A”发送到应用程序“B”。因此,我将应用程序A的发件人id放入应用程序B代理