9.2 编写自己的流处理程序
在本指南中,我们将从头开始建立属于自己的项目,使用 Kafka Streams 编写一个流处理应用程序。如果你还没阅读过 quickstart (在Kafka流中运行一个流应用程序)章节,我们强烈建议你先去阅读一下。
建立一个Maven项目
使用以下命令来创建具有 Kafka Streams 项目架构的 Maven 原型:
mvn archetype:generate \ -DarchetypeGroupId=org.apache.kafka \ -DarchetypeArtifactId=streams-quickstart-java \ -DarchetypeVersion={{fullDotVersion}} \ -DgroupId=streams.examples \ -DartifactId=streams.examples \ -Dversion=0.1 \ -Dpackage=myapps
对于 groupId
, artifactId
和 package
这三个参数你可以设置不同的值。
假定使用上述的参数值,该命令将创建如下的项目结构:
> tree streams.examples streams-quickstart |-- pom.xml |-- src |-- main |-- java | |-- myapps | |-- LineSplit.java | |-- Pipe.java | |-- WordCount.java |-- resources |-- log4j.properties
项目中包含的 pom.xml
已经定义了 Streams 依赖,并且在 src/main/java
下已经有几个用 Streams 库编写的示例程序。既然我们要从头开始编写这些程序,那我们现在可以删除这些示例:
> cd streams-quickstart > rm src/main/java/myapps/*.java
编写第一个 Streams 应用: Pipe
可以打开你喜欢的 IDE 并导入这个 Maven 项目,或者简单地打开一个文本编辑器,并在 src/main/java
下创建一个java文件。让我们命名为 Pipe.java
:
package myapps; public class Pipe { public static void main(String[] args) throws Exception { } }
我们将在 main
函数中来编写这个 pipe 程序。注意,由于 ide 通常可以自动添加导入语句(import),所以我们不会列出这些 import。但是,如果你使用的是文本编辑器,则需要手动添加导入,在本节的末尾,我们将向你展示带有 import 语句的完整代码片段。
写 Streams 应用程序的第一步是创建一个 java.util.Properties
映射来保存定义在 StreamsConfig
中不同的 Streams 执行配置参数。几个重要的配置值需要设置: StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
,它指定用于和 Kafka 集群建立初始化连接的 host/port 对的列表, StreamsConfig.APPLICATION_ID_CONFIG
,它提供了 Streams 应用程序的唯一标识符,以便于在同一个Kafka集群通信过程中区分自己和其他应用程序:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
另外,你可以在同一个映射中自定义其他配置,例如配置默认的序列化和反序列化库的键值对:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
有关 Kafka Streams 配置的完整列表,请参阅此 table。
接下来我们将定义 Streams 应用程序的计算逻辑。在 Kafka Streams 中,这个计算逻辑被定义为拓扑结构(topology)
(即连接起来的处理器节点( processor nodes )) 。 我们可以使用 topology builder 来构建这样的 topology,
final StreamsBuilder builder = new StreamsBuilder();
然后使用这个 topology builder 从 streams-plaintext-input
这个 Kafka topic 中创建 source stream:
KStream<String, String> source = builder.stream("streams-plaintext-input");
现在我们得到一个 KStream
,它不断地从 streams-plaintext-input
这个 Kafka topic 的数据流源头中生成记录。
记录被组织为 String
类型的 key-value 键值对。
对于这个 stream,最简单处理的方法就是将它写入另一个 Kafka topic,比如这个名为 streams-pipe-output
的 topic :
source.to("streams-pipe-output");
注意,我们也可以将上面的两行代码连接成一行:
builder.stream("streams-plaintext-input").to("streams-pipe-output");
我们可以通过执行以下操作来检查此 builder 创建的 topology
结构类型:
final Topology topology = builder.build();
并将其描述打印为标准输出:
System.out.println(topology.describe());
如果我们只编码到这里,然后编译并运行此程序,它将输出以下信息:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.Pipe Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001 Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000 Global Stores: none
如上所示,搭建好的拓扑结构有两个处理器节点(processor nodes),一个 source node KSTREAM-SOURCE-0000000000
和一个 sink node KSTREAM-SINK-0000000001
。KSTREAM-SOURCE-0000000000
持续地从 Kafka topic streams-plaintext-input
读取数据并将数据传递给下游节点 KSTREAM-SINK-0000000001
;KSTREAM-SINK-0000000001
会将它接收到的数据按顺序写入到其他 Kafka topic streams-pipe-output
(-->
和 <--
两个箭头分别指示该节点的下游和上游处理器节点 ,即拓扑结构中的父节点和子节点)。
另外,这种简单的拓扑没有与之相关联的全局状态存储state-store(我们将在后面的章节中更多地讨论状态存储)。
请注意,如上所述,我们在代码中创建拓扑结构的时候可以在任何给定点上声明拓扑结构,因此作为用户,您可以交互式地“尝试并品尝”拓扑中定义的计算逻辑,直到您满意为止。
假设我们已经完成了这个简单的拓扑结构,它只是以一种无尽的流处理方式将数据从一个Kafka topic 传递到另一个 topic,
我们现在可以使用我们刚刚构建的两个组件构建Streams客户端:配置映射和拓扑对象(也可以从props
映射构造一个StreamsConfig
对象,然后将该对象传递给构造函数,KafkaStreams
具有重载的构造函数以接受任意类型)。
final KafkaStreams streams = new KafkaStreams(topology, props);
通过调用它的start()
函数,我们可以触发这个客户端开始运行。
除非在客户端上调用close()
函数,否则程序不会停止。
例如,我们可以添加一个带有倒计数锁存器的关闭钩子来捕获用户中断,并在终止该程序时关闭客户端:
final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);
到目前为止完整的代码如下所示:
package myapps; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class Pipe { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
如果你已经有一个正在运行的 Kafka broker (服务地址是:localhost:9092
),
并且 streams-plaintext-input
和 streams-pipe-output
这两个 topic 也都创建好了,
那么就可以在IDE或者命令行上运行程序,使用如下 Maven 命令:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.Pipe
有关如何运行Streams应用程序并观察计算结果的详细说明,请阅读运行 demo 程序部分。
本节的其余部分我们不会谈论这一点。
编写第二个流处理程序:Line Split
我们已经学会了如何使用 kafka Streams 的两个关键组件:StreamsConfig
和拓扑结构(Topology)
来构建Streams客户端。
现在让我们继续给当前的拓扑结构添加一些实际的处理逻辑。
我们可以首先复制现有的Pipe.java
类来创建另一个程序:
> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
并更改其类名以及应用程序ID配置以与原始程序区分开来:
public class LineSplit { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); // ... } }
由于每个 Stream 提供的记录都是一个String
类型的键值对,因此我们将 value 值视为一行文本,并使用FlatMapValues
运算符将其拆分为单词:
KStream<String, String> source = builder.stream("streams-plaintext-input"); KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split("\\W+")); } });
运算符将source
流作为其输入,按顺序处理源流中的每条记录,并通过将记录的value值分解为一个字符串列表,以此来生成一个名为words
的新的流,
并将每个单词作为输出流words
的新记录。
这是一个无状态的操作符,无需跟踪之前收到的记录或处理结果。
请注意,如果您使用的是JDK 8,则可以使用lambda表达式并简化上面的代码:
KStream<String, String> source = builder.stream("streams-plaintext-input"); KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
最后,我们可以将 word 流写回另一个Kafka topic,比如streams-linesplit-output
。
再次,这两个步骤可以如下所示连接(假设使用lambda表达式):
KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output");
如果我们现在将这个扩展拓扑描述为System.out.println(topology.describe())
,我们将得到以下结果:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.LineSplit Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000 Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001 Global Stores: none
正如我们上面看到的那样,一个新的处理器节点KSTREAM-FLATMAPVALUES-0000000001
被注入到原始 source node 和 sink node 之间的拓扑结构中。
它将 source node 作为其父节点并将 sink node 作为其子节点。
换句话说,从 source node 获取的每个记录将首先传递给新加入的KSTREAM-FLATMAPVALUES-0000000001
节点进行处理,然后生成一个或多个新记录作为输出结果。
这些新记录会持续传递给 sink node 并最终回写到 kafka 。
注意这个处理器节点是“无状态的”,因为它不与任何 stores 相关联(即(stores: [])
)。
完整的代码如下所示(假设使用lambda表达式):
package myapps; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class LineSplit { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // ... same as Pipe.java above } }
编写第三个流处理程序: Wordcount
现在让我们进一步添加一些“有状态”计算过程(计算从源文本流中拆分出来的单词出现的次数)到拓扑结构中。
按照类似的步骤,我们创建另一个基于LineSplit.java
类的程序:
public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); // ... } }
为了计算单词,我们可以首先修改flatMapValues
运算符,将它们全部转为小写字母(假设使用lambda表达式):
source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } });
为了进行计数聚合,我们必须首先指定我们要使用groupBy
运算符将 value 字符串(即小写单词)输入到流中。
该运算符生成一个新的分组流,然后可以通过count
运算符进行聚合,该运算符会在每个分组键上持续计数:
KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) .groupBy(new KeyValueMapper<String, String, String>() { @Override public String apply(String key, String value) { return value; } }) // Materialize the result into a KeyValueStore named "counts-store". // The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store. .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
请注意,count
运算符具有Materialized
参数,该参数指定运行中的计数应存储在名为counts-store
的 state store 中。
这个Counts store
支持实时查询,在开发者指南中有更详细的描述。
我们还可以将counts
KTable 的 changelog Stream 写回到另一个 Kafka topic 中,比如streams-wordcount-output
。
因为结果是一个 changlog stream ,所以 output topic streams-wordcount-output
需要配置为启用日志压缩。
请注意,这次的值类型不再是String类型
而是Long类型
,所以默认的序列化类已经不适合将long类型的结果写回 Kafka 。
我们需要为Long
类型提供重写之后的序列化方法,否则将抛出运行时异常:
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
请注意,为了从 streams-wordcount-output
这个 topic 读取 changelog stream ,
需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer
。
有关详细信息,请参见运行 demo 程序一节。
假设可以使用来自JDK 8的lambda表达式,上面的代码可以简化为:
KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
如果我们再次将此扩展拓扑描述为System.out.println(topology.describe())
,我们将得到以下结果:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.WordCount Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002 Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003 Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006 Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003 Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007 Global Stores: none
如上所述,拓扑现在包含两个断开的子拓扑。
第一个子拓扑的 sink node KSTREAM-SINK-0000000004
将写入一个重新分区的 topic ——Counts-repartition
,
并将由第二个子拓扑的 source nodeKSTREAM-SOURCE-0000000006
读取。
重分区的 Topic 是通过聚合键(本例中的聚合键为值字符串)“混洗”源流。
此外,在第一个子拓扑结构中,为了取出聚合键为空的中间记录,一个无状态的KSTREAM-FILTER-0000000005
被嵌入到在分组节点KSTREAM-KEY-SELECT-0000000002
节点和 sink node 之间。
在第二个子拓扑中,聚合节点KSTREAM-AGGREGATE-0000000003
与名为Counts
的 state store 结合在一起(state store 的名称由用户在cout
运算符指定)。
从 source node 接收记录时,聚合处理器将首先查询其关联的Counts
store ,以获得该key值当前的计数,增加1,然后将新计数写回到 store。
每个更新的密钥计数都将传送到KTABLE-TOSTREAM-0000000007
节点的下游,该节点将count的更新流解析为记录流,然后再进一步传输到 sink node KSTREAM-SINK-0000000008
以便写回 kafka 系统。
完整的代码如下所示(假设使用lambda表达式):
package myapps; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // ... same as Pipe.java above } }