当前位置: 首页 > 知识库问答 >
问题:

高效地向节点发送请求的算法

督俊雅
2023-03-14

我希望根据每个节点的配置将流量均匀地分配到各个节点。最多可以有100个节点,并且可以配置分配给多个节点的流量百分比。


所以说,如果有4个节点:-

node 1 - 20
node 2 - 50
node 3 - 10
node 4 - 20
------------
sum    - 100
------------

所有节点的值之和应该是100。例子:-

node 1 - 50
node 2 - 1
node 3 - 1
node 4 - 1
.
.
.
node 100 - 1

在上述配置中,共有51个节点。节点1为50,其余50个节点配置为1。

在一个senario中,请求可以按以下模式分布:-节点1、节点2、节点3、节点4、节点5,。。。。,节点51,节点1,节点1,节点1,节点1,节点1,节点1,节点1,节点1,。。。。。。

上述分发是低效的,因为我们连续向节点1发送太多的流量,这可能会导致节点1拒绝请求。

在另一种传感器中,请求可以按以下模式分布:-节点1、节点2、节点1、节点3、节点1、节点4、节点1、节点5、节点1、节点6、节点1、节点7、节点1、节点8。。。。。。

在上述情况下,senario的要求更加有效。

我找到了下面的代码,但无法理解它背后的想法。

func()
{
  for(int itr=1;itr<=total_requests+1;itr++)
  {
      myval = 0;           
      // Search the node that needs to be incremented
       // to best approach the rates of all branches                      
      for(int j=0;j<Total_nodes;j++)
      {

         if((nodes[j].count*100/itr > nodes[j].value) ||
           ((nodes[j].value - nodes[j].count*100/itr) < myval) ||
           ((nodes[j].value==0 && nodes[j].count ==0 )))
              continue;

            cand = j;
            myval = abs((long)(nodes[j].count*100/itr - nodes[j].value));
       }
       nodes[cand].count++;

  }

  return nodes[cand].nodeID;
}

在上面的代码中,total_requests是到目前为止收到的请求总数。total_requests变量将每次递增,为了理解目的,将其视为全局值。

Total_nodes,是配置的节点总数,每个节点使用以下结构表示。

节点是一个结构:-

struct node{
  int count;
  int value;
  int nodeID;
};

例如:-

If 4 nodes are configured :-
node 1 - 20
node 2 - 50
node 3 - 10
node 4 - 20
------------
sum    - 100
------------

将有四个节点[4]创建以下值:-

node1{
  count = 0;
  value = 20;
  nodeID = 1;
};

node2{
  count = 0;
  value = 50;
  nodeID = 2;
};

node3{
  count = 0;
  value = 10;
  nodeID = 3;
};

node4{
  count = 0;
  value = 20;
  nodeID = 4;
};

你能给我解释一下算法或者它是如何有效地分发的吗。

共有2个答案

潘自强
2023-03-14

嗯,似乎当你到达100个节点时,每个节点都必须占用1%的流量?

我真的不知道你提供的功能在做什么。我想它是在试图找到离它长期平均负载最远的节点。但是如果total\u requests是到目前为止的总数,那么我就没有得到外部的目的(int itr=1;itr)

不管怎样,基本上你所做的是类似于构造一个非均匀随机序列。最多有100个节点,如果我可以(暂时)假设0。。999提供了足够的分辨率,然后您可以使用一个具有1000个节点id的“id_向量[]”,其中填充了节点1 id的n1个副本,节点2 id的n2个副本,依此类推——其中节点1将接收n1/1000的流量,依此类推。决策过程非常非常简单——选择id_向量[random()%1000]。随着时间的推移,节点将收到大约正确的通信量。

如果您对随机分布的流量不满意,那么您可以使用节点id为id_向量播种,以便通过“循环”进行选择,并为每个节点获得合适的频率。一种方法是随机洗牌上面构造的id_向量(或者,偶尔洗牌,这样如果一个洗牌是“坏的”,你就不会被卡住)。或者你可以做一个一次性的漏桶的事情,然后从中填充id_向量。每次围绕id_向量,这保证每个节点都将收到分配的请求数。

