当前位置: 首页 > 知识库问答 >
问题:

Java 中的线程和计算

乜业
2023-03-14

我是java的新手,我正在尝试编写一个包含两个参数的程序:

  1. 我们必须对素数求和的数
  2. 我们必须执行此操作的线程数

所以我使用了一个名为Eratosthene的方法来存储一个布尔数组,如果一个数字是素数,我们将其标记为真,然后我们将这个数字的所有倍数标记为假。

我试着把我的数组分成每个线程的子数组,在每个子数组中进行运算,最后把所有子数组的结果相加。

但我不知道我做错了哪里:有时程序没有给出好的结果。

这是我的代码:

SumPrime.java

import java.util.*;
import java.util.concurrent.*;

public class SumPrimes {

    private boolean array[];
    private int numberOfWorkers;
    private Semaphore allFinished;

    public SumPrimes(int num, int threads){
        array = new boolean[num];
        numberOfWorkers = threads;
        for (int i = 2; i < num; i++)
            array[i] = true;
    }

    private class SumParallel extends Thread {
        int min;
        int max;
        long sum;

        SumParallel(int min, int max){
            this.min = min;
            this.max = max;
            sum = 0;
        }

        public void run() {
            for (int i = min; i < max; i++) {
                if (array[i]) {
                    for (int j = min; j*i < array.length; j++) {
                        array[i*j] = false;
                    }
                    sum += i;
                }
            }
            allFinished.release();
        }

        public long getSum() {
            return sum;
        }
    }

    public void SumInParallel() {
        allFinished = new Semaphore(0);

        List<SumParallel> workers = new ArrayList<SumParallel>();
        int lengthOfOneWorker = array.length / numberOfWorkers;
        for (int i = 0; i < numberOfWorkers; i++) {
            int start = i * lengthOfOneWorker;
            int end = (i+1) * lengthOfOneWorker;

            if (i == numberOfWorkers - 1)
                end = array.length;
            SumParallel worker = new SumParallel(start, end);
            workers.add(worker);
            worker.start();
        }

        try {
            allFinished.acquire(numberOfWorkers);
        } catch (InterruptedException ignored) {}

        int sum = 0;
        for (SumParallel w : workers){
            sum += w.getSum();
        }

        System.out.println("The sum of prime numbers is: " + sum);
    }

    public static void main(String[] args) {
        int limitNum = Integer.parseInt(args[0]);
        int threadNum = Integer.parseInt(args[1]);
        SumPrimes sum_primes = new SumPrimes(limitNum, threadNum);
        sum_primes.SumInParallel();
    }
}

您可以这样运行程序:

java SumPrimes 1000 3

我愿意接受任何改进我的代码的建议。

共有3个答案

谷飞星
2023-03-14

我认为你的问题在于这段代码:

   public void run() {
        for (int i = min; i < max; i++) {
            if (array[i]) {
                for (int j = min; j*i < array.length; j++) {
                    array[i*j] = false;
                }
                sum += i;
            }
        }
        allFinished.release();
    }

想象一下你后面的一个线程,在列表的末尾工作。第一个项目不是素数,但是识别它不是素数的工作还没有完成——它来自另一个线程,那个线程刚刚开始。所以你相信这个值是素数(它还没有被标记为非素数)并相应地工作。

如果你提供一个产生坏结果的例子,我们可以很容易地测试这个理论。

苏高旻
2023-03-14

多线程通常也意味着你想让事情变得更快。因此,首先可能值得回顾一下您的初始设计,并使其在单线程上更快。那么这是一个要打败的目标。此外,为了在不编写优化基准测试的情况下比较运行时,您需要一个“可见”长度的运行时。
在我的机器上,带有“设置”

int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end

你的原始代码,

for(int i=2;i<max;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }

运行 24-28 秒。正如下面的评论中所讨论的那样,@Andreas的帖子,后来在里面(是的,现在我看到它被接受了,大部分讨论都消失了),内部循环做了很多额外的检查(因为它一直在做一个比较,即使它实际上不会开始)。因此,外循环可以分为两部分:首先筛分和求和(直到max的最后一个“未知”除数,它不大于其平方根),然后只求和其余的:

int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }
for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

这个在我的机器上运行14-16秒。显著增益,尚不涉及线程。

然后是线程,以及if(!sieve[i])的问题:在计算总和时,对于超过i的低素数,在内循环之前,不能进行这样的检查,所以sieve[i]确实可以告诉它是否是素数。因为例如,如果线程像 for(int i=4;i) 一样运行

