当前位置: 首页 > 知识库问答 >
问题:

在kafka集群节点间分配数据套接字

西门品
2023-03-14
public class WordCount {

   public static void main(String[] args) throws Exception {

    kafka_test objKafka=new kafka_test();
  // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    int myport = 9999;
    String hostname = "localhost";
 // set up the execution environment
    final StreamExecutionEnvironment env = 
  StreamExecutionEnvironment.getExecutionEnvironment();


 // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    DataStream<String> stream = env.socketTextStream(hostname,myport);

    stream.addSink(objKafka.createStringProducer("testFlink", 
    "localhost:9092"));

    DataStream<String> text = 
    env.addSource(objKafka.createStringConsumerForTopic("testFlink", 
    "localhost:9092", "test"));
    DataStream<Tuple2<String, Long>> counts = text
     .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) 
   {
          // normalize and split the line
             String[] words = value.toLowerCase().split("\\W+");

                    // emit the pairs
             for (String word : words) {
                  if (!word.isEmpty()) {
                     out.collect(new Tuple2<String, Long>(word, 1L));
                        }
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);
          // emit result
        if (params.has("output")) {
           counts.writeAsText(params.get("output"));
          } else {
          System.out.println("Printing result to stdout. Use --output 
          to specify output path.");
          counts.print();
         }
    // execute program
    env.execute("Streaming WordCount");

    }//main
   }

  public class kafka_test {
  public FlinkKafkaConsumer<String> createStringConsumerForTopic(
        String topic, String kafkaAddress, String kafkaGroup) {
  //        ************************** KAFKA Properties ******        
     Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
            topic, new SimpleStringSchema(), props);
    myconsumer.setStartFromLatest();     

    return myconsumer;
  }

  public FlinkKafkaProducer<String> createStringProducer(
        String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<>(kafkaAddress,
            topic, new SimpleStringSchema());
     }
  }

如有任何帮助,不胜感激。

共有1个答案

公冶翰池
2023-03-14

我认为你的代码是正确的。Kafka将负责数据的“分配”。数据将如何在Kafka代理之间分布将取决于主题配置。

检查这里的答案,以更好地理解Kafka主题和分区。

假设你有3个Kafka经纪人。然后,如果您用3个副本和3个分区创建主题

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);
 类似资料:
  • 我需要在不同的机器上配置一个Kafka集群,但它不起作用,当我启动生产者和消费者时,将显示以下错误: 你能帮帮我吗。

  • 我有一个nodejs集群服务器,它使用mongo changestream侦听器,通过socket.io向客户端发送数据。 < code>{ userId: 'aaa ',socketId: 'bbb' } 用于存储此数据的redis客户端在主进程中初始化。mongo变更流是在主进程中创建的。 当变更流看到新文档时,它会将该文档作为消息发送到子流程。当子进程收到消息时,它可以从文档中检索 user

  • 我有 2 个 docker 容器运行我的 Web 应用程序和机器学习应用程序,都使用 h2o。最初,我既调用 h2o.init() 又指向同一个 IP:PORT,因此初始化了一个具有一个节点的 h2o 集群。 考虑到我已经训练了一个模型,现在我正在训练第二个模型。在此训练过程中,如果web应用程序调用h2o集群(例如,从第一个模型请求预测),它将终止训练过程(错误消息如下),这是无意的。我尝试为每

  • 我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗? 设置多节点阿帕奇动物园守护者集群 在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中 在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为

  • 我计划部署Kafka集群。我有以下查询: 1)为了保护生产者和消费者与Kafka broker的通信,可以使用SSL。如果我有一个由9个代理和3个zookeeper节点组成的集群,并且如果我不想使用自签名证书,我是否必须为每个节点购买一个证书(9个3证书,成本太高)? 正如我所读到的,生产者/消费者直接联系其中一个经纪人节点,而不联系动物园管理员。 谢谢, 病毒的

  • 我的EKS集群变得不健康,因为所有豆荚都有“容器创建”错误,这可能与CNI问题有关。 在版本1.5.5中,conflist文件的位置更改为/etc/cni/10-aws.conflist,但节点仍然处于“notready”状态。 我的EKS版本是1.14,平台版本是EKS.2。 Ipamd日志: 有人对这个问题有什么线索吗?