当前位置: 首页 > 知识库问答 >
问题:

LMAX Disruptor SPSC-每秒600万操作

劳高爽
2023-03-14

使用Disruptor环形缓冲区,我每秒只能执行600万次操作。我想知道我哪里出了问题。我的事件处理程序只是插入一个计数器。这是单生产者和单消费者。有人能告诉我我是否语义学本身错了。程序创建一个生产者线程添加到缓冲区。并创建一个事件处理程序来处理发布事件。每次发布事件时,eventhandler都会增加一个易失性计数器。

public class MainClass{

   public static class globalVariables
   {
        static int NUMBER_OF_ITERATIONS = 33554432; // 2 power 25     
        static int NUMBER_OF_THREADS;
        static int RING_SIZE = NUMBER_OF_ITERATIONS;
        static int WRITE_CODE = 1;

        static volatile int keep_going = 1; 
   };

   public void start_execution()
   {            
        int remainder = globalVariables.NUMBER_OF_ITERATIONS % globalVariables.NUMBER_OF_THREADS;               
        int iterations_per_thread = ( globalVariables.NUMBER_OF_ITERATIONS - remainder ) / globalVariables.NUMBER_OF_THREADS ;


      /* New Shared Object */
      final sharedObject newSharedObject = new sharedObject();


    ExecutorService exec = Executors.newFixedThreadPool(1);
      Disruptor<valueEvent> disruptor = new Disruptor<valueEvent>( valueEvent.EVENT_FACTORY, globalVariables.RING_SIZE, exec );


      /* Creating event handler whenever an item is published in the queue */

           final EventHandler<valueEvent> handler = new EventHandler<valueEvent>()
           {
              public void onEvent(final valueEvent event, final long sequence, 
                          final boolean endOfBatch) throws Exception
              {         
                 newSharedObject.shared_variable++; // increment the shared variable
              }
           };


          /* Use the above handler to handler events */
          disruptor.handleEventsWith(handler);


     /* start Disruptor */
     final RingBuffer<valueEvent> ringBuffer = disruptor.start();


     final long[] runtime = new long [globalVariables.NUMBER_OF_THREADS];
     /* Code the producer thread */
     final class ProducerThread extends Thread {
        int i;

        public ProducerThread( int i )
        {
            this.i = i;
        }

        public void run()
        {
           long idle_counter = 0;
           long count;

           System.out.println("In thread "+i );

           long startTime = System.nanoTime();

           //while( globalVariables.keep_going == 1 )
           for( int counter=0; counter<globalVariables.NUMBER_OF_ITERATIONS; counter++ )
           {
              // Publishers claim events in sequence
              long sequence = ringBuffer.next();
              valueEvent event = ringBuffer.get(sequence);

              event.setValue(globalVariables.WRITE_CODE); 

              // make the event available to EventProcessors
              ringBuffer.publish(sequence);  
           }

           long stopTime = System.nanoTime();
           runtime[i] = (stopTime - startTime)/1000; 
        }
     };

     /* ------------------------------------------------------------------------------- */     
     //final class AlarmHandler extends TimerTask {     
          /*** Implements TimerTask's abstract run method.   */
    //    @Override public void run(){
    //      globalVariables.keep_going = 0;
    //    }
   //  };

     /* ------------------------------------------------------------------------------- */
     /* Creating Producer threads */
     ProducerThread[] threads = new ProducerThread[globalVariables.NUMBER_OF_THREADS];
     for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) {
        threads[i] = new ProducerThread( i );
        threads[i].start();
     }

