当前位置: 首页 > 知识库问答 >
问题:

Spring Webflux Async PostgreSQL Publisher在第一个结果后停止

公冶嘉
2023-03-14

我正在尝试用反应异步postgres-async-driver替换PostgreSQL数据库轮询器,并将新插入的行流式传输到Spring5WebFlux反应websocket客户机,就像这里演示的Josh Long的出色示例一样,并基于Sébastien Deleuze的Spring-reactive-playground。

我的发布服务器获取第一个,但不返回后续行。问题是我的observablepublisher还是我如何使用postgres-async-driverdb

public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
    return
        // com.github.pgasync.Db
        db.queryRows(sql)
        // ~RowMapper method
        .map(row -> mapRowToDto(row))
        // serialize dto to String for websocket
        .map(dto -> { return objectMapper.writeValueAsString(dto); })
        // finally, write to websocket session 
        .map(str -> { return session.textMessage((String) str);
        });
}

然后,我使用RXReActiveStream.topublisher转换器将Observable连接到我的WebSocketHandler中:

@Bean
WebSocketHandler dbWebSocketHandler() {
    return session -> {
        Observable<WebSocketMessage> o = getObservableWSM(session);
        return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
    };
}

这将从我的SQL语句中获取第一个,但不获取其他行。如何继续传输附加行?

理想情况下,我想我需要一个MongoDB可调整游标的PostgreSQL等价物。

共有1个答案

商棋
2023-03-14

基于以下示例,我创建了一个Postgres触发器,该触发器在inserts中激发到我的表:

CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;
  PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

然后我使用reactive-pg-client订阅了Postgres触发器。下面是它们的pub/sub示例中的代码:

@Bean
PgPool subscribedNotificationHandler() {
    PgPool client = pgPool();
    client.getConnection(asyncResult -> {
        if (asyncResult.succeeded()) {
            PgConnection connection = asyncResult.result();
            connection.notificationHandler(notification -> {
                notification.getPayload();
                // do things with payload
            });
            connection.query("LISTEN my_trigger_name", ar -> {
                log.info("Subscribed to channel");
            });
        }
    });
    return client;
}
 类似资料:
  • 我正在使用Android中的Speech认知器和识别器来实现语音识别。我的目标是在我的语音识别器在屏幕上显示结果后重新开始听语音。为此,我使用以下代码。 问题是,第一次运行正常并显示结果,但在第二次开始侦听(从onResults方法调用)后,由于某种原因,它听不到正在说的内容。然后它给出一个ERROR\u SPEECH\u TIMEOUT错误,这意味着没有语音输入。在Logcat上,我可以看到它进

  • 在阅读了Oracle站点上的这篇文章https://community.Oracle.com/docs/doc-995305之后,我将尝试实现“Some Two-to-One selection patterns”段落中描述的模式。这最后一类模式还包含二对一模式。但是这次不是执行一次下游元素,而是完成两个上游元素,当两个上游元素中的一个完成时执行下游元素。例如,当我们要解析域名时,这可能会证明非常

  • 这是我在控制器中的getIndex()函数中的东西 所以我希望从循环中获取所有类别名称。 但是,例如,如果我希望通过在视图中执行此操作来获得结果 结果是 > 对象(类别)169(20){[“可填充”:受保护]= 字符串(3)“foo1” 第一个结果从哪里来,我如何摆脱它?谢谢!

  • 当前的效果是悬停时会有伸长的下划线, 但是会有一个问题, 只能做一行 如果我把p标签结构改成如下 那么如何在悬停时让每一行都出现线条?

  • 问题内容: 此示例取自tour.golang.org/#63 输出 为什么只打印次数而不是? 编辑: 答案可以引自golang规范: 程序执行首先初始化主程序包,然后调用函数main。当函数main返回时,程序退出。它不等待其他(非主)goroutine完成。 问题答案: 当您的主要功能结束时,程序即结束,即所有goroutine均终止。您的主体在完成之前会终止。如果您在主课程结束时睡了一段时间,

  • 我有一个简单的文件格式,我想用jison解析器生成器解析。这个文件可以由任意顺序和数量的多个表达式组成。这是解析器的jison文件: 为了简单起见,我将文件缩短为只有字符串和文件id表达式。 我的问题是,如果第二个表达式只包含一个类似令牌的字符串,那么生成的解析器似乎只能识别一个或两个完整的表达式。例如: 文件版本: 1.0 将被解析,或者 文件版本:1.0“我的字符串” 也将被解析,但对于 文件