我有一些线程的问题。
我的剧本
1-从文本文件将1000多万行加载到数组中
2-创建5个固定线程的执行池
3-然后它正在迭代该列表并将一些线程添加到队列中
executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));
现在活动线程永远不会绕过5个固定线程,这很好,但我发现我的处理器进入100%负载,我已经调试了一点,我看到正在调用MyCustomThread
构造函数,女巫意味着无论我声明5个固定线程,ExecutorService
仍将尝试创建10万个对象。
主要问题是:我如何防止这种情况?我只是想让线程在没有空间的情况下被拒绝,而不是创建1000万对象并一个接一个地运行它们。
第二个问题:我如何得到当前的活动线程?我尝试了threadGroup.activeCount()
,但它总是给我5 5 5......
调用方类:
System.out.println("Starting threads ...");
final ThreadGroup threadGroup = new ThreadGroup("workers");
//ExecutorService executor = Executors.newFixedThreadPool(howManyThreads);
ExecutorService executor = Executors.newFixedThreadPool(5,new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r);
}
});
int increment = 0;
for(String line : arrayOfLines)
{
if(increment > 10000)
{
//System.out.println("TOO MANY!!");
//System.exit(0);
}
System.out.println(line);
System.out.println(threadGroup.activeCount());
if(threadGroup.activeCount() >= 5)
{
for(int i = 0; i < 10; i++)
{
System.out.println(threadGroup.activeCount());
System.out.println(threadGroup.activeGroupCount());
Thread.sleep(1000);
}
}
try
{
executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));
}
catch(Exception ex)
{
continue;
//System.exit(0);
}
increment++;
}
executor.awaitTermination(10, TimeUnit.MILLISECONDS);
executor.shutdown();
Thread等级:
public class MyCustomThread extends Thread
{
private String ip;
private String threadName;
private int threadTimeout = 10;
public MyCustomThread(String ip)
{
this.ip = ip;
}
public MyCustomThread(String ip,int threadTimeout,String threadName)
{
this.ip = ip;
this.threadTimeout = threadTimeout;
this.threadName = threadName;
System.out.prinln("MyCustomThread constructor has been called!");
}
@Override
public void run()
{
// do some stuff that takes time ....
}
}
谢谢你。
我认为这里最大的问题是MyCustomThread应该实现Runnable,而不是扩展Thread。使用ExecutorService时,您可以让它处理线程管理(即不需要创建它们)
这是我认为你想要做的一个近似值。希望这有帮助。
public class FileProcessor
{
public static void main(String[] args)
{
List<String> lines = readFile();
System.out.println("Starting threads ...");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String line : lines)
{
try
{
executor.submit(new MyCustomThread(line));
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
try
{
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
System.out.println("A processor took longer than the await time to complete.");
}
executor.shutdownNow();
}
protected static List<String> readFile()
{
List<String> lines = new ArrayList<String>();
try
{
String filename = "/temp/data.dat";
FileReader fileReader = new FileReader(filename );
BufferedReader bufferedReader = new BufferedReader(fileReader);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
lines.add(line);
}
bufferedReader.close();
}
catch (Exception e)
{
e.printStackTrace();
}
return lines;
}
}
public class MyCustomThread implements Runnable
{
String line;
MyCustomThread(String line)
{
this.line = line;
}
@Override
public void run()
{
System.out.println(Thread.currentThread().getName() + " processed line:" + line);
}
}
编辑:此实现不会阻止ExecutorService提交。我的意思是,无论之前提交的MyCustomThreads是否已完成,都会为文件中的每一行创建一个新的MyCustomThread实例。您可以添加一个阻塞/限制工作队列来防止出现这种情况。
ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));
可以在此处找到阻塞/限制队列实现的示例:
你做错了一点。执行器的原理是将工作单元实现为可运行或可调用(而不是线程)。每个Runnable或Callable都应该做一个原子工作,这与其他Runnable或Callable是互斥的。
Executor服务在内部使用线程池,因此创建线程组和线程没有任何好处。
试试这个简单的片段:
ExecutorService executor = Executors.newFixedThreadPool(5);`
executor.execute(new MyRunnableWorker());
public class MyRunnableWorker implements Runnable{
private String ip;
private String threadName;
private int threadTimeout = 10;
public MyRunnableWorker(String ip){
this.ip = ip;
}
public MyRunnableWorker(String ip,int threadTimeout,String threadName){
this.ip = ip;
this.threadTimeout = threadTimeout;
this.threadName = threadName;
System.out.prinln("MyRunnableWorker constructor has been called!");
}
@Override
public void run(){ {
// do some stuff that takes time ....
}
}
这将为您提供您想要的东西。还要尝试使用Visual alVM测试线程代码执行情况,以查看线程如何运行以及负载分布情况。
假设我有一个Executors静态工厂方法的ExecutorService实例。 如果我从某个线程提交了一个调用,其中RetVal不是线程安全的本地实例化对象,那么当我从同一个线程获得()它时,我需要担心retvals的完整性吗?人们说局部变量是线程安全的,但我不确定当您返回一个本地实例化的对象并从其他线程接收它时,它是否适用。 下面是我的定制实现,我只是为了测试。您可以忽略EType枚举。
问题内容: 有没有一种方法可以使用ExecutorService暂停/恢复特定线程? 想象一下,我想停止id == 0的线程(假设为每个线程分配了一个增量ID,直到达到线程池的大小为止)。 过了一会儿,通过按下一个按钮,我想恢复该特定线程,并将所有其他线程保留为当前状态,这些状态可以暂停或恢复。 我在Java文档中发现了PausableThreadPoolExecutor的未完成版本。但这不适合我
我从主线程调用了下面的代码,使用ExecutorService池并启动一个线程来处理找到的每个文件。我正在尝试了解当主线程被kill命令终止时ExecutorService的行为。生成的线程会发生什么?一旦完成工作,它们会立即被杀还是终止? 还有没有更好/更安全的方法来编写下面的代码段,特别是如果我在无限循环中运行这部分,例如等待文件被放到输入目录并分配线程来处理它们?在这种情况下,我应该创建一个
问题内容: 假设我有一个利用该框架的应用程序 当我在调试器中运行此应用程序时,将使用以下(默认)名称创建一个线程:。如你所见,这并不是非常有用,而且据我所知,该框架没有提供一种简便的方法来命名已创建的线程或线程池。 那么,如何为线程/线程池提供名称呢?例如,。 问题答案: 你可以提供一个到。工厂将负责创建线程,并将能够为其命名。 引用Javadoc: 创建新线程 使用创建新线程。如果没有另外指定,
问题内容: 输出量 自333线程产生于444线程以来,我一直期待输出的最后一行出现“ NewInitialValue”,而tl是本地可继承线程。 是什么导致此问题,如何解决? 问题答案: 当您无法控制线程的创建时,您不应依赖。Javadoc指出: […]创建子线程时,子级将接收父级具有值的所有可继承线程局部变量的初始值。 在您的示例中,线程是由返回者创建的 那是一个执行程序,最多将使用两个线程来执
问题内容: 使用return by时,如何中断它? 问题答案: 为此,您需要将任务分配给,而不是调用。当您执行此操作时,将返回一个可用于操纵计划任务的a。特别是,您可以调用关联程序来中断当前正在执行的任务(或者,如果该任务尚未开始运行,则完全跳过执行)。 顺便说一句,由返回的对象实际上是个。