Apache Flink from Scratch (Part 20): Flink Kafka Connector

周博达
2023-12-01

Built-in sources and sinks

Built-in sources include reading from a file, a directory, a socket, and a collection or iterator. Built-in sinks include writing to files, printing to the console, and writing to a socket.
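
As a quick illustration, here is a minimal Java sketch that wires a built-in source (an in-memory collection) to the built-in console sink; the class name and the commented-out output path are only placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSourceSinkApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Built-in source: build a stream from an in-memory collection
        DataStream<String> words = env.fromElements("flink", "kafka", "connector");

        // Built-in sink: print every element to the console
        words.print();

        // Another built-in sink: write the stream as text files (path is a placeholder)
        // words.writeAsText("/tmp/flink-output");

        env.execute("BuiltInSourceSinkApp");
    }
}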

Built-in connectors

 HDFS Connector

This connector provides a sink that writes partitioned files to any filesystem that supports the Hadoop FileSystem interface.
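
A minimal Java sketch of such a sink, assuming the flink-connector-filesystem dependency and its BucketingSink are on the classpath; the HDFS address, output path and bucket format below are illustrative only:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HdfsSinkApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.socketTextStream("localhost", 9999);

        // BucketingSink writes the stream into time-based "buckets" (directories)
        // on any Hadoop-compatible filesystem.
        BucketingSink<String> sink = new BucketingSink<>("hdfs://localhost:9000/flink/output");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setBatchSize(1024 * 1024 * 64); // roll a new part file every 64 MB

        input.addSink(sink);
        env.execute("HdfsSinkApp");
    }
}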

 Kafka Connector

Add the Maven dependency:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

Download and install ZooKeeper

  1. In the conf directory under zookeeper-3.4.5, copy zoo_sample.cfg to zoo.cfg and change dataDir=/home/vincent/tmp.
  2. Start ZooKeeper: ./zkServer.sh start
  3. Run jps; if the QuorumPeerMain process is listed, ZooKeeper has started successfully.

Download and install Kafka

  • Similarly, edit the server.properties file under kafka_2.11-2.0.1/config and change:

           log.dirs=/home/vincent/tmp/kafka-logs

           zookeeper.connect=localhost:2181

  • Start Kafka:
./bin/kafka-server-start.sh ./config/server.properties
  • Run jps; if the Kafka process is listed, Kafka has started successfully.
  • Create a topic:
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest 
  • After the topic is created, list all topics:
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
mytest
  • Start a console producer:
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
  • Start a console consumer:
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest

Connecting Flink to Kafka

Source: reading data from Kafka

Scala:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection settings: broker address and consumer group id
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    // Read the "mytest" topic as Strings and print each record
    env
      .addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties))
      .print()

    env.execute("KafkaConnectorConsumerApp")
  }
}

Java:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JavaKafkaConsumerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings: broker address and consumer group id
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        properties.setProperty("group.id", "test");

        // Read the "mytest" topic as Strings and print each record
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties));
        stream.print();

        env.execute("JavaKafkaConsumerApp");
    }
}

Sink: writing data to Kafka

Scala:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("192.168.227.128", 9999)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    // Write each String record to the "mytest" topic
    val kafkaSink = new FlinkKafkaProducer[String]("mytest", new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), properties)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducerApp")
  }
}

Java:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class JavaKafkaProducerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Receive data from a socket and sink it to Kafka through Flink
        DataStreamSource<String> data = env.socketTextStream("192.168.227.128", 9999);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        // Write each String record to the "mytest" topic
        data.addSink(new FlinkKafkaProducer<String>("mytest", new SimpleStringSchema(), properties));
        env.execute("JavaKafkaProducerApp");
    }
}

The Flink Kafka consumer's default start strategy is setStartFromGroupOffsets() (the default behaviour): it starts reading from the offsets committed for the consumer group, i.e. it automatically resumes from the data that has not yet been consumed. If no committed offset is found for a partition, it falls back to the auto.offset.reset setting.
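
For reference, the start position can also be set explicitly on the consumer. A minimal Java sketch, reusing the env and properties from the consumer example above:

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties);

consumer.setStartFromGroupOffsets(); // default: resume from the group's committed offsets
// consumer.setStartFromEarliest();  // alternatively, read the topic from the beginning
// consumer.setStartFromLatest();    // or only read records produced after the job starts

env.addSource(consumer).print();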
