我们正在构建一个应用程序来从传感器获取数据。数据流传输到Kafka,消费者将从Kafka发布到不同的数据商店。每个数据点都有多个表示传感器状态的属性。
在其中一个消费者中,我们希望仅当值发生更改时才将数据发布到数据存储。例如,如果有温度传感器,每10秒轮询一次数据,我们希望收到如下数据:
----------------------------------------------------------------------
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:50", temperature: 11}
在上述情况下,只应发布第一条记录和第三条记录。
为此,我们需要某种方法来比较键的当前值与具有相同键的先前值。我认为这在KTable或KStream中应该是可能的,但找不到示例。
任何帮助都会很棒!
如果要对Kafka流执行此操作,必须使用处理器API。
您需要使用状态存储实现自定义变压器。对于每条消息,您应该在状态存储中搜索值,如果它已更改或不存在,您应该返回新值,否则为null。除此之外,您还应该将该值保存在状态存储中(KeyValueStore::put(…)
)
可以在此处找到有关处理器API的更多信息
您可以使用Kafka流处理器API。可以将本地键值存储设置为状态上下文。为获取的每个记录调用process函数。
在process function中,您可以对照存储的最后一个值进行检查,并根据业务逻辑(在您的情况下,比较温度值)接受或拒绝最新记录。
在标点功能中,您可以按时间表将记录转发给消费者。请参阅下面的示例代码(不带标点符号)
java prettyprint-override">public class SensorProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, String> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// retrieve the key-value store named "SensorData"
kvStore = (KeyValueStore) context.getStateStore("SensorData");
// schedule a punctuate() method every second based on event-time
}
@Override
public void process(String sensorName, String sensorData) {
String oldValue = this.kvStore.get(sensorName);
if (oldValue == null) {
this.kvStore.put(sensorName, sensorData);
} else {
//Put the business logic for comparison
//compare temperatures
//if required put the value
this.kvStore.put(sensorName, sensorData);
//Forward it o consumer
context.forward(sensorName, sensorData);
}
context.commit();
}
@Override
public void close() {
// nothing to do
}
}
下面是一个如何使用KStream#transformValues()解决此问题的示例。
java prettyprint-override">StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
Serdes.String(),
YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
.transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
private KeyValueStore<String, YourValueType> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}
@Override
public YourValueType transform(final String key, final YourValueType value) {
YourValueType prevValue = state.get(key);
if (prevValue != null) {
if (prevValue.temperature() != value.temperature()) {
return prevValue;
}
} else {
state.put(key, value);
}
return null;
}
@Override
public void close() {}
}, stateStorName))
.to(OUTPUT_TOPIC);
将该记录与状态存储中存储的上一个记录进行比较。如果温度不同,则从状态存储返回记录,并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。
主要内容:1.架构,2.消息存储模型,3.消息消费模型,4. 多租户,5.运维1.架构 1.1 Kafka brocker和zk组成 1.2 Pulsar Pulsar Broker会在本地缓存消息,并且支持TTL, Pulsar 通过分层架构, 将计算和存储分离, 存储采用BookKeeper集群, 计算使用Broker集群, Brocker 需要内置BookKeeper 客户端 Pulsar的部署和架构更加复杂,但是也更具有伸缩性。 2.消息存储模型 2.1 Kafka
假设我有一个双人课 我希望对它进行排序,首先是第一个值,然后是第二个值。现在,如果我这样做 一切都很好,列表按对的第一个值排序,但如果我这样做 它因错误而失败 好吧,所以它可能无法推断参数,所以如果我这样做 它因错误而失败 为什么它适用于comparing()而不适用于comparing()。然后比较()?
本文向大家介绍比较RabbitMQ与Apache Kafka相关面试题,主要包含被问及比较RabbitMQ与Apache Kafka时的应答技巧和注意事项,需要的朋友参考一下 答:Apache Kafka的另一个选择是RabbitMQ。那么,让我们比较两者: 功能 Apache Kafka– Kafka是分布式的、持久的和高度可用的,这里共享和复制数据 RabbitMQ中没有此类功能 性能速度 A
问题内容: 我试图用Java编写一个程序,该程序涉及使对象 从一次按键操作中不断移动。想一想吃豆子,在其中按下一次, 吃豆子继续上升直到您按下另一个键。 如果可能的话,我想保持代码简单。我的原始动作(一键按下=一动作)是这样的 : 值中的x和y是椭圆的位置。这完美地工作了,但是我 希望它只按一次 就可以继续运动,而不必按住它来保持运动。我尝试了一个带有布尔 参数的while循环,该布尔参数在tru
我有3个< code > edittext(et1,et2,et3),彼此相邻排列,其中< code>maxLength为“1”。 我的要求是: > < li> 当在编辑文本中键入字母时(只能包含1个字符,因为最大长度为1),光标会自动转到下一个编辑文本。 类似地,当用户单击et2上的back时,光标应该转到et1。 编辑文本将按顺序填写。首先是第一个,然后是第二个,最后是第三个。因此,即使用户单
我有一个关于compareTo函数如何帮助比较器排序的问题,即o1。比较(o2)与o2。比较(o1) 如果两个字符串相等,则此方法返回0,否则返回正值或负值。如果第一个字符串在词典上大于第二个字符串,则结果为正,否则结果为负。 上面的陈述很简单,但是为什么o1.compare(o2)会给我一个升序,而o2.compare(o1)给了我一个降序? 如果我有整数值“5,10,3”,我得到3,5,10和