当前位置: 首页 > 知识库问答 >
问题:

从Postgres获取的流,jOOQ未从类返回结果

狄鹏
2023-03-14

我试图将postgres查询中的结果流式传输到前端应用程序,而不是急切地获取所有结果。问题是,我只能在终端中看到流式结果(即,“org.jooq.tools.LoggerListener:获取的记录:…”中的第一个) 然后使用流。get()。forEach(s)-

该数据也可用于其他任务(例如可视化、下载/导出、汇总统计等)。我一直在浏览有关jOOQ的文档和帖子,我正在使用这些文档和帖子作为我的ORM,我尝试使用以下方法:

  • 默认上下文。fetchStream()
  • 结果查询。流()
  • 结果查询。fetchStream()

急切地获取以下内容目前效果很好,但这将在一个巨大的响应中返回所有内容,并且不会流式传输结果:

  • 结果查询。fetchMaps()

数据ontroller.java

java prettyprint-override">@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult。JAVA

public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

查询服务。JAVA

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi。js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

数据jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

在控制台中返回结果

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...
  • 码头工人:19.03。5
  • Spring靴:v2。1.8.发布
  • 节点:v12。13.1
  • 反应:16.9。0
  • OpenJDK:12.0。2
  • 约克:3.12。3
  • 博士后:10.7

共有1个答案

长孙宜
2023-03-14

JavaStreamAPI的全部要点就是这样一个流最多只能使用一次。它没有任何缓冲功能,也不支持像被动流实现那样的基于推送的流模型。

您可以向堆栈中添加另一个应用编程接口,例如Reator(还有其他应用编程接口,但由于您已经在使用Spring...),它支持向多个消费者缓冲和重放流,但这与jOOQ直接无关,而且会严重影响影响应用程序的体系结构。

请注意,jOOQ的ResultQuery扩展了org.reactivestreams.PublisherJDK 9的Flow。发布者用于与这种反应性流更好的互操作性。

 类似资料:
  • 当更新其中一个表中的字段时,我很难制定一个jooq查询来返回两个表中的数据。(Postgres 9.6,jooq 3.11) 表DEVICE和CUSTOMER在外键约束设备上联接。CUSTOMERID=CUSTOMER.ID。 我想返回一个合格的device.id和设备客户的customer.secret,并更新device.stateIN_PROGRESS。资格由各种条款评估。 我开始用 这导致

  • 问题内容: 我正在使用Microsoft SQL Server JDBC驱动程序2.0通过Java连接到SQL Server(2005)。 如何从存储过程中获取返回值?我正在做类似的事情: 我应该使用execute()吗?executeQuery()?executeUpdate()?这些似乎都不默认返回一个返回值,但是我不确定如何获取它。 编辑1:明确地说,我知道如何调用存储过程。这个问题专门关于

  • 我有下面的方法,它使用Apache Commons Http客户机向给定的URI发送异步GET,并返回Future和响应。 CloseableHttpAsyncClient实现了Closeable,因此我使用try/resource结构。 下面您可以看到其用法: 问题是,当我调用get on a future时,它不会返回所需的HttpResponse。如果我使用重载的get()方法,它将一直等待

  • 在Postgres中,我创建了一个UDT,如下所示 注意:我知道JOOQ在生成类时会生成枚举类型,但这是获取所需信息的唯一方法吗? 谢了!

  • 为了最大限度地利用java8流和Spring4,我在来自Springs jsdbRestTem板的JDBC结果集上使用了流API,如下所示(代码缩短并简化): 这似乎很有效。客户端可以像这样使用流Api,而不用担心jdbc类 但是,当我重构(尝试将流提供给客户端方法)时,像这样: 我明白了 因此,数据似乎只能在方法。是否有一种干净的方法可以绕过这个问题,返回来自DB的元素的惰性流?假设具体化结果和

  • 问题内容: 我正在尝试向表中插入一些行…我正在使用 postgressql-7.2.jar。 我得到以下异常 org.postgresql.util.PSQLException:查询未返回任何结果。 在org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:255) 我已经用Googl