当前位置: 首页 > 知识库问答 >
问题:

叛变——Kafka写下了连续发生的事情

宗政小林
2023-03-14

我是Quarkus的新手。我正在尝试使用quarkus reactive编写一个RESTendpoint,它接收输入,进行一些验证,将输入转换为列表,然后将消息写入kafka。我的理解是,将所有内容转换为Uni/Multi,将导致以异步方式在I/O线程上执行。在intelliJ日志中,我可以看到代码在executor线程中以顺序方式执行。Kafka写入在其自己的网络线程中按顺序进行,这会增加延迟。

@POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<OutputSample> send(InputSample inputSample) {
        ObjectMapper mapper = new ObjectMapper();

       //deflateMessage() converts input to a list of inputSample
        Multi<InputSample> keys = Multi.createFrom().item(inputSample)
                .onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
                .concatenate();

     
        return keys.onItem().transformToUniAndMerge(payload -> {
            try {
                return producer.writeToKafka(payload, mapper);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        });
    } 
@Inject
@Channel("write")
    Emitter<String> emitter;

Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
        
        String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
        return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
                .onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
                .onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
    }

我已经用了几天了。不确定是否做错了什么。任何帮助都将不胜感激。谢谢

共有2个答案

孔扬
2023-03-14

Multi用于源连续发射项目,直到发射完成事件,而这不是您的情况。

来自叛变文件:

Multi表示数据流。流可以发出0、1、n或无限多个项。

您很少自己创建多实例,而是使用公开Mutiny API的反应式客户端。

你要找的是一个Uni

所以,你需要的是将每条信息发送到Kafka,而不是立即等待它们的返回,而是收集生成的Uni,然后将其收集到单个Uni。

@POST
public Uni<List<OutputSample>> send(InputSample inputSample) {
    // This could be injected directly inside your producer
    ObjectMapper mapper = new ObjectMapper();

    // Send each item to Kafka and collect resulting Unis
    List<Uni<OutputSample>> uniList = deflateMessage(inputSample).stream()
            .map(input -> producer.writeToKafka(input, mapper))
            .collect(Collectors.toList());

    // Transform a list of Unis to a single Uni of a list
    @SuppressWarnings("unchecked") // Mutiny API fault...
    Uni<List<OutputSample>> result = Uni.combine().all().unis(uniList)
            .combinedWith(list -> (List<OutputSample>) list);

    return result;
}

沈旻
2023-03-14

与任何其他反应式库一样,Mutiny主要围绕数据流控制进行设计。

也就是说,它的核心将提供一组控制流执行和调度的功能(通常通过一些操作员)。这意味着,除非您指示社区对象异步,否则它们将以顺序(旧)方式执行。

执行调度使用两个运算符进行控制:

  • runSubscriptionOn:这将导致生成项的代码段(通常指上游)在指定的执行器的线程上执行
  • emitOn:这将导致订阅代码(通常指下游)在指定的执行器的线程上执行

然后,您可以按如下方式更新您的代码,使通货紧缩变得异步:

Multi<InputSample> keys = Multi.createFrom()
    .item(inputSample)
    .onItem()
    .transformToMulti(array -> Multi.createFrom()
            .iterable(deflateMessage.deflateMessage(array)))
    .runSubscriptionOn(Infrastructure.getDefaultExecutor()) // items will be transformed on a separate thread
    .concatenate();

为了在单独的线程上完成对Kafka队列的完整下游、转换和写入,可以使用emitOn操作符,如下所示:

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
    ObjectMapper mapper = new ObjectMapper();
    return Uni.createFrom()
            .item(inputSample)
            .onItem()
            .transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
            .emitOn(Executors.newFixedThreadPool(5)) // items will be emitted on a separate thread after transformation
            .onItem()
            .transformToUniAndConcatenate(payload -> {
                try {
                    return producer.writeToKafka(payload, mapper);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return Uni.createFrom().<OutputSample>nothing();
            });
}

 类似资料:
  • 我一直在考虑使用Apache Kafka作为事件源配置中的事件存储。发布的事件将与特定的资源相关联,传递到与资源类型相关联的主题,并按资源ID分片到分区中。因此,例如,创建类型为Folder和id 1的资源将产生一个FolderCreate事件,该事件将通过在主题中的分区总数中对id 1进行分片来传递到给定分区中的“Folders”主题。即使我不知道如何处理使日志不一致的并发事件。 最简单的场景是

  • 我们正在构建一个应用程序来从传感器获取数据。数据流传输到Kafka,消费者将从Kafka发布到不同的数据商店。每个数据点都有多个表示传感器状态的属性。 在其中一个消费者中,我们希望仅当值发生更改时才将数据发布到数据存储。例如,如果有温度传感器,每10秒轮询一次数据,我们希望收到如下数据: 在上述情况下,只应发布第一条记录和第三条记录。 为此,我们需要某种方法来比较键的当前值与具有相同键的先前值。我

  • 我试着用蒙特卡罗模拟方法来计算一个事件发生的概率,在一个连续三次的10个事件列表中。我将进行100万次试验。事件发生的概率在任何时候都是31.43%。我的想法是,我将调用任何试验(b),并创建一个嵌套循环,因此如果条件一(rand值小于.3143),我将移动到索引中的下一个数字,如果该数字小于.3143,我将移动到下一个数字。如果发生这种情况,我会给火鸡加1。当100万次试验完成后,我将turke

  • 我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。

  • 我在分布式模式下运行Kafka连接,有两个不同的连接器,每个连接器都有一个任务。每个连接器都在不同的实例中运行,这正是我想要的。 Kafka connect集群是否总是确保相同的行为来适当地分担负载?

  • 问题内容: 假设我有一个实例: 然后,我使用ExecutorService提交上述任务: 现在,我可以通过取消任务。我了解的是将会中断正在运行此任务的工作线程,例如。但这 仅设置一个标志 来告知工作线程已中断。 我的问题是:如果Runnable已开始运行,实际上如何在继续执行其余代码时停止我的代码?是否在下面定期检查工作线程的中断标志?我的意思是我不明白如何仅通过将中断标志设置为true来取消ru