我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。
这里是平均值的函数:
private void average () {
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(this.topicSrc);
KStream <String, Double> average = source
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable)))
.groupByKey( Serialized.with(
Serdes.String(),
Serdes.String()))
.aggregate (
() -> new Tuple(0, 0),
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.with(Serdes.String(), new MySerde()))
.mapValues(v -> v.getAverage())
.toStream();
average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
KafkaStreams stream = new KafkaStreams(builder.build(), props);
stream.start();
}
以下是例外情况:
Exception in thread "Thread-0" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:658)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:628)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:538)
at it.imolinfo.sacmi.processor.Streamer.average(Streamer.java:167)
at it.imolinfo.sacmi.processor.Streamer.run(Streamer.java:180)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:80)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:656)
... 5 more
问题是基本目录不存在,但我希望kafka流在必要时创建目录。
编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。
1个处理器的配置文件:
type->streamer
number->1
subtype->average
variabli->payload:T_DUR_CICLO
topicSrc->m0-tempi
topicDest->average
application.id->stream0
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
2个处理器的配置文件:
type->streamer
number->1
subtype->average
variabli->payload:T_DUR_CICLO
topicSrc->m0-tempi
topicDest->average
application.id->stream0
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
type->streamer
number->1
subtype->average
variabli->payload:HMI_TEMP_E1
topicSrc->m0-temperature
topicDest->average
application.id->stream1
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
现在我启动处理器:
private void loadStreamer (Tuple t){
int number = Integer.parseInt(t.getNumber());
for (int i = 0; i < number; i ++) {
String[] splitted = t.getVariables()[0].split(":");
Streamer s = new Streamer (t.getSubType(), t.getTopicSrc(), t.getTopicDest(), splitted[0], splitted[1], t.getProp());
Thread th = new Thread (s);
th.start();
}
}
类型元组包含配置文件的信息。for cules中的数字是配置文件中存在的数字。在这种情况下是1,但我可以为犯规公差做更多相同过程的实例。
流光:
public class Streamer implements Runnable{
private final String topicSrc;
private final String topicDest;
private final String variable;
private final String pathVariable;
private final String type;
private final Properties props;
public Streamer (String type, String topicSrc, String topicDest, String pathVariable, String variable, Properties props) {
this.type = type;
this.topicSrc = topicSrc;
this.topicDest = topicDest;
this.variable = variable;
this.pathVariable = pathVariable;
this.props = props;
}
private void average () {
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(this.topicSrc);
KStream <String, Double> average = source
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable) + ":" + value.getStringValue("timestamp")))
.groupByKey( Serialized.with(
Serdes.String(),
Serdes.String()))
.aggregate (
() -> new Tuple(0, 0, ""),
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue.split(":")[0]), newValue.split(":")[1]),
Materialized.with(Serdes.String(), new MySerde()))
.mapValues((key, value) -> new AverageRecord (key, value.getDate(), value.getAverage()))
.mapValues(v -> v.getAverage())
.toStream();
average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
KafkaStreams stream = new KafkaStreams(builder.build(), props);
stream.start();
}
public void run() {
switch (this.type) {
case "average":
average();
break;
case "filter":
filter();
break;
default:
System.out.println("type not valid " + this.type);
break;
}
所以我有两个线程和两个拖缆对象,执行平均值函数。与2拖缆唯一不同的是用于计算平均值的变量。
我以错误的方式创建流程?
您只需执行新文件(“/tmp/kafka-streams”)。mkdirs()
启动流之前。Kafka车队起跑手有比赛条件。
添加不同的状态。dir
每个流的配置,而不是默认流。差不多
# stream1
...
state.dir=/tmp/stream1/kafka-stream
# stream2
...
state.dir=/tmp/stream2/kafka-stream
这似乎是权限问题。如果Kafka流应用程序具有在给定路径中写入的权限,它将创建state dir。
/tmp
目录应具有运行应用程序的用户的写入权限。
我需要从配置文件动态创建kafka流,其中包含每个流的源主题名称和配置。应用程序需要有几十个Kafka流和流将是不同的每个环境(例如阶段,prod)。它可能做到这一点与库? 我们可以通过轻松做到这一点: 我们需要实现spring接口,这样所有流都将自动启动和关闭。 是否可以使用做同样的事情?正如我所看到的,我们需要在代码中创建每个Kafka流,我看不到如何使用创建Kafka流列表的可能性。 但是如
我有一个非常简单的Java/Spring应用程序来演示KStream的功能,但不幸的是,我无法使KStream加载数据。想法是创建一个KStream对象,并使用controller GET方法简单地检索其内容。示例代码: 问题-主题中有消息,但foreach(...)中的KStream枚举没有从中检索任何结果。KStream对象状态为“RUNning”,日志中没有错误。 生成随机应用程序ID并将A
我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf
我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的
因此,我想知道做这件事的步骤。 我的理想是由kafka Connect创建与表相对应的主题,然后再由我声明(使用KSQL)创建视图。 虽然我在这里描述的一开始听起来是可行的,但我对数据有一个问题主题中数据的结构(模式)。问题似乎是,我可能必须做一个额外的步骤,但不知道它是否可以避免或实际上是必要的。
这是我第一次使用Kafka。我遵循了本教程。启动Zookeper后,我启动了kafka服务器。接下来创建了一个主题,然后启动了该主题的消费者。这是当Zookeper日志说 导致会话0x0关闭的异常:null 我正在使用Windows 10。 kafka_2.11-2.1.0 zookeeper-3.4.12