for(int i=2;i<=maxunique;i++)
    if(!sieve[i])
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;

int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
    long ii=i;
    Thread t=sumt[i]=new Thread(new Runnable() {
        public void run() {
            int from=(int)Math.max(ii*max/numt,2);
            int to=(int)Math.min((ii+1)*max/numt,max);
            long sum=0;
            for(int i=from;i<to;i++)
                if(!sieve[i])
                    sum+=i;
            sums[(int)ii]=sum;
        }
    });
    t.start();
}

for(int i=0;i<sumt.length;i++) {
    sumt[i].join();
    sum+=sums[i];
}

这有点整洁,所有线程(我有4个内核)都检查相同数量的候选线程,结果更快。有时几乎是一秒钟,但大多是半秒左右(~0.4…~0.8秒)。因此,这一个并不真正值得努力,筛分回路是这里实时消耗的部分。

人们可以决定允许多余的工作,并为筛子中遇到的每个素数启动一个线程,即使它不是一个实际的素数,只是还没有被勾掉:

List<Thread> threads=new ArrayList<>();
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        Thread t=new Thread(new Runnable() {
            public void run() {
                for(int j=ii*2;j<max;j+=ii)
                    sieve[j]=true;
            }
        });
        t.start();
        threads.add(t);
    }
//System.out.println(threads.size());
for(int i=0;i<threads.size();i++)
    threads.get(i).join();

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

注释后的println()会告诉(在我的机器上)创建了3500-3700个线程(如果有人在原始循环中放置一个计数器,结果是3401是最小值,在单线程筛选循环中会遇到许多质数)。虽然超调量不是灾难性的,但线程的数量相当高,而且增益也不太显著,尽管它比之前的尝试更明显:运行时间为10-11秒(当然,通过使用并行和循环,可以将其降低半秒以上)
当循环过滤到非质数时,可以通过关闭循环来解决一些冗余工作:

for(int j=ii*2;j<max && !sieve[ii];j+=ii)

这个实际上有一些效果,为我带来了8.6-10.1秒的运行时间。

由于创建3401个线程并不比创建3700个线程更疯狂,因此限制它们的数量可能是一个好主意,这是一个更容易向Threads挥手告别的地方。虽然从技术上讲可以计算它们,但有各种内置的基础设施可以为我们做到这一点
Executors有助于将线程数限制在固定数量(newFixedThreadPool()>),或者更好地限制在可用CPU的数量(newWorkStealingPool(()):

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es);

int count=0;

for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        count++;
        ecs.submit(new Callable<Object>() {
            public Object call() throws Exception {
                // if(!sieve[ii])
                for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii)
                    sieve[j]=true;
                return null;
            }
        });
    }
System.out.println(count);
while(count-->0)
    ecs.take();
es.shutdown();
long sum=0;

for(int i=2;i<max;i++)
    if(!sieve[i])
        sum+=i;

通过这种方式,它会产生与前一个结果类似的结果(8.6-10.5s)。但是,对于低 CPU 计数(4 个内核),交换条件会导致一些加速(取消注释 if 并在循环中注释 /**/ 之间的相同条件),因为任务按其提交顺序运行,因此大多数冗余循环可以在最开始时退出,使重复检查浪费时间。然后对我来说是8.5-9.3秒,击败了直接线程尝试的最佳和最差时间。但是,如果您的CPU数量很高(我也在超级计算节点上运行它,根据运行时.available处理器()有32个内核可用),则任务将重叠更多,并且非欺骗版本(因此始终执行检查的版本)将更快。

如果您想要一个较小的加速,具有较好的可读性,您可以使用流来并行化内部循环(Threads也可以这样做,只是非常乏味):

long sum=0;
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        sum+=i;
        int ii=i;
        IntStream.range(1, (max-1)/i).parallel().forEach(
            j -> sieve[ii+j*ii]=true);
    }

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

这个非常像原始的优化循环对,并且仍然有一些速度,对我来说是9.4-10.0秒。因此,它比其他产品慢(约10%左右),但它要简单得多。

> < li>

我修复了一系列一个接一个的错误:< code>xy

创建无限数量的子任务困扰着我,幸运的是,我们找到了一个反向设计,我们不检查是否达到sqrt(max)(即maxunique>),但我们知道,如果我们已经完成了低于某个 限制*限制的数字,因为在这个范围( limitlimit*limit)内保留的素数实际上是一个素数(我们仍然可以记住,这个上限是由maxunique maxunique限定的)。因此,这些可以并行筛选。

基本算法,仅用于检查(单线程):

