当前位置: 首页 > 知识库问答 >
问题:

如何打印Kafka主题数据在Flink程序?

宦飞
2023-03-14

我通过以下说明创建了一个主题:

C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv

然后,我测试了这个主题是否有正确的数据。之后,我想在Flink程序中打印这个主题。我的计划是:

 try{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));

           stream.print();
    env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

但是我得到了这个信息(因为信息太长了,我不得不写一些):

[main]INFOorg.apache.flink.streaming.api.environment.LocalStream环境-在本地嵌入式Flink迷你集群上运行作业[main]INFOorg.apache.flink.runtime.minicluster.MiniCluster-启动Flink迷你集群[main]INFOorg.apache.flink.runtime.minicluster.MiniCluster-启动指标注册表[main]INFO<--plhd--3[main]INFOorg.apache.flink.runtime.minicluster.MiniCluster-启动RPC服务[flink-akka.actor.default-调度器-2]INFOakka.event.slf4j。Slf4jLogger-Slf4jLogger启动[主]INFOorg.apache.flink.runtime.minicluster.MiniCluster-启动高可用性服务[主]INFOorg.apache.flink.runtime.blob.BobServer-创建BLOB服务器存储目录C:\用户\用户11\AppData\本地\Temp\bobStore-a02ff126-35cc-4c1b-b300-8689d19ff5d2[主]INFO<--plhd-在0.0.0.0:57907启动BLOB服务器-最大并发请求:50-最大积压:1000

此外,我也看到了这个链接,它并没有解决我的问题:如何从flink访问/读取Kafka主题数据?

你能告诉我这里有什么问题吗?

非常感谢。

共有1个答案

微生俊捷
2023-03-14

问题解决了。首先,我用以下命令填充了Kafka主题:

/home/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list 10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092 --topic flinkTopic < transactions2.csv

然后,使用此代码,我可以打印Kafka主题:

 final StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 Properties prop = new Properties();
 prop.setProperty("bootstrap.servers", 
 "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
 prop.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 
  ("flinkTopic", new SimpleStringSchema(),prop);
    myConsumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

我希望这对其他人有用。

 类似资料:
  • 我有一个表示为的自定义状态计算,当我的看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。 我只想知道如何将我的聚合流打印到stdout或写回另一个kafka主题? 下面是引发错误的代码片段。

  • 在Flink中有没有任何方法可以自动推断出Kafka主题DDL,而不需要手动查询,就像Spark中的情况一样。

  • 我正在尝试每 提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。 我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?

  • 如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K

  • 我想打印Flink已开始读取的Kafka主题的每个分区的起始偏移量?

  • 目前,我有一个Flink集群,它想通过一个模式消费Kafka主题,通过使用这种方式,我们不需要维护一个硬代码Kafka主题列表。 --update--我需要知道主题信息的原因是我们需要这个主题名称作为参数,在即将到来的Flink sink部分中使用。