     // Waiting for the threads to finish
     for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) {
        try
        {
         threads[i].join();
        } catch ( InterruptedException e ) { System.out.println("hi exception :)"); }
     } 

     /* shutdown */     
     disruptor.shutdown();
     exec.shutdown();

     /* Printing Statistics */
     System.out.println( "Shared Variable: " + newSharedObject.shared_variable );
     for ( int i=0; i<globalVariables.NUMBER_OF_THREADS; i++ )
     {
         System.out.println("Runtime="+ runtime[i] + "; Operations per second = " + (globalVariables.NUMBER_OF_ITERATIONS  / runtime[i] )*1000000 +"ops/sec" );         
     }     

   }

    public static void main( String args[] )
    {
        globalVariables.NUMBER_OF_THREADS = Integer.parseInt( args[0] );    

        System.out.println( "Number of Threads = "+ globalVariables.NUMBER_OF_THREADS );

        MainClass mainObj = new MainClass();
        mainObj.start_execution();
        System.exit(0);
    }
};

这里是程序的输出

共享变量:33554432;运行时=5094139微秒;每秒操作=6000000

任何帮助都将不胜感激。

共有1个答案

通远
2023-03-14

由于您在单个线程中运行事件处理程序,并且不共享该状态,因此您应该通过让事件处理程序在非易失性字段上工作来获得显着更好的性能(但仍然是正确的功能)。中断器确保您的处理程序一次只处理一个事件,因此您无需担心丢失增量。

如果系统中的另一个组件依赖于以特定顺序出现的该值(例如:它是一个控制值),那么您应该考虑将类似AtomicInteger的东西与lazySet一起使用[1]。

[1]http://psy-lob-saw.blogspot.com.au/2012/12/atomiclazyset-is-performance-win-for.html

 类似资料:
  • 问题内容: 有一天,我怀疑我将不得不学习hadoop并将所有这些数据传输到非结构化数据库中,但是我感到惊讶的是,在如此短的时间内,性能如此显着下降。 我有一个只有不到600万行的mysql表。我正在对该表进行非常简单的查询,并相信我已经安装了所有正确的索引。 查询是 解释返回 因此,据我所知,我使用的索引正确,但是此查询需要11秒钟才能运行。 数据库是MyISAM,而phpMyAdmin表示该表是

  • 我们正在使用图形API来获取客户端每个页面上所有帖子的共享数量,每天运行一次,我们使用graph.facebook.com/post_id,但我们经常得到 (#613)对流的调用已超过每600秒600次调用的速率 我尝试使用批量请求,似乎批次中的每个请求都被计算为限制。有什么建议吗? 以下是我们迄今为止的发现: < li>FQL流表没有“股份”字段。 < li>Post insights没有与页面

  • 我使用3个VM服务器,每个都有16个核心/56 GB Ram /1TB,来设置一个kafka集群。我的工作与Kafka0.10.0版本。我在其中两个上安装了一个经纪人。我创建了一个主题,有2个分区,1个分区/代理,没有复制。 我的目标是每秒接收1 000 000条信息。 我用Kafka制作人perf test做了一个测试。sh script和我得到的消息在150000 msg/s和204000 m

  • 主要内容:1、页缓存技术 + 磁盘顺序写,2、零拷贝技术,3、最后的总结这篇文章来聊一下Kafka的一些架构设计原理,这也是互联网公司面试时非常高频的技术考点。 Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。 那么Kafka到底是如何做到这么高的吞吐量和性能的呢?这篇文章我们来一点一点说一下。 1、页缓存技术 + 磁盘顺序写 首先Kafka每次接收到数据都会往磁

  • 问题内容: 以给定的速率安排一段Java代码的最简单方法是什么? 问题答案: 在Java 5+中,带有: 上面的方法是有利的。在Java 5之前,您使用和:

  • 问题内容: 我有一个简单的Java程序,该程序读取一个文本文件,将其分隔为“”(空格),显示第一个单词,等待2秒,显示下一个…等等…我想在Spring或其他一些GUI。 关于如何使用spring轻松更新单词的任何建议?遍历我的列表并以某种方式使用setText(); 我没有运气。我正在使用此方法在consol中打印我的单词,并向其中添加JFrame …在consol中效果很好,但是却发出了无尽的j