当前位置: 首页 > 知识库问答 >
问题:

多线程追加器队列中的慢队列裁剪器

赏阳嘉
2023-03-14

我有一个场景,其中多个线程正在写入同一队列。

Appender线程从不同的市场(每个线程都是单一市场)接收更新,并将这些数据推入相同的队列:

ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
        final ExcerptTailer tailer = queue.createTailer();
appender.writeDocument(
                        wire -> {

                                wire
                                        .getValueOut().text("buy")
                                        .getValueOut().text(exchange.name())
                                        .getValueOut().text(currencyPair.toString())
                                        .getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
                                        .getValueOut().text(price);
                            });
while (true){
     tailer.readDocument(........

共有1个答案

谯德元
2023-03-14

创建队列是非常昂贵的,如果可以的话,尝试每个进程只做一次。

创建一个裁剪器也是昂贵的,你应该创建一次,并继续轮询更新。

创建对象可能是昂贵的,我会避免创建任何对象。例如,避免调用tostringlocaldate.now

String path = OS.getTarget();
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = appender.writingDocument()) {
            ValueOut valueOut = dc.wire().getValueOut();
            valueOut.text("buy")
                    .getValueOut().asEnum(exchange)
                    .getValueOut().asEnum(currencyPair)
                    .getValueOut().int64(System.currentTimeMillis())
                    .getValueOut().float64(price);
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
    Jvm.pause(100);
}
Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second
final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                throw new AssertionError("Missing t: " + t + ", i: " + i);
            ValueIn in = dc.wire().getValueIn();
            String buy = in.text();
            Exchange exchange2 = in.asEnum(Exchange.class);
            CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
            long time = in.int64();
            double price2 = in.float64();
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}

印刷品

Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second
 类似资料:
  • 我需要一个库或我们的软件工具,可以: 1)将线程/作业/任务(任何东西--如果需要,我们可以重写代码,我们在mintue有线程对象)放入像system这样的队列中2)我们可以定义同时最多运行多少线程3)线程完成后,线程从队列中移除,这样GC就可以移除所有涉及的实体。 我正在进行大量阅读,发现ExecutorService(Executors.newFixedThreadPool(5);)但问题可能

  • 在使用TensorFlow进行异步计算时,队列是一种强大的机制。 正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。 为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQ

  • 问题内容: 如何同时在多个对象上“选择” ? Golang的频道具有所需的功能: 其中第一个要解除阻塞的通道执行相应的块。如何在Python中实现? 更新0 根据tux21b答案中给出的链接,所需的队列类型具有以下属性: 多生产者/多消费者队列(MPMC) 提供每个生产者FIFO / LIFO 当队列为空/完整的消费者/生产者被阻止时 此外,渠道可能会被阻塞,生产者将阻塞,直到消费者取回该物品为止

  • 在使用TensorFlow进行异步计算时,队列是一种强大的机制。 正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。 为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQ

  • 问题 你有一个线程队列集合,想为到来的元素轮询它们, 就跟你为一个客户端请求去轮询一个网络连接集合的方式一样。 解决方案 对于轮询问题的一个常见解决方案中有个很少有人知道的技巧,包含了一个隐藏的回路网络连接。 本质上讲其思想就是:对于每个你想要轮询的队列,你创建一对连接的套接字。 然后你在其中一个套接字上面编写代码来标识存在的数据, 另外一个套接字被传给 select() 或类似的一个轮询数据到达

  • 那么,这种架构的瓶颈在哪里?也许推送每条带有互斥体的消息是个坏主意?