我是Quarkus的新手。我正在尝试使用quarkus reactive编写一个RESTendpoint,它接收输入,进行一些验证,将输入转换为列表,然后将消息写入kafka。我的理解是,将所有内容转换为Uni/Multi,将导致以异步方式在I/O线程上执行。在intelliJ日志中,我可以看到代码在executor线程中以顺序方式执行。Kafka写入在其自己的网络线程中按顺序进行,这会增加延迟。
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
//deflateMessage() converts input to a list of inputSample
Multi<InputSample> keys = Multi.createFrom().item(inputSample)
.onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.concatenate();
return keys.onItem().transformToUniAndMerge(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
}
@Inject
@Channel("write")
Emitter<String> emitter;
Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
.onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
.onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
}
我已经用了几天了。不确定是否做错了什么。任何帮助都将不胜感激。谢谢
Multi用于源连续发射项目,直到发射完成事件,而这不是您的情况。
来自叛变文件:
Multi表示数据流。流可以发出0、1、n或无限多个项。
您很少自己创建多实例,而是使用公开Mutiny API的反应式客户端。
你要找的是一个Uni
所以,你需要的是将每条信息发送到Kafka,而不是立即等待它们的返回,而是收集生成的Uni,然后将其收集到单个Uni。
@POST
public Uni<List<OutputSample>> send(InputSample inputSample) {
// This could be injected directly inside your producer
ObjectMapper mapper = new ObjectMapper();
// Send each item to Kafka and collect resulting Unis
List<Uni<OutputSample>> uniList = deflateMessage(inputSample).stream()
.map(input -> producer.writeToKafka(input, mapper))
.collect(Collectors.toList());
// Transform a list of Unis to a single Uni of a list
@SuppressWarnings("unchecked") // Mutiny API fault...
Uni<List<OutputSample>> result = Uni.combine().all().unis(uniList)
.combinedWith(list -> (List<OutputSample>) list);
return result;
}
与任何其他反应式库一样,Mutiny主要围绕数据流控制进行设计。
也就是说,它的核心将提供一组控制流执行和调度的功能(通常通过一些操作员)。这意味着,除非您指示社区对象异步,否则它们将以顺序(旧)方式执行。
执行调度使用两个运算符进行控制:
runSubscriptionOn
:这将导致生成项的代码段(通常指上游)在指定的执行器的线程上执行
emitOn
:这将导致订阅代码(通常指下游)在指定的执行器的线程上执行
然后,您可以按如下方式更新您的代码,使通货紧缩变得异步:
Multi<InputSample> keys = Multi.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom()
.iterable(deflateMessage.deflateMessage(array)))
.runSubscriptionOn(Infrastructure.getDefaultExecutor()) // items will be transformed on a separate thread
.concatenate();
为了在单独的线程上完成对Kafka队列的完整下游、转换和写入,可以使用
emitOn
操作符,如下所示:
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
return Uni.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.emitOn(Executors.newFixedThreadPool(5)) // items will be emitted on a separate thread after transformation
.onItem()
.transformToUniAndConcatenate(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return Uni.createFrom().<OutputSample>nothing();
});
}
我一直在考虑使用Apache Kafka作为事件源配置中的事件存储。发布的事件将与特定的资源相关联,传递到与资源类型相关联的主题,并按资源ID分片到分区中。因此,例如,创建类型为Folder和id 1的资源将产生一个FolderCreate事件,该事件将通过在主题中的分区总数中对id 1进行分片来传递到给定分区中的“Folders”主题。即使我不知道如何处理使日志不一致的并发事件。 最简单的场景是
我们正在构建一个应用程序来从传感器获取数据。数据流传输到Kafka,消费者将从Kafka发布到不同的数据商店。每个数据点都有多个表示传感器状态的属性。 在其中一个消费者中,我们希望仅当值发生更改时才将数据发布到数据存储。例如,如果有温度传感器,每10秒轮询一次数据,我们希望收到如下数据: 在上述情况下,只应发布第一条记录和第三条记录。 为此,我们需要某种方法来比较键的当前值与具有相同键的先前值。我
我试着用蒙特卡罗模拟方法来计算一个事件发生的概率,在一个连续三次的10个事件列表中。我将进行100万次试验。事件发生的概率在任何时候都是31.43%。我的想法是,我将调用任何试验(b),并创建一个嵌套循环,因此如果条件一(rand值小于.3143),我将移动到索引中的下一个数字,如果该数字小于.3143,我将移动到下一个数字。如果发生这种情况,我会给火鸡加1。当100万次试验完成后,我将turke
我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。
我在分布式模式下运行Kafka连接,有两个不同的连接器,每个连接器都有一个任务。每个连接器都在不同的实例中运行,这正是我想要的。 Kafka connect集群是否总是确保相同的行为来适当地分担负载?
问题内容: 假设我有一个实例: 然后,我使用ExecutorService提交上述任务: 现在,我可以通过取消任务。我了解的是将会中断正在运行此任务的工作线程,例如。但这 仅设置一个标志 来告知工作线程已中断。 我的问题是:如果Runnable已开始运行,实际上如何在继续执行其余代码时停止我的代码?是否在下面定期检查工作线程的中断标志?我的意思是我不明白如何仅通过将中断标志设置为true来取消ru