当前位置: 首页 > 知识库问答 >
问题:

无分支K-均值(或其他优化)

皇甫飞飙
2023-03-14

注意:我更喜欢如何处理和提出这些类型的解决方案的指南,而不是解决方案本身。

我的系统中有一个非常关键的性能函数,在特定上下文中显示为头号分析热点。它正在进行k-means迭代(已经使用并行处理每个工作线程中的点子范围的多线程)。

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist) // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

处理这段代码所需的任何时间节省都非常重要,所以我经常在这段代码上做很多事情。例如,可能值得将质心循环放在外部,并针对给定质心并行遍历点。这里的簇点数量以百万计,而质心的数量以千计。该算法用于少量迭代(通常在10次以下)。它并不寻求完美的收敛性/稳定性,只是一些“合理的”近似。

我们欢迎您提出任何想法,但我真正渴望发现的是,此代码是否可以无分支,因为它可以支持SIMD版本。我还没有真正发展出能够轻松掌握如何提出无分支解决方案的心智能力:我的大脑在这方面出现了故障,就像我在早期第一次接触递归时一样,因此关于如何编写无分支代码以及如何为其培养适当心态的指南也会很有帮助。

简言之,我正在寻找关于如何微优化此代码的任何指南、提示和建议(不一定是解决方案)。它很可能有算法改进的空间,但我的盲点总是在微观优化解决方案上(我很想知道如何更有效地应用它们,而不至于过火)。它已经使用chunky parallel进行了紧密的多线程逻辑处理,因此我几乎被推到了微观优化的角落,这是在没有更智能的算法的情况下尝试的更快的事情之一。我们完全可以自由更改内存布局。

关于试图对O(knm)算法进行微观优化时完全错误的看法,我完全同意。这将这个特定的问题推向了一个有点学术和不切实际的领域。然而,如果可以允许我讲一个轶事,我来自高级编程的原始背景——非常强调广泛、大规模的观点、安全性,而很少关注低级实现细节。我最近将项目切换到一种非常不同的现代风格的项目,我正在从我的同行那里学习各种新技巧,例如缓存效率、GPGPU、无分支技术、SIMD、实际上优于malloc的专用mem分配器(但对于特定场景)等。

这就是我试图赶上最新性能趋势的地方,令人惊讶的是,我发现那些我在90年代经常喜欢的旧数据结构,通常是链接/树型结构,实际上,通过在连续内存块上应用优化指令的更幼稚、野蛮、微优化、并行化的代码,它们的性能大大超过了它们。同时这也有点令人失望,因为我觉得我们现在正在将算法更适合机器,并以这种方式缩小可能性(尤其是使用GPGPU)。

最有趣的是,我发现这种经过微优化、快速数组处理的代码比我以前使用的复杂算法和数据结构更容易维护。首先,它们更容易推广。此外,我的同事经常可以接受客户对某个领域特定放缓的投诉,只需为SIMD做一个并行处理,可能还有一些SIMD,并称其以相当快的速度完成。算法改进通常可以提供更多,但这些微优化的应用速度和非侵入性让我想在该领域了解更多,因为阅读有关更好算法的论文可能需要一些时间(也需要更广泛的更改)。所以我最近一直在跳上微优化的潮流,在这个特定的案例中可能有点太多了,但我的好奇心更多的是关于扩展我的任何场景的可能解决方案范围。

注意:我真的、真的很不擅长组装,所以我经常以一种试错的方式对事情进行更多的调整,对为什么vtune中显示的热点可能是瓶颈提出一些有根据的猜测,然后尝试看看时间是否有所改善,如果时间确实有所改善,假设这些猜测有一些真实的线索,如果没有改善,则完全没有把握。

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d 
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h) 
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2] 
000007FEEE3FB8B0  add         rax,rax 
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8] 
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx] 
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4] 
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4] 
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8] 
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4] 
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8] 
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2 
000007FEEE3FB8DB  mulss       xmm0,xmm0 
000007FEEE3FB8DF  mulss       xmm1,xmm1 
000007FEEE3FB8E3  addss       xmm2,xmm0 
000007FEEE3FB8E7  addss       xmm2,xmm1 

            if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8] 
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h) 
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2 
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx 
000007FEEE3FB8F9  inc         ecx  
000007FEEE3FB8FB  add         r8,30h 
000007FEEE3FB8FF  cmp         ecx,r10d 
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h) 
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi  
000007FEEE3FB906  add         rdx,20h 
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4] 
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h) 
000007FEEE3FB913  mov         rbx,qword ptr [irange] 
            }
        }
    }
}