int limit=2;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            for(int j=i*2;j<max;j+=i)
                sieve[j]=true;
        }
    limit=upper;
} while(limit<=maxunique);

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

由于某些原因,它比最初的两个循环变体稍慢(13.8-14.5秒vs 13.7-14.0秒,最小/最大运行次数为20次),但我还是对并行化感兴趣
可能是因为质数的分布不均匀,使用并行流效果不佳(我认为它只是将工作预先划分为看似相等的部分),但基于Executor的方法效果很好:

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es);

int limit=2;
int count=0;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            int ii=i;
            count++;
            ecs.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    for(int j=ii*2;j<max;j+=ii)
                        sieve[j]=true;
                    return null;
                }
            });
        }
    while(count>0) {
        count--;
        ecs.take();
    }
    limit=upper;
} while(limit<=maxunique);

es.shutdown();

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

对于低CPU计数环境,这是迄今为止最快的环境(7.4-9.0秒,而“无限线程数”的时间是8.7-9.9秒,而其他基于Executor的环境的时间是8.5-9.2秒)。然而,在开始时,它运行的并行任务数量很少(当limit=2时,它只启动两个并行循环,分别为2和3),最重要的是,这些是运行时间最长的循环(步长最小),因为在高CPU计数的环境中,它仅排在基于原始Executor的第二位,2.9-3.6秒,而不是2.7-3.2秒)
当然可以在一开始就实施单独的爬坡,明确收集必要数量的素数以饱和可用的核心,然后切换到基于限制的方法,然后无论核心数量多少,结果都可能优于其他方法。然而,我认为我现在可以抵挡住诱惑。

储承
2023-03-14

你需要完全重新思考线程的逻辑

各种线程无法访问数组的相同范围,例如,如果线程的 min = 100max = 150,则只能使用和/或更改范围为 100 到 149(含)的元素。

您的代码:

