当前位置: 首页 > 知识库问答 >
问题:

集群中的Apache Flink流不会与工人拆分作业

时向文
2023-03-14

我的目标是使用Kafka作为源设置一个高吞吐量集群

我在主服务器和辅助服务器上设置了一个2节点集群,配置如下。

flink-conf.yaml大师

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

Worker flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

主节点上的从属文件如下所示:

<WORKER_IP_ADDR>
localhost

两个节点上的 flink 设置位于具有相同名称的文件夹中。我通过运行

bin/start-cluster-streaming.sh

这将启动Worker节点上的任务管理器。

我的输入源是Kafka。以下是片段。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

这是我的水槽功能

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}

这是我的pom.xml.中的Flink依赖项

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

然后,我在主服务器上用这个命令运行打包的jar

bin/flink run flink-test-jar-with-dependencies.jar

但是,当我将消息插入Kafka主题时,我能够在主节点上单独考虑来自Kafka话题的所有消息(通过我的SinkFunction实现的invoke方法中的调试消息)。

  1. 为什么工作线程节点没有获取任务?
  2. 我是否缺少某些配置?

共有1个答案

燕钟展
2023-03-14

在Flink中从Kafka源读取时,源任务的最大并行度受到给定Kafka主题的分区数的限制。Kafka分区是Flink中源任务可以使用的最小单位。如果分区多于源任务,则某些任务将消耗多个分区。

因此,为了向所有100个任务提供输入,您应该确保您的Kafka主题至少有100个分区。

如果您无法更改主题的分区数,那么也可以使用setParallelism方法以较低的并行度从Kafka开始读取。或者,您可以使用重新平衡方法,该方法将在前面操作的所有可用任务中洗牌数据

 类似资料:
  • 我试图学习Akka集群下面的教程提供了这里 我已经创建了应用程序和回购是在这里。 正如教程中提到的,我已经启动了FrontEndApp 即使我在2551和2552上启动后端应用程序,上述警告消息也会不断重复。 在2551上启动后端参与者的终端日志。 最后一个日志持续重复。 在2552上启动后端参与者的终端日志。 不确定是什么原因群集节点不能检测到彼此和参与者节点与后端。 我会错过任何设置吗?

  • 我正在研究Flink 1.9.1的docker/k8s部署可能性。 我看完了[1][2][3][4]。 目前,我们确实认为,我们将尝试采用工作集群方法,尽管我们想知道社区的这一趋势是什么?我们不希望每个Flink集群部署多个作业。 不管怎样,我想知道一些事情: > 在这两种情况下,Flink的UI都显示每个任务管理器有4个CPU。 如果使用作业群集,如何重新提交作业。我指的是这个用例。你可能会说我

  • 我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn? 我的beam版本是2.16.0

  • 下面是Broker.xml中artemis集群(3台服务器)的设置 我预计broker3在集群中就应该开始接收请求。

  • Centralized Workflow。项目的所有协作者把对项目的修改推送到统一的远程仓库,这就是集中式工作流。其它的 Git 工作流基本都是基于这种工作流程做了一些扩展。 项目的发起者在自己电脑上创建了一个本地仓库,他又为项目在远程创建了一个仓库,这个远程仓库就是所有协作者要把提交推送到的地方。这个远程仓库在谁家那创建都无所谓,可以用 Github,Coding.net,阿里云 Code,也可

  • 在src目录下,我运行下面的命令 但得到以下错误。 创建集群[ERR]抱歉,无法连接到节点127.0.0.1:7000 但是,如果我使用命令“redis server redis.conf”在7000处启动节点,其中redis.conf在下面 端口7000群集已启用是群集配置文件nodes.conf群集节点超时10群集从属有效性系数0 appendonly是 同样,我成功地在所有端口启动了redi