案例:考虑到您有一个包含任务的队列(task1、task2、task3、task1、task2、task3、…),我如何使用两个线程来删除队列中的任务。
要求:
流量图
代码
public class testRunManager {
public static void main(String[] args){
ConcurrentLinkedQueue<BaseTask> tasks = new ConcurrentLinkedQueue<>();
for (int index = 0; index < 10; index++) {
tasks.add(new Task1());
tasks.add(new Task2());
tasks.add(new Task3());
}
BaseRunManager.getInstance().addTasks(tasks);
Thread thread1 = BaseRunManager.getInstance().getNewThread(TaskThread.Type.BeforeWards);
Thread thread2 = BaseRunManager.getInstance().getNewThread(TaskThread.Type.AfterWards);
//start
thread1.start();
thread2.start();
}
}
public class TaskThread extends Thread{
enum Type{
BeforeWards,
AfterWards
}
Type type;
public TaskThread(Type type,Runnable runnable){
super(runnable);
this.type = type;
}
}
public interface ShouldRunBeforeWardsJob {
}
public interface ShouldRunAfterWardsJob {
}
abstract public class RunController {
public enum Performance {
BUSYING,
RUNNING,
PAUSED,
}
protected enum ControlState {
PAUSING,
PAUSED,
STOPING,
RESUMING,
RUNNING,
STEPPING,
}
private ControlState state = ControlState.RUNNING;
private Performance performance = Performance.BUSYING;
private List<ControlListener> listeners = new ArrayList<>();
protected ReentrantLock controlLock = new ReentrantLock();
protected Condition controlCondition = controlLock.newCondition();
public Performance getPerformance() {
return performance;
}
protected ControlState getState() {
return state;
}
public void addListener(ControlListener listener) {
listeners.add(listener);
}
public void removeListener(ControlListener listener) {
listeners.remove(listener);
}
public void pause() {
if (performance != Performance.RUNNING) {
return;
}
setState(ControlState.PAUSING);
}
public void step() {
if (performance != Performance.PAUSED) {
return;
}
setState(ControlState.STEPPING);
}
public void stop() {
if (performance != Performance.RUNNING && performance != Performance.PAUSED) {
return;
}
setState(ControlState.STOPING);
}
public void resume() {
if (performance != Performance.PAUSED) {
return;
}
setState(ControlState.RESUMING);
}
private void setPerformance(Performance p) {
if (performance != p) {
Performance old = this.performance;
this.performance = p;
for (ControlListener cl : listeners) {
cl.performChanged(old, p);
}
}
}
protected void setState(ControlState state) {
controlLock.lock();
try {
this.state = state;
switch (this.state) {
case RESUMING:
case STEPPING:
case PAUSING:
case STOPING:
controlCondition.signal();
setPerformance(Performance.BUSYING);
break;
case PAUSED:
setPerformance(Performance.PAUSED);
break;
case RUNNING:
setPerformance(Performance.RUNNING);
}
}finally {
controlLock.unlock();
}
}
public interface ControlListener {
void performChanged(Performance oldState, Performance newState);
}
}
public abstract class BaseTask {
enum State{
FINISH,
NOT
}
protected State state;
public State getState(){
return state;
}
public void setState(State state){
this.state = state;
}
abstract void runJob();
abstract void doJob();
}
public class BaseRunManager {
private static BaseRunManager instance;
private ConcurrentLinkedQueue<BaseTask> tasks = new
ConcurrentLinkedQueue<>();
public synchronized static BaseRunManager getInstance(){
if(instance == null){
instance = new BaseRunManager();
}
return instance;
}
public BaseRunManager(){
}
public void addTasks(ConcurrentLinkedQueue<BaseTask> tasks){
this.tasks = tasks;
}
public Thread getNewThread(TaskThread.Type type){
return new TaskThread(type,new BaseRunnable());
}
private class BaseRunnable extends RunController implements Runnable{
private BaseTask curTask;
private final AtomicBoolean afterwardsFinish = new AtomicBoolean(true);
private final AtomicInteger beforewardsFinishNum = new AtomicInteger(0);
private final AtomicInteger currentThreadNum = new AtomicInteger(0);
private final Condition condition = controlLock.newCondition();
@Override
public void run() {
currentThreadNum.incrementAndGet();
TaskThread curThread = (TaskThread)Thread.currentThread();
while (tasks.size()>0) {
//get task
controlLock.lock();
try {
curTask = tasks.peek();
if ((curTask instanceof ShouldRunBeforeWardsJob && curThread.type == TaskThread.Type.BeforeWards)
|| (curTask instanceof ShouldRunAfterWardsJob && curThread.type == TaskThread.Type.AfterWards)) {
tasks.poll();
if (curTask instanceof ShouldRunBeforeWardsJob) {
curTask.runJob();
beforewardsFinishNum.incrementAndGet();
condition.signalAll();
} else if (curTask instanceof ShouldRunAfterWardsJob) {
if (beforewardsFinishNum.get() / 2 != 0) {
condition.await();
curTask.runJob();
}
}
} else {
condition.awaitNanos(20);
continue;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
controlLock.unlock();
}
}
}
}
}
下面是另一种方法:
public static void main(String[] args) {
final BlockingQueue<BaseTask> tasks = new LinkedBlockingQueue<>();
final BlockingQueue<BaseTask> queue2 = new LinkedBlockingQueue<>();
for (int index = 0; index < 10; index++) {
tasks.add(new BaseTask("Task1"));
tasks.add(new BaseTask("Task2"));
tasks.add(new BaseTask("Task3"));
}
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
BaseTask task = tasks.take();
task.run();
task = tasks.take();
task.run();
task = tasks.take();
queue2.offer(task);
} catch (InterruptedException ex) {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
BaseTask task = queue2.take();
task.run();
} catch (InterruptedException ex) {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
});
thread2.start();
thread1.start();
}
private static class BaseTask implements Runnable {
private final String name;
public BaseTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(name + " ["
+ Thread.currentThread().getName() + "]");
}
}
> < li> 我必须通过创建三个类,即Task1、Task2和Task3,使用三个线程来插入元素。要插入数组的值是0,1,2,....299. 在线程中重写run方法。三个整数I、j和k表示每个线程应该在给定数组中添加的元素数量。 线程一应该在数组内将0追加到i-1,线程二应该在数组内将I追加到i j-1,第三个线程应该在数组内将i j追加到299。 线程一和线程二必须同时运行,并且线程一和线程
介绍 Task3主要是珠海的任务系统 Host 172.17.6.153 task3.game.duowan.com 172.17.6.153 adm.task3.game.yy.com 接口状态 状态码 说明 200 请求成功 404 活动不存在 403 活动未开启 405 游戏入口受限/未登陆 406 任务不属于该活动 407 角色无效 408 未参加任务 409 未通过防刷验证 411
问题内容: 我对ThreadPoolExecutor有一个非常简单的问题。我有以下情况:我必须使用队列中的对象,为它们创建适当的工作程序任务,然后将其提交给ThreadPoolExecutor。这很简单。但是在关闭情况下, 许多 工作人员可能会排队等待执行。由于这些任务之一可能正在运行一个小时,而且我希望相对快速地正常关闭应用程序,因此我想从ThreadPoolExecutor中丢弃所有排队的任务
本文向大家介绍Python实现简单多线程任务队列,包括了Python实现简单多线程任务队列的使用技巧和注意事项,需要的朋友参考一下 最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码): 一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。 一种解决办法是每调用一次 plotly.write 函数
我想了解在fork-连接池中处理任务Java顺序。 到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。 我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员
我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队