对于我的一个Kafka streams应用程序,我需要同时使用DSL和处理器API的特性。我的流媒体应用程序流是
source -> selectKey -> filter -> aggregate (on a window) -> sink
聚合之后,我需要向接收器发送单个聚合消息。因此我定义拓扑如下
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
.filterNot((k,v) -> k.equals("UnknownGroup"))
.process(() -> new MyProcessor());
public class MyProcessor implements Processor<String, String> {
private ProcessorContext context = null;
Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
.withKeys(Serdes.String())
.withValues(invSerde)
.persistent()
.build()
.get();
public MyProcessor() {
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.register(invStore, false, null); // register the store
this.context.schedule(10 * 60 * 1000L);
}
@Override
public void process(String partitionKey, String message) {
try {
MessageModel smb = new MessageModel(message);
HashMapStore oldStore = invStore.get(partitionKey);
if (oldStore == null) {
oldStore = new HashMapStore();
}
oldStore.addSmb(smb);
invStore.put(partitionKey, oldStore);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void punctuate(long timestamp) {
// processes all the messages in the state store and sends single aggregate message
}
@Override
public void close() {
invStore.close();
}
}
知道这里出了什么问题吗?
您需要使用StreamsBuilder
(或在较旧版本中使用KStreamBuilder
)在处理器外部注册存储。首先创建存储,然后将其注册到StreamsBuilder
(KStreamBuilder
),添加处理器时提供存储名称以连接处理器和存储。
StreamsBuilder builder = new StreamsBuilder();
// create store
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("invStore"),
Serdes.String(),
invSerde));
// register store
builder.addStateStore(storeBuilder);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
.filterNot((k,v) -> k.equals("UnknownGroup"))
.process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
// older API:
KStreamBuilder builder = new KStreamBuilder();
// create store
StateStoreSupplier storeSupplier = (KeyValueStore)Stores.create("invStore")
.withKeys(Serdes.String())
.withValues(invSerde)
.persistent()
.build();
// register store
builder.addStateStore(storeSupplier);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
.filterNot((k,v) -> k.equals("UnknownGroup"))
.process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
我的工具基本上读取PDF并在JTextArea中打印PDF的内容。在我的PDF包含阿拉伯语的PH Mirjan字体之前,一切正常。我的文本区域显示一些垃圾字符,如下所示。 我该如何解决这个问题? 我的文本区域的默认字体是Arial Unicode MS。我可以配置文本区域的字体吗?假设我在本地下载了PH Mirjan,如何将文本区域字体更改为下载的字体。非常感谢任何建议或参考链接。 编辑 这给了我
我想添加“from”(date)来响应datepicker输入,但到目前为止,我有一些奇怪的行为。 我的代码: 到目前为止,我试过: 和 但这两种解决方案都将我的日期从“2020年9月14日16:43”转变为“从周一9月14 2020 17:19:38格林尼治标准时间0400(留尼汪岛)”,这是不必要的行为,因为我绝对想要法语日期。任何想法?谢啦
在RestTemplate中,我有一个自定义拦截器,它将记录一些请求-响应详细信息并保存到数据库。 我的自定义拦截器: springboot中的RestTemboard bean配置: 将拦截器添加到restTemboard bean: 如何将此拦截器添加到佯装客户端? 正在应用中。yml: InterceptorOne为假装客户端中的每个请求添加标头: 但是我不能添加日志服务拦截器,因为它由于错
问题内容: 我正在研究Spring Data JPA。考虑下面的示例,默认情况下我将使所有crud和finder功能正常工作,如果我想自定义finder,那么也可以在界面本身中轻松完成。 我想知道如何为上述AccountRepository的实现添加完整的自定义方法?由于它是一个接口,所以我不能在那里实现该方法。 问题答案: 你需要为自定义方法创建一个单独的接口: 并提供该接口的实现类:
在中有一个方法,但它看起来不像是一个公共API,所以我宁愿不使用它。创建自定义指令并使用看起来是另一种选择,但基本上需要为每个自定义验证规则创建一个指令,而我不希望这样做。 实际上,在最简单的场景中,将控制器中的某个字段标记为无效(同时保持同步)可能是我完成任务所需要的,但我不知道如何做到这一点。
搜索很好,它在工作,我正在过滤的交易类型(出售或出租)和房间数量在每个房地产。 但是我的JSON响应缺少很多字段,包括ACF。例如:{ “id”:149,“post_author”:“2”,“post_date”:“2016-03-03 23:53:39”,“post_date_gmt”:“2016-03-03 23:53:39”,“post_content”:“”post_title“:”opo