id_vector越细,对每个节点的短期请求频率的控制就越好。

请注意,以上所有操作都假设节点的相对负载是恒定的。如果不是,那么你就需要(不时地?)调整id_向量。

编辑以根据要求添加更多详细信息...

...假设我们只有5个节点,但我们将每个节点的“权重”表示为n/1000,以允许最多100个节点。假设他们有ID 1。。5、重量:

  ID=1, weight = 100
  ID=2, weight = 375
  ID=3, weight = 225
  ID=4, weight = 195
  ID=5, weight = 105

很明显,加起来是1000。

因此,我们构造一个id\u向量[1000],以便:

  id_vector[  0.. 99] = 1   -- first 100 entries = ID 1
  id_vector[100..474] = 2   -- next  375 entries = ID 2
  id_vector[475..699] = 3   -- next  225 entries = ID 3
  id_vector[700..894] = 4   -- next  195 entries = ID 4
  id_vector[100..999] = 5   -- last  105 entries = ID 5

现在如果我们洗牌id\u vector[]我们会得到一个随机的节点选择序列,但是超过1000个请求,每个节点的请求的正确平均频率。

对于娱乐价值,我尝试了一个“漏桶”,看看它能如何通过为每个节点使用一个漏桶填充id\u向量来保持对每个节点的请求的稳定频率。下面附上了执行此操作的代码,并查看它的性能,以及简单随机版本的性能。

每个泄漏存储桶都有一个cc计数,在下一个请求发送到该存储桶之前,应该发送(到其他节点)的请求数。每次发送请求时,所有存储桶的cc计数都会减少,并且存储桶最小的cc(或者最小id,如果cc相等)的节点将被发送请求,并在该点对节点的存储桶的cc进行充电。(每个请求都会导致所有存储桶滴水一次,并为所选节点的存储桶重新充电。)

cc是存储桶“内容”的整数部分。cc的初始值是q=1000/w,其中w是节点的权重。每次给铲斗充电时,q会添加到cc。然而,为了精确地完成任务,我们需要处理剩余的r=1000%w。。。或者换句话说,“内容”有一个小数部分,这就是cr的作用。内容的真值是cc cr/w(其中cr/w是真分数,而不是整数除法)。其初始值为cc=qcr=r。每次给铲斗充电时,q添加到cc,而r添加到cr。当cr/w

漏桶运行1000次以填充id_vector[]。一点点测试表明,这对所有节点都保持了相当稳定的频率,并且在id_vector[]周期内每次每个节点都有确切的数据包数量。

少量测试表明,random()shuffle方法在每个id_vector[]周期内具有更多的可变频率,但仍然为每个周期提供每个节点的确切数据包数。

泄漏桶的稳定性假设传入请求流是稳定的。这可能是一个完全不现实的假设。如果请求以大的(与id_vector[]周期相比大,在本例中为1000)突发方式到达,那么(简单)随机()洗牌方法的可变性可能与请求到达的可变性相形见绌!

enum
{
  n_nodes  =    5,        /* number of nodes      */
  w_res    = 1000,        /* weight resolution    */
} ;

struct node_bucket
{
  int   id ;            /* 1 origin                 */

  int   cc ;            /* current count            */
  int   cr ;            /* current remainder        */

  int   q ;             /* recharge -- quotient     */
  int   r ;             /* recharge -- remainder    */

  int   w ;             /* weight                   */
} ;

static void bucket_recharge(struct node_bucket* b) ;
static void node_checkout(int weights[], int id_vector[], bool rnd) ;
static void node_shuffle(int id_vector[]) ;

/*------------------------------------------------------------------------------
 * To begin at the beginning...
 */