我们被迫将SSE 2作为目标——比我们的时代晚了一点,但当我们假设SSE 4作为最低要求也可以时(用户有一些Intel机器的原型),用户群实际上犯了一次错误。

我非常感谢所有提供的帮助!因为代码库非常广泛,触发代码的条件也很复杂(跨多个线程触发的系统事件),所以每次进行实验性更改并对其进行分析有点笨拙。所以我在旁边设置了一个表面测试作为一个独立的应用程序,其他人也可以运行和试用,这样我就可以尝试所有这些慷慨提供的解决方案。

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the 
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y * z*z;

            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}

我知道表面测试的危险性,但由于它已经被认为是以前真实世界会议的热点,我希望这是可以原谅的。我还对与微优化此类代码相关的一般技术感兴趣。

在分析这一点时,我确实得到了稍微不同的结果。时间在这里的循环中更均匀地分布,我不知道为什么。可能是因为数据较小(我省略了成员,并取出了min\u dist成员,使其成为局部变量)。质心与点之间的精确比率也有点不同,但希望足够接近,以便将这里的改进转化为原始代码。在这个表面测试中,它也是单线程的,并且拆卸看起来非常不同,因此我可能会冒险在没有原始测试的情况下优化这个表面测试(我现在愿意冒这个风险,因为我更感兴趣的是扩展我对可以优化这些情况的技术的知识,而不是针对这个具体情况的解决方案)。

哦,我面临着微观优化的困境,但我对组装的理解还不够。我替换了这个:

        -if (min_dist_squared > dist_squared)
        -{
        -    min_dist_squared = dist_squared;
        -    pt.centroid = c;
        -}

有了这个:

        +const bool found_closer = min_dist_squared > dist_squared;
        +pt.centroid = bitselect(found_closer, c, pt.centroid);
        +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

..却发现时间从约5.6秒上升到约12.5秒。尽管如此,这并不是他的错,也没有削弱他的解决方案的价值——这是我的错,因为我没有理解机器层面上到底发生了什么,并且在黑暗中冒险。这一次显然错过了,显然我并不像我最初认为的那样是分支预测失误的受害者。尽管如此,他提出的解决方案是一个很好的通用函数,可以在这种情况下尝试,我很感激将其添加到我的技巧工具箱中。现在进入第二轮。

这个解决方案可能很神奇。在将集群代表转换为SoA之后,我得到了大约2.5秒的时间!不幸的是,似乎存在某种故障。对于最终输出,我得到了非常不同的结果,这表明精度差异不仅仅是微小的,包括一些接近尾端的质心,其值为0(意味着在搜索中未找到它们)。我一直在尝试使用调试器检查SIMD逻辑,以查看可能出现的情况——这可能只是我的一个转录错误,但下面是代码,以防有人发现错误。

如果可以在不减慢结果的情况下纠正错误,那么这种速度的提高比我从纯微观优化中想象的要快!

    // New version of Centroids::find_nearest (from harold's solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4) 
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest) 
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }

应用更正并对其进行测试后,结果完好无损,功能正常,并对原始代码库进行了类似的改进!

由于这触及了我一直在寻求更好理解的知识圣杯(无分支SIMD),我将奖励该解决方案一些额外的道具,以使操作速度提高一倍以上。我的家庭作业是努力理解它,因为我的目标不仅仅是缓解这一热点,而是扩大我个人对可能的解决方案的理解。

尽管如此,我还是感谢这里所有的贡献,从算法建议到非常酷的bitselect技巧!我希望我能接受所有的答案。我可能最终会在某个时候尝试所有的答案,但现在我的功课是理解其中一些非算术SIMD操作。

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i=4; i < num_centroids; i += 4) 
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest) 
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

共有3个答案

瞿健
2023-03-14

C是一种高级语言。您认为C源代码中的控制流转换为分支指令的假设是有缺陷的。我没有您示例中某些类型的定义,因此我制作了一个具有类似条件赋值的简单测试程序:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

请注意,使用未定义的“g”只是为了防止优化器删除所有内容。我用G 4.9.2和-O3和-S将其转换为x86\u 64汇编(甚至不必更改-march的默认值),结果是(不足为奇)循环体不包含分支

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

除此之外,无分支必然更快的假设也可能有缺陷,因为新距离“击败”旧距离的可能性正在减少您所查看的元素的数量。这不是掷硬币。“位选择”技巧是在编译器生成“好像”程序集的攻击性远不如今天时发明的。我更倾向于建议您在尝试返工代码以便编译器能够更好地对其进行优化之前,或者在将结果作为手写汇编的基础之前,先看看编译器实际生成的汇编类型。如果您想研究SIMD,我建议尝试一种“最小值最小值”的方法,减少数据依赖性(在我的示例中,对“最小值”的依赖可能是一个瓶颈)。

