The built-in sources include reading from files, from directories, from sockets, and from collections or iterators. The built-in sinks include writing to files, printing to the console, and writing to sockets.
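As a quick illustration (a minimal sketch; the file paths and the localhost:9999 socket are placeholders), the built-in sources and sinks can be used directly from the StreamExecutionEnvironment:

import org.apache.flink.streaming.api.scala._

object BuiltInSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // built-in sources: collection, text file, socket
    val fromCollection = env.fromElements("a", "b", "c")
    val fromFile       = env.readTextFile("file:///tmp/input.txt")   // placeholder path
    val fromSocket     = env.socketTextStream("localhost", 9999)     // placeholder host/port
    // built-in sinks: console output, text file output
    fromCollection.print()
    fromFile.print()
    fromSocket.writeAsText("file:///tmp/output")                     // placeholder path
    env.execute("BuiltInSourceSinkApp")
  }
}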
This connector provides a sink that writes partitioned files to any file system that supports the Hadoop FileSystem abstraction (HDFS, the local file system, S3, and so on).
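A minimal sketch of such a filesystem sink, assuming the BucketingSink from flink-connector-filesystem (the base path, socket address and bucket format below are placeholders):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object FileSystemSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)               // placeholder source
    // write bucketed files under the base path (hdfs://, file://, s3:// ...)
    val fsSink = new BucketingSink[String]("hdfs:///tmp/flink-output") // placeholder path
    fsSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH")) // one bucket per hour
    fsSink.setWriter(new StringWriter[String]())
    data.addSink(fsSink)
    env.execute("FileSystemSinkApp")
  }
}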
Add the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
# key settings in config/server.properties
log.dirs=/home/vincent/tmp/kafka-logs
zookeeper.connect=localhost:2181
# start ZooKeeper, then start the Kafka broker
./bin/kafka-server-start.sh ./config/server.properties
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
mytest
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest
Consuming from Kafka (Kafka as a source). Scala:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")
    // read from the "mytest" topic and print every record
    env
      .addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties))
      .print()
    env.execute("KafkaConnectorConsumerApp")
  }
}
Java:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JavaKafkaConsumerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        properties.setProperty("group.id", "test");
        // read from the "mytest" topic and print every record
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties));
        stream.print();
        env.execute("JavaKafkaConsumerApp");
    }
}
Writing to Kafka (Kafka as a sink). Scala:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // receive data from the socket and sink it to Kafka via Flink
    val data = env.socketTextStream("192.168.227.128", 9999)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    val kafkaSink = new FlinkKafkaProducer[String]("mytest", new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), properties)
    data.addSink(kafkaSink)
    env.execute("KafkaConnectorProducerApp")
  }
}
Java:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;

public class JavaKafkaProducerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // receive data from the socket and sink it to Kafka via Flink
        DataStreamSource<String> data = env.socketTextStream("192.168.227.128", 9999);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        data.addSink(new FlinkKafkaProducer<>("mytest",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), properties));
        env.execute("JavaKafkaProducerApp");
    }
}
The default start position of the Flink Kafka consumer is setStartFromGroupOffsets() (the default behaviour): it resumes from the offsets committed in Kafka for the configured group.id, i.e. from the first record that the group has not yet consumed; if no committed offset is found for a partition, the auto.offset.reset property is used instead.
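A small sketch of how the start position can be set explicitly on the consumer before it is added as a source (topic and properties as in the consumer example above; only one of the setters should be active):

val consumer = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties)
consumer.setStartFromGroupOffsets()   // resume from committed group offsets (the default)
// consumer.setStartFromEarliest()    // read every partition from the beginning
// consumer.setStartFromLatest()      // read only records produced after the job starts
env.addSource(consumer).print()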