for (int i = min; i < max; i++) {
    if (array[i]) {
        for (int j = min; j*i < array.length; j++) {
            array[i*j] = false;

i=100开头,j=100,这使得i*j=10000。如果数组真的那么大,这意味着您访问数组[10000],但这是不允许的。当然,数组没有那么大,所以代码什么都不做。

啊,你说,第一个线程的min = 0max = 50,所以它会将值从索引0(0 * 0)更改为2401(49 * 49),并且由于数组小于此值,它将更新整个数组,但这是不允许的。

现在,再想一想。

如果范围是< code>min = 100,max = 150,那么您需要首先清除该范围内的所有偶数,然后清除所有可被3整除的数字,然后清除所有...以此类推,但仅限于该范围。

我会让你重新思考这个逻辑。

更新

要将埃拉托色尼筛应用于某个范围,我们需要素数到该范围最大值的平方根。

如果范围是 min = 150,max = 200,则 maxPrime = sqrt(200) = 14,因此我们需要从 2 到 14(包括 14)的素数,那么我们可以更新范围 150-199。

假设我们首先更新数组以找到范围2-14中的所有素数,我们可以使用它来迭代目标范围(150-199)中这些素数的倍数。为此,我们需要从素数的最低倍数开始,即

通过整数数学,为了四舍五入到下一个倍数,我们计算:

lower = (min + prime - 1) / prime * prime

这给了我们主要的逻辑:

maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
    if (array[prime]) {
        int lower = (min + prime - 1) / prime * prime;
        for (int i = lower; i < max; i += prime)
            array[i] = false

我们还应该让每个线程负责首先设置范围内的所有布尔,这样这个部分也变成了多线程。

主逻辑现在必须首先在主线程中找到范围2-sqrt(N)中的素数,然后在线程之间拆分剩余范围。

这是我的尝试:

public static long sumPrimes(int n, int threadCount) {
    // Find and sum the "seed" primes needed by the threads
    int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
    boolean[] seedPrime = new boolean[maxSeedPrime + 1];
    AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime, seedPrime, 0, maxSeedPrime));

    // Split remaining into ranges and start threads to calculate sums
    Thread[] threads = new Thread[threadCount];
    for (int t = 0, rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
        int min = rangeMin;
        int max = min + (n - min + 1) / (threadCount - t) - 1;
        threads[t] = new Thread(() ->
            totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
        );
        threads[t].start();
        rangeMin = max + 1;
    }

    // Wait for threads to end
    for (int t = 0; t < threadCount; t++) {
        try {
            threads[t].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Return the calculated sum
    return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime, boolean[] rangePrime, int min, int max/*inclusive*/) {
    // Initialize range
    for (int i = Math.max(min, 2); i <= max; i++) {
        rangePrime[i - min] = true;
    }

    // Mark non-primes in range
    int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
    for (int prime = 2; prime <= maxPrime; prime++) {
        if (seedPrime[prime]) {
            int minMultiple = (min + prime - 1) / prime * prime;
            if (minMultiple <= prime)
                minMultiple = prime * 2;
            for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
                rangePrime[multiple - min] = false;
            }
        }
    }

    // Sum the primes
    long sum = 0;
    for (int prime = min; prime <= max; prime++) {
        if (rangePrime[prime - min]) {
            sum += prime;
        }
    }
    return sum;
}

测验

public static void main(String[] args) {
    test(1000, 3);
    test(100000000, 4);
}
public static void test(int n, int threadCount) {
    long start = System.nanoTime();
    long sum = sumPrimes(n, threadCount);
    long end = System.nanoTime();
    System.out.printf("sumPrimes(%,d, %d) = %,d (%.9f seconds)%n",
                      n, threadCount, sum, (end - start) / 1e9);
}

输出

sumPrimes(1,000, 3) = 76,127 (0.005595600 seconds)
sumPrimes(100,000,000, 4) = 279,209,790,387,276 (0.686881000 seconds)

更新 2

上面的代码使用的是lambda表达式:

threads[t] = new Thread(() ->
    totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);

如果您不想使用lambda表达式,例如,它将在Java7上运行,您可以使用匿名类代替:

threads[t] = new Thread() {
    @Override
    public void run() {
        totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max));
    }
};
 类似资料:
  • 问题内容: 绿色线程和本机线程有什么区别? 为什么将其命名为绿色和原生? 我是编程世界的新手。我喜欢学习Java。在经历Java线程面试问题时,我发现了这一点。我听说过线程,但是没有听说过这些绿色线程和本地线程。我对绿色线程和本机线程感到困惑,但不清楚。 在这种情况下,线程被称为绿色线程还是本地线程?(我的意思是在编程中) 问题答案: 绿色线程和本机线程有什么区别? 绿色线程由虚拟机调度。 本机线

  • 问题内容: 在多线程环境中使用Singleton类的首选方法是什么? 假设我有3个线程,并且所有这些线程都尝试同时访问单例类的方法- 如果不保持同步会怎样? 在内部使用 方法还是使用块是好的做法。 请告知是否还有其他出路。 问题答案: 从理论上讲,这项任务并不容易,因为您要使其真正成为线程安全的。 在此上找到了一篇非常不错的论文@ IBM 仅获取单例不需要任何同步,因为这只是读取。因此,只需同步S

  • 问题内容: 我从一个非常简单的多线程示例开始。我试图做一个线程安全的计数器。我想创建两个线程,使计数器间歇地增加到1000。以下代码: 据我所知,while循环现在意味着只有第一个线程才能访问计数器,直到达到1000。输出: 我该如何解决?如何获得共享计数器的线程? 问题答案: 两个线程都可以访问您的变量。 您看到的现象称为线程饥饿。输入代码的受保护部分后(很抱歉,我之前错过了它),其他线程将需要

  • 问题内容: 今天有人在采访中问我有关Java线程概念的问题吗?问题是… 什么是线程? 我们为什么要穿线? 线程上的实时示例。 我们可以在Spring框架服务类中创建线程吗? flex可以调用线程吗? 除了Thread的定义之外,我没有回答任何问题,我也是从互联网中学到的。 谁能对此清楚地解释我。 更新 : 线程和普通的Java类之间有什么区别。为什么我们需要线程…我可以在线程中执行业务逻辑吗?我可

  • 问题内容: 假设我有一个利用该框架的应用程序 当我在调试器中运行此应用程序时,将使用以下(默认)名称创建一个线程:。如你所见,这并不是非常有用,而且据我所知,该框架没有提供一种简便的方法来命名已创建的线程或线程池。 那么,如何为线程/线程池提供名称呢?例如,。 问题答案: 你可以提供一个到。工厂将负责创建线程,并将能够为其命名。 引用Javadoc: 创建新线程 使用创建新线程。如果没有另外指定,

  • 一个常见的问题:为什么启动线程是调用start()方法,而不是调用run()方法呢? 答案: 每个线程都在单独的调用堆栈中启动。 从主线程调用run()方法,run()方法转到当前调用堆栈,而不是新调用堆栈的开头。 如下面例子所示: 输出结果为: 以上程序Demo的run()线程方法和Demo的main()方法都是处于主线程中,换句话说,这里并没有创建新的线程。 如果直接调用run()方法,则会出