我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来
t1将包含消息的前半部分,t2将包含消息的后半部分
例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么
t1将有“a1b1”和“a2b2”
t2将有“c1d1”和“c2d2”
并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2”
,因为消息不会按顺序存储在KStream store中,并在它们变为有空<现在的问题是,这是一个好的解决方案吗?如果是,有一个例子。
@Component
public class MyViews {
@Autowired
public void buildProductView(StreamsBuilder sb) {
sb.stream("products", Consumed.with(Serdes.Integer(), Serdes.serdeFrom(Product.class)))
.map((k, v) -> new KeyValue<>(v.getId(), v))
.toTable(Materialized.<Integer, Product, KeyValueStore<Bytes, byte[]>>as("products-view")
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.serdeFrom(Product.class)));
}
@Autowired
public void buildPriceView(StreamsBuilder sb) {
sb.stream("prices", Consumed.with(Serdes.Integer(), Serdes.serdeFrom(Price.class)))
.map((k, v) -> new KeyValue<>(v.getId(), v))
.toTable(Materialized.<Integer, Price, KeyValueStore<Bytes, byte[]>>as("prices-view")
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.serdeFrom(Price.class)));
}
@Data
private class Product {
Integer id;
String item;
}
@Data
private class Price {
Integer id;
Integer price;
}
@Data
private class Order{
Integer id;
String item;
Integer price;
}
}
这就是我想要实现的
每当html" target="_blank">消息发布到“订单”主题时,我想检查“价格视图”中是否存在相应的ID,如果存在,那么我将组合它们生成“订单”对象。
类似地,当消息发布到“价格”主题时,我想检查“产品视图”
@Component
public class MyViews {
@Autowired
public void buildProductView(StreamsBuilder sb) {
final Serde<Product> productSerde = Serdes.serdeFrom(new JsonSerializer<>(), new ProductDeserializer());
final Serde<Price> valueSerde = Serdes.serdeFrom(new JsonSerializer<>(), new PriceDeserializer());
final KTable<Integer, Product> products = sb.stream("products", Consumed.with(Serdes.Integer(), productSerde)).selectKey((k, v) -> v.getId())
.toTable(Materialized.<Integer, Product, KeyValueStore<Bytes, byte[]>>as("products-view")
.withKeySerde(Serdes.Integer())
.withValueSerde(productSerde)
.withCachingDisabled());
final KTable<Integer, Price> prices = sb.stream("prices", Consumed.with(Serdes.Integer(), valueSerde))
.map((k, v) -> new KeyValue<>(v.getId(), v))
.toTable(Materialized.<Integer, Price, KeyValueStore<Bytes, byte[]>>as("prices-view")
.withKeySerde(Serdes.Integer())
.withValueSerde(valueSerde)
.withCachingDisabled());
final KTable<Integer, Order> join = products.join(prices, Product::getId, (l, r) ->
new Order(l.getId(), l.getItem(), r.getPrice())
);
join.toStream().foreach((x, y) -> System.out.println(y));
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Product {
Integer id;
String item;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Price {
Integer id;
Integer price;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Order {
Integer id;
String item;
Integer price;
}
}
我有两个jms消费者,每个都在不同的流中。我想使用另一个流来聚合这两条消息的消息。并且还需要保留相关ID,因为我需要分割有效负载并发送回消息。 我尝试将分散收集与两个入站 VM 一起使用,但收到以下错误: 由以下原因导致:组织.xml.sax.SAXParse 异常:cvc-complex-type.2.4.a:发现以元素“vm:入站终结点”开头的无效内容。“{”http://www.muleso
我试图使用来自另一组主机[etcd]的事实来配置一组主机[节点]。这是我的主机文件 请注意,组[etcd]不是供应的目标,[nodes]是。但是提供[节点]需要了解[etcd]的事实。 当我运行这个playbook时,我得到控制台输出 使etcd事实对节点播放可用的惯用Ansible方法是什么?
当我在SBT之上运行时,我会得到一些异常/错误:
问题内容: 我试图在尝试对6参数函数进行任何复杂操作之前,遍历6参数函数的参数空间,以研究其数值行为,因此,我正在寻找一种有效的方法来执行此操作。 我的函数将6-dim numpy数组中给出的浮点值作为输入。我最初尝试做的是: 首先,我创建了一个函数,该函数接受2个数组并生成一个包含两个数组中值的所有组合的数组: 然后,我曾经将其应用于同一数组的m个副本: 最后,我这样评估我的功能: 这工作,但它
问题内容: 我有两个像这样的数组: 我想结合这两个数组,使其不包含重复项,并保留其原始键。例如,输出应为: 我已经尝试过了,但是它正在更改其原始键: 有什么办法吗? 问题答案: 只需使用: 那应该解决。因为如果一个键出现多次(例如在您的示例中),则使用字符串键,因此一个键将覆盖具有相同名称的处理键。因为在您的情况下,它们两者都具有相同的值,但这无关紧要,并且还会删除重复项。 更新:我刚刚意识到,P
本文向大家介绍python使用append合并两个数组的方法,包括了python使用append合并两个数组的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python使用append合并两个数组的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的Python程序设计有所帮助。