当前位置: 首页 > 知识库问答 >
问题:

如何逐个消息发送给Kafka

郑宇
2023-03-14
@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
     if(Objects.isNull(currentMessage)){
          //currentMessage is an instance variable which is null when I start the application
          return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
     }
     else {
          //currentMessage has been correctly set with the file information
          LOGGER.info(currentMessage);
          return ReactiveStreams.of(currentMessage).map(Message::of);
     }
}

如何保持一个连续的流,以“反应”新的丢弃的文件?(或其他事件,如HTTP GET请求或类似的事件)...

例如,如果我不返回PublisherBuilder的实例,而是返回一个整数,那么我的kafka主题将由一个非常巨大的整数值流填充。这就是为什么示例在发送消息时使用一些间隔...

我应该使用一些CompletationStage或CompletableFuture吗?RXJava2?使用哪个lib有点混乱(vertx,smallrye,rxjava2,microprofile,...)

    null

非常感谢!

共有1个答案

通建安
2023-03-14

让我们从smallrye-reactive-messaging和smallrye-reactive-streams-operators之间的区别开始:smallrye-reactive-streams-operators与smallrye-reactive-messaging相同,但它还支持微轮廓上下文传播。由于大多数反应消息传递提供者在后台使用Vert.x,它将以事件循环样式处理您的消息,这意味着它将在单独的线程中运行。有时需要将一些ctx从基线程传播到新线程中(例如:填充CDI和Tx上下文以执行一些JPA实体管理器逻辑)。这里有ctx传播帮助。

用于方法签名。您可以查看SmallRye-reactive-streams的官方文档第3、4和5节。每一个都有不同的用例。你想用哪种口味的,由你决定。

什么时候用什么?如果您不是在反应上下文中运行,您可以使用下面的命令发送消息。

对于消息消费,您可以使用方法签名,如下所示:

@Incomputing(“Channel-2”)public CompletionStage doSomething(Message an

 类似资料:
  • 如何发送buf然后接收msg 方法 我正在尝试通过从连接出站发送msg并从入站接收msg然后返回消息Mono来实现此方法。但我只能在that(Publisher)方法中接收消息。它似乎无法返回数据Mono 我试过这个。 但它会一直阻塞,直到连接超时 我尝试了另一个代码。我添加了一个handle方法,并将响应放到map中。然后我可以得到单声道。fromSupply(),在映射处有一个while循环中

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 接口说明 轻推轻应用/订阅号支持发送文本、图片、文本卡片、图文、key-value、文件、待办等消息类型。本接口针对各种消息类型和发送的对象(单发、群发以及给部分人发送)进行了定义。 注:openid是用户关注某个轻应用/订阅号后生成的唯一id,单发和给部分人发送消息必须携带此参数,可以通过如下接口来获取: 根据qt_code获取用户基本信息 获取使用者列表 通过userId获取openid 消息

  • 主动发送消息 use EasyWeChat\Kernel\Messages\TextCard; // 获取 Messenger 实例 $messenger = $app->messenger; // 准备消息 $message = new TextCard([ 'title' => '你的请假单审批通过', 'description' => '单号:1928373, ...

  • 向已经创建连接凭据的设备发送消息数据。 请求方式: |4|2|3|message|\r 参数 message 发送的消息内容 返回值: "|4|2|3|1|\r" 发送成功 "|4|2|3|2|\r" 发送失败 Arduino样例: softSerial.print("|4|2|3|DFRobot|\r");

  • 我已在中创建了我的应用程序并配置为云消息传递。当我从控制台发送通知时,设备会收到通知,但如果我尝试通过Rest API(使用PostMan)发送。然后通知不会到达设备,但响应显示为成功。 这是我的邮差请求 URI-https://fcm.googleapis.com/fcm/send 标题:内容类型:应用程序/json授权:密钥=MY_SERVER_KEY 正文:{“数据”:{“标题”:“火力基地