int
main(int argc, char* argv[])
{
  int node_weights[n_nodes] = { 100, 375, 225, 195, 105 } ;
  int id_vector[w_res] ;
  int cx ;

  struct node_bucket buckets[n_nodes] ;

  /* Initialise the buckets -- charged
   */
  cx = 0 ;
  for (int id = 0 ; id < n_nodes ; ++id)
    {
      struct node_bucket* b ;

      b = &buckets[id] ;

      b->id = id + 1 ;              /* 1 origin     */
      b->w  = node_weights[id] ;

      cx += b->w ;

      b->q  = w_res / b->w ;
      b->r  = w_res % b->w ;

      b->cc = 0 ;
      b->cr = 0 ;

      bucket_recharge(b) ;
    } ;

  assert(cx == w_res) ;

  /* Run the buckets for one cycle to fill the id_vector
   */
  for (int i = 0 ; i < w_res ; ++i)
    {
      int id ;

      id = 0 ;
      buckets[id].cc -= 1 ;         /* drip     */

      for (int jd = 1 ; jd < n_nodes ; ++jd)
        {
          buckets[jd].cc -= 1 ;     /* drip     */

          if (buckets[jd].cc < buckets[id].cc)
            id = jd ;
        } ;

      id_vector[i] = id + 1 ;       /* '1' origin   */

      bucket_recharge(&buckets[id]) ;
    } ;

  /* Diagnostics and checking
   *
   * First, check that the id_vector contains exactly the right number of
   * each node, and that the bucket state at the end is the same (apart from
   * cr) as it is at the beginning.
   */
  int nf[n_nodes] = { 0 } ;

  for (int i = 0 ; i < w_res ; ++i)
    nf[id_vector[i] - 1] += 1 ;

  for (int id = 0 ; id < n_nodes ; ++id)
    {
      struct node_bucket* b ;

      b = &buckets[id] ;

      printf("ID=%2d weight=%3d freq=%3d  (cc=%3d  cr=%+4d  q=%3d  r=%3d)\n",
                                b->id, b->w, nf[id], b->cc, b->cr, b->q, b->r) ;
    } ;

  node_checkout(node_weights, id_vector, false /* not random */) ;

  /* Try the random version -- with shuffled id_vector.
   */
  int iv ;

  iv = 0 ;
  for (int id = 0 ; id < n_nodes ; ++id)
    {
      for (int i = 0 ; i < node_weights[id] ; ++i)
        id_vector[iv++] = id + 1 ;
    } ;
  assert(iv == 1000) ;

  for (int s = 0 ; s < 17 ; ++s)
    node_shuffle(id_vector) ;

  node_checkout(node_weights, id_vector, true /* random */) ;

  return 0 ;
} ;

static void
bucket_recharge(struct node_bucket* b)
{
  b->cc += b->q ;
  b->cr += b->r ;

  if ((b->cr * 2) >= b->w)
    {
      b->cc += 1 ;
      b->cr -= b->w ;
    } ;
} ;