鲁乐
2023-03-14

您可以使用无分支三元运算符,有时称为bitselect(条件?true: false)。
只需将其用于2个成员,默认不做任何事情。
不要担心额外的操作,它们与if语句分支相比算不了什么。

bitselect实现:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Would work because it's just a bit select, no matter the actual value
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    return  reinterpret_cast<float&>(res);
}

您的循环应该如下所示:

for (int i=0; i < num_centroids; ++i)
{
  const ClusterCentroid& cent = centroids[i];
  const float dist = ...;
  bool isSmaeller = dist < pt.min_dist;

  //use same value if not smaller
  pt.min_index = bitselect(isSmaeller, i, pt.min_index);
  pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist);
}
令狐经武
2023-03-14

太糟糕了,我们不能使用SSE4.1,但是很好,它是SSE2。我还没有测试过这个,只是编译了一下,看看是否有语法错误,看看程序集是否有意义(基本上没问题,尽管GCC溢出了min_index,即使一些xmm寄存器没有使用,不确定为什么会这样)

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n) {
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4) {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }
    float mdist[4];
    _mm_store_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_store_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++) {
        if (mdist[i] < closest) {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

与往常一样,它希望指针16对齐。此外,填充应该是无限大的点(因此它们永远不会最接近目标)。

SSE 4.1允许您替换此

min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                         _mm_andnot_si128(mask, min_index));

被这个

min_index = _mm_blendv_epi8(min_index, index, mask);

这是一个asm版本,为vsyasm制作,经过一点测试(似乎有效)

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]     ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minumum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

在C中:

extern "C" int find_closest(int n, float** points, float* reference_point);
 类似资料:
  • 问题内容: 我有一个熊猫数据框,其中包含每月数据,我想为其计算12个月的移动平均值。但是缺少一月每个月的数据(NaN),所以我正在使用 但这只是给我所有的NaN值。 有没有一种简单的方法可以忽略NaN值?我了解实际上,这将成为11个月的移动平均值。 数据框还有其他包含一月数据的变量,所以我不想只扔掉一月的列并进行11个月的移动平均。 问题答案: 有几种方法可以解决此问题,最好的方法取决于一月份的数

  • 问题内容: 我正在寻找带有示例的k-means算法的Python实现来聚类和缓存我的坐标数据库。 问题答案: 更新:( 在最初回答之后十一年,可能是该进行更新的时候了。) 首先,您确定要使用k均值吗? 该页面很好地总结了一些不同的聚类算法。我建议您在图形之外,特别查看每种方法所需的参数,并确定您是否可以提供所需的参数(例如,k均值需要簇的数量,但是也许您不知道在开始之前就知道了)群集)。 以下是一

  • OpenCV中的K-Means聚类 作者|OpenCV-Python Tutorials 编译|Vincent 来源|OpenCV-Python Tutorials 目标 了解如何在OpenCV中使用cv.kmeans()函数进行数据聚类 理解参数 输入参数 sample:它应该是np.float32数据类型,并且每个功能都应该放在单个列中。 nclusters(K):结束条件所需的簇数 crit

  • $k$均值聚类算法(k-means clustering algorithm) 在聚类的问题中,我们得到了一组训练样本集 ${x^{(1)},...,x^{(m)}}$,然后想要把这些样本划分成若干个相关的“类群(clusters)”。其中的 $x^{(i)}\in R^n$,而并未给出分类标签 $y^{(i)}$ 。所以这就是一个无监督学习的问题了。 $K$ 均值聚类算法如下所示: 随机初始化(

  • 目标 在本章中,我们将了解K-Means聚类的概念,其工作原理等。 理论 我们将用一个常用的例子来处理这个问题。 T-shirt尺寸问题 考虑一家公司,该公司将向市场发布新型号的T恤。显然,他们将不得不制造不同尺寸的模型,以满足各种规模的人们的需求。因此,该公司会记录人们的身高和体重数据,并将其绘制到图形上,如下所示: 公司无法制作所有尺寸的T恤。取而代之的是,他们将人划分为小,中和大,并仅制造这

  • 聚类 聚类,简单来说,就是将一个庞杂数据集中具有相似特征的数据自动归类到一起,称为一个簇,簇内的对象越相似,聚类的效果越好。它是一种无监督的学习(Unsupervised Learning)方法,不需要预先标注好的训练集。聚类与分类最大的区别就是分类的目标事先已知,例如猫狗识别,你在分类之前已经预先知道要将它分为猫、狗两个种类;而在你聚类之前,你对你的目标是未知的,同样以动物为例,对于一个动物集来