我的目标是使用Kafka作为源设置一个高吞吐量集群
我在主服务器和辅助服务器上设置了一个2节点集群,配置如下。
flink-conf.yaml大师
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
主节点上的从属
文件如下所示:
<WORKER_IP_ADDR>
localhost
两个节点上的 flink 设置位于具有相同名称的文件夹中。我通过运行
bin/start-cluster-streaming.sh
这将启动Worker节点上的任务管理器。
我的输入源是Kafka。以下是片段。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
这是我的水槽功能
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
这是我的pom.xml.中的Flink依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
然后,我在主服务器上用这个命令运行打包的jar
bin/flink run flink-test-jar-with-dependencies.jar
但是,当我将消息插入Kafka主题时,我能够在主节点上单独考虑来自Kafka话题的所有消息(通过我的SinkFunction
实现的invoke方法中的调试消息)。
在Flink中从Kafka源读取时,源任务的最大并行度受到给定Kafka主题的分区数的限制。Kafka分区是Flink中源任务可以使用的最小单位。如果分区多于源任务,则某些任务将消耗多个分区。
因此,为了向所有100个任务提供输入,您应该确保您的Kafka主题至少有100个分区。
如果您无法更改主题的分区数,那么也可以使用setParallelism
方法以较低的并行度从Kafka开始读取。或者,您可以使用重新平衡
方法,该方法将在前面操作的所有可用任务中洗牌数据。
我试图学习Akka集群下面的教程提供了这里 我已经创建了应用程序和回购是在这里。 正如教程中提到的,我已经启动了FrontEndApp 即使我在2551和2552上启动后端应用程序,上述警告消息也会不断重复。 在2551上启动后端参与者的终端日志。 最后一个日志持续重复。 在2552上启动后端参与者的终端日志。 不确定是什么原因群集节点不能检测到彼此和参与者节点与后端。 我会错过任何设置吗?
我正在研究Flink 1.9.1的docker/k8s部署可能性。 我看完了[1][2][3][4]。 目前,我们确实认为,我们将尝试采用工作集群方法,尽管我们想知道社区的这一趋势是什么?我们不希望每个Flink集群部署多个作业。 不管怎样,我想知道一些事情: > 在这两种情况下,Flink的UI都显示每个任务管理器有4个CPU。 如果使用作业群集,如何重新提交作业。我指的是这个用例。你可能会说我
我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn? 我的beam版本是2.16.0
下面是Broker.xml中artemis集群(3台服务器)的设置 我预计broker3在集群中就应该开始接收请求。
Centralized Workflow。项目的所有协作者把对项目的修改推送到统一的远程仓库,这就是集中式工作流。其它的 Git 工作流基本都是基于这种工作流程做了一些扩展。 项目的发起者在自己电脑上创建了一个本地仓库,他又为项目在远程创建了一个仓库,这个远程仓库就是所有协作者要把提交推送到的地方。这个远程仓库在谁家那创建都无所谓,可以用 Github,Coding.net,阿里云 Code,也可
在src目录下,我运行下面的命令 但得到以下错误。 创建集群[ERR]抱歉,无法连接到节点127.0.0.1:7000 但是,如果我使用命令“redis server redis.conf”在7000处启动节点,其中redis.conf在下面 端口7000群集已启用是群集配置文件nodes.conf群集节点超时10群集从属有效性系数0 appendonly是 同样,我成功地在所有端口启动了redi