当前位置: 首页 > 知识库问答 >
问题:

Quarkus SQS消费者

童浩言
2023-03-14

我正在查看关于使用Quarkus从SQS消费的指南。

问题是我想在无休止的循环中执行它,例如每10秒获取一次新消息,并使用Hibernate Reactive从消息中插入一些数据到数据库中。

我创建了一个Quarkus调度程序,但由于它不支持返回Uni,我不得不阻止Hibernate Responsive的响应,因此出现了这个错误

2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
    [Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database

使用Quarkus和reactive实现我所需的最佳方法是什么?

共有2个答案

西门智
2023-03-14

代码将有助于理解您正在做什么。根据您在问题中获得的信息,我建议您使用以下代码创建Uni:

Uni.createFrom().item(returnDataFromDb());
闽哲
2023-03-14

由于Quarkus调度程序不在I/O线程中,因此无法使用hibernate。所以,为了让它工作,你可以和EventBus一起工作。下面是一个功能齐全的示例。processReceivedMessageResponse方法中的代码在I/O线程中运行,可以依赖于Hibernate。

import io.quarkus.scheduler.Scheduled
import io.quarkus.vertx.ConsumeEvent
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.eventbus.EventBus
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
import java.util.concurrent.CompletionStage
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Instance

@ApplicationScoped
class SqsConsumer(
    private val eventBus: EventBus,
    private val logger: Logger,
    @ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
    private val maxFetchedEvents: Int,
    private val handlers: Instance<MessageHandler>,
    private val sqsClient: SqsAsyncClient,
) {

    @Scheduled(every = "{sqs.consumer.interval}")
    fun execute() {
        handlers.stream().forEach { handler ->
            val handlerName = handler.javaClass.name
            logger.info("Fetching messages for $handlerName...")
            Uni
                .createFrom()
                .completionStage(fetchMessages(handler.queueUrl()))
                .subscribe()
                .with(
                    { response ->
                        val newEventsCount = response.messages().size
                        if (newEventsCount > 0) {
                            logger.info("$newEventsCount message(s) fetched for $handlerName.")
                            eventBus.send("receive-message-responses", ResponseHolder(handler, response))
                        } else {
                            logger.info("Queue was empty. Maybe next time.")
                        }
                    },
                    { logger.error("Error fetching messages!", it) }
                )
        }
    }

    @ConsumeEvent("receive-message-responses")
    fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
        val handler = holder.handler
        val handlerName = handler.javaClass.name
        val messageResponse = holder.receiveMessageResponse
        logger.info("Processing messages for $handlerName...")
        return Uni
            .createFrom()
            .item(holder)
            .flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
            .onItem()
            .invoke { _ ->
                logger.info("Processing succeeded. Deleting processed events from the queue...")
                messageResponse
                    .messages()
                    .forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
            }
            .replaceWithVoid()
            .onFailure()
            .invoke { it -> logger.error("Error processing messages!", it) }
    }

    @ConsumeEvent("processed-messages")
    fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
        val handler = holder.handler
        val message = holder.message
        return Uni
            .createFrom()
            .completionStage(
                sqsClient.deleteMessage {
                    it
                        .queueUrl(handler.queueUrl())
                        .receiptHandle(message.receiptHandle())
                }
            )
            .onItem()
            .invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
            .onFailure()
            .invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
            .replaceWithVoid()
    }

    private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
        return sqsClient
            .receiveMessage {
                it
                    .maxNumberOfMessages(maxFetchedEvents)
                    .queueUrl(queueUrl)
            }
    }
}

class ResponseHolder(
    val handler: MessageHandler,
    val receiveMessageResponse: ReceiveMessageResponse,
)

class MessageHolder(
    val handler: MessageHandler,
    val message: Message,
)
 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(