当前位置: 首页 > 知识库问答 >
问题:

阿帕奇·Flink和阿帕奇脉冲星

仲君昊
2023-03-14

我正在使用Flink从Apache Pulsar读取数据。我在pulsar中有一个分区主题,有8个分区。在本主题中,我生成了1000条消息,分布在8个分区中。我的笔记本电脑中有8个内核,因此我有8个子任务(默认情况下,并行度=#个内核)。在执行Eclipse中的代码后,我打开了Flink UI,发现一些子任务没有收到任何记录(空闲)。我希望所有8个子任务都能得到利用(我希望每个子任务都映射到我的主题中的一个分区)。

重新启动作业后,我发现有时使用3个子任务,有时使用4个任务,而其余子任务保持空闲。

请您的支持来澄清这一情况。

还有,我怎么知道在分拍之间是否有洗牌呢?

我的代码:

ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();

Set<String> topicsSet = new HashSet<>();
topicsSet.add("flink-08");

consumerConfigurationData.setTopicNames(topicsSet);
consumerConfigurationData.setSubscriptionName("my-sub0111");
consumerConfigurationData.setSubscriptionType(SubscriptionType.Key_Shared);
consumerConfigurationData.setConsumerName("consumer-01");
consumerConfigurationData.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema()).pulsarAllConsumerConf(consumerConfigurationData).serviceUrl("pulsar://localhost:6650");

SourceFunction<String> src = builder.build();
DataStream<String> stream = env.addSource(src);

stream.print(" >>> ");

共有1个答案

崔宇
2023-03-14

关于脉冲星的问题,我知道的还不够多。我建议设置一个更大的测试,看看结果如何。通常,您的分区比插槽多,并且有些插槽以某种随机方式使用多个分区。

还有,我怎么知道在分拍之间是否有洗牌呢?

最简单的方法是查看FlinkWebUI的拓扑结构。在那里,您应该可以看到任务的数量和通道类型。如果你想要更多的细节,你可以发布一个截图,但在这种情况下,没有什么会被洗牌,因为你只有一个源和一个汇。

 类似资料:
  • Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 目前我正在研究Apache spark和Apache ignite框架。 这篇文章介绍了它们之间的一些原则差异,但我意识到我仍然不理解它们的目的。 我的意思是,哪一个问题更容易产生火花而不是点燃,反之亦然?

  • 我正在使用这两个实时数据流框架处理器。我找遍了所有的地方,但我找不到这两个框架之间有很大的区别。特别是,我想知道他们是如何工作的基础上的数据或拓扑等大小。

  • 我正在做一个学术项目,涉及传感器的流数据。我已经包围了苍鹭(Storm的接班人)和尼菲。两者都支持内置背压,这对我的项目至关重要。Apache Nifi和Heron之间的主要区别是什么? 哪款更适合物联网应用?

  • 我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