我有一个相当标准的生产者和消费者线程:
碰巧解码过程是一个瓶颈,可能会受益于更多的CPU。这是制作人70%的时间。如果我引入“解码器”线程,我会获得任何显著的性能吗?
由于内存不足,我需要使用一个队列——无法承受两个队列(字节/项),所以我猜对象“转换”开销会出现吗?
有没有关于如何实现这个3线程解决方案的想法?
谢谢你!
2个队列,其中一个用于保存多个消费者解码的未编码对象。
多个消费者将解码并将解码后的对象写入第二个队列,最终消费者将从中消费。
确保避免死锁(除非你真的知道你在做什么,否则使用通知所有()
not通知()
)
您应该为生产者和消费者调整线程池——例如,如果消费者相对于生产者的速度太快,那么它的线程池可以分配比生产者的线程池更少的线程。这应该会导致吞吐量的显著增加。生产者和消费者线程的比例应该被调整(例如3:1)。
在类似的线路上,您可以有三个线程池,其中生产者(读者)和消费者的线程数较少,而解码器(转换器)的线程池的线程数较多。我不确定你是否需要代码示例,在这种情况下,你应该分享你目前拥有的。我会从生产者和消费者的线程池大小为1,转换器(解码器)的线程池大小为5开始,然后测量瓶颈是什么(如果吞吐量满足您的期望)
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerDecoderConsumer {
/**
* @param args
*/
public static void main(String[] args) {
BlockingQueue<Integer> inputQueue = new PriorityBlockingQueue<Integer>();
BlockingQueue<String> outputQueue = new PriorityBlockingQueue<String>();
ExecutorService reader = Executors.newSingleThreadExecutor();
reader.submit(new Producer(inputQueue));
ExecutorService decoder = Executors.newFixedThreadPool(5);
decoder.submit(new Transformer(inputQueue, outputQueue));
ExecutorService writer = Executors.newSingleThreadExecutor();
writer.submit(new Consumer(outputQueue));
}
private static class Producer implements Callable<Void> {
final BlockingQueue<Integer> queue;
public Producer(final BlockingQueue<Integer> pQueue) {
queue = pQueue;
}
@Override
public Void call() throws Exception {
try {
Random random = new Random();
while (true) {
queue.put(random.nextInt());
}
} catch (Exception e) {
}
return null;
}
}
private static class Transformer implements Callable<Void> {
final BlockingQueue<Integer> inputQueue;
final BlockingQueue<String> outputQueue;
public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
inputQueue = pInputQueue;
outputQueue = pOutputQueue;
}
@Override
public Void call() throws Exception {
try {
while (true) {
Integer input = inputQueue.take();
String output = String.valueOf(input); // decode input to output
outputQueue.put(output); // output
}
} catch (Exception e) {
}
return null;
}
}
private static class Consumer implements Callable<Void> {
final BlockingQueue<String> queue;
public Consumer(final BlockingQueue<String> pQueue) {
queue = pQueue;
}
@Override
public Void call() throws Exception {
try {
while (true) {
System.out.println(queue.take());
}
} catch (Exception e) {
}
return null;
}
}
}
我已经添加了一些代码来说明这个想法——我使用的是两个阻塞队列,不同于你问题中提到的单个队列,因为我不认为仅仅拥有额外的队列会有开销——我建议使用探查器来演示这样的事情。不过,我希望您觉得它很有用,如果您真的觉得有必要,可以将其改进为单队列模型。
问题内容: 我有一个IC接触式读卡器和SLE5528智能卡。想知道如何真正开始使用这些物品。 正在读取读取器,插入智能卡后看不到任何影响。 我还从http://www.openscdp.org/安装了opensmart的智能卡外壳 但是我不能用它来读任何读卡器。我想知道它是否有兼容性问题。 请我知道我可能不恰当地提出了这个问题,但是请那里的任何人帮助我。 任何相关的链接或有用的信息都可以帮助我入门
我们希望我们的Redis更具可扩展性,并且能够添加更多的读取实例。 我正在尝试使用此新的读卡器endpoint:https://aws.amazon.com/about-aws/whats-new/2019/06/amazon-elasticache-launches-reader-endpoint-for-redis 但是,我没有看到使用这种方法的任何简单或自动的方法,我可以设置哪个endpoi
问题内容: Android中是否存在用于String的base-64解码器和编码器? 问题答案: 看到 似乎这是在API版本8或android 2.2中添加的,因此在较旧的平台上将不可用。 但是它的来源是这样,如果需要的话,可以将其原样复制为旧版本。
Netty 的是一个复杂和先进的框架,但它并不玄幻。当我们请求一些设置了 key 的给定值时,我们知道 Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。 将 Request 对象转为 Memcached 所需的字节序列,N
如果你定义并注册了一个message codec,你可以将任何对象发送到event bus 上。 消息编解码器有一个名称,您在发送或发布该消息时在DeliveryOptions中指定该名称: eventBus.registerCodec(myCodec); DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.na