static void
node_checkout(int weights[], int id_vector[], bool rnd)
{
  struct node_test
  {
    int   last_t ;
    int   count ;
    int   cycle_count ;
    int   intervals[w_res] ;
  } ;

  struct node_test tests[n_nodes] = { { 0 } } ;

  printf("\n---Test Run: %s ---\n", rnd ? "Random Shuffle" : "Leaky Bucket") ;

  /* Test run
   */
  int s ;
  s = 0 ;
  for (int t = 1 ; t <= (w_res * 5) ; ++t)
    {
      int id ;

      id = id_vector[s++] - 1 ;

      if (tests[id].last_t != 0)
        tests[id].intervals[t - tests[id].last_t] += 1 ;

      tests[id].count += 1 ;
      tests[id].last_t = t ;

      if (s == w_res)
        {
          printf("At time %4d\n", t) ;

          for (id = 0 ; id < n_nodes ; ++id)
            {
              struct node_test*   nt ;
              long   total_intervals ;

              nt = &tests[id] ;

              total_intervals = 0 ;
              for (int i = 0 ; i < w_res ; ++i)
                total_intervals += (long)i * nt->intervals[i] ;

              printf("  ID=%2d weight=%3d count=%4d(+%3d)  av=%6.2f vs %6.2f\n",
                        id+1, weights[id], nt->count, nt->count - nt->cycle_count,
                                          (double)total_intervals / nt->count,
                                          (double)w_res / weights[id]) ;
              nt->cycle_count = nt->count ;

              for (int i = 0 ; i < w_res ; ++i)
                {
                  if (nt->intervals[i] != 0)
                    {
                      int h ;

                      printf("  %6d x %4d ", i, nt->intervals[i]) ;

                      h = ((nt->intervals[i] * 75) + ((nt->count + 1) / 2))/
                                                                     nt->count ;
                      while (h-- != 0)
                        printf("=") ;
                      printf("\n") ;
                    } ;
                } ;
            } ;

          if (rnd)
            node_shuffle(id_vector) ;

          s = 0 ;
        } ;
    } ;
} ;

static void
node_shuffle(int id_vector[])
{
  for (int iv = 0 ; iv < (w_res - 1) ; ++iv)
    {
      int is, s ;

      is = (int)(random() % (w_res - iv)) + iv ;

      s             = id_vector[iv] ;
      id_vector[iv] = id_vector[is] ;
      id_vector[is] = s ;
    } ;
} ;

薛华奥
2023-03-14

节点[j]. count*100/itr是节点j到目前为止已经回答的请求百分比的下限。节点[j]. value是节点j应该回答的请求百分比。您发布的代码查找落后于目标百分比最远的节点(或多或少,取决于整数除法的不稳定性),并将其分配给下一个请求。

 类似资料:
  • 我有两个节点js应用程序,一个发送post请求,如下所示: 另一个是试图用表达式和正文解析器来处理它: 问题是在接收端我无法检索我正在寻找的json数据。有人知道我错过了什么吗?

  • 问题内容: 想象一下,客户端应用程序需要访问Cassandra集群。在Java api中,我们创建一个集群实例,并通过Session发送读取或写入请求。如果我们使用读/写一致性ONE,则api如何选择实际节点(协调器节点)以转发请求。是随机选择吗?请帮忙弄清楚。 问题答案: Cassandra驱动程序使用“ gossip”协议(以及称为节点发现的过程)来获取有关群集的信息。如果某个节点不可用,则客

  • 剪辑过,我剪短了。 代码变量被赋值 首先需要用get请求触发php脚本,输出不应该保存 然后,当上面的请求完成后,将get请求发送到并将响应保存到一个变量, 对于https://flower.nyaizhel.ml/fun/carbon/shorturl.php?short&code=ii,它应该是6,请参阅url 我试着取,没有结果

  • 我在本地网络的另一台计算机上启动了一个Apache web服务器,该服务器的根文件夹中有一个.php文件,因此我可以使用“192.168.1.5/connect.php?param1=value1&param2=value2”这样的地址从浏览器发送请求。我真的需要从我的Java代码(通过使用HttpUrlConnection),但由于某种原因,它只适用于url,而不适用上面提到的IP地址。有没有一

  • response.content=statuscode:401,reasonprace:'unauthorized',version:1.1,content:system.net.http.streamcontent,header:{ rlogid:t6ldssk%28ciudbq%60anng%7fu2h%3f%3cwk%7difvqn*14%3f0513%29pqtfwpu%29pdhcaj%

  • javax.net.ssl.sslhandShakeException:收到致命警报:在com.ibm.jsse2.O.A(O.java:8)在com.ibm.jsse2.sslsocketimpl.B(SSLSocketimpl.java:40)在com.ibm.jsse2.sslsocketimpl.A(SSLSocketimpl.java:554)在com.ibm.jsse2.sslsock