当前位置: 首页 > 知识库问答 >
问题:

如何使用python Tornado服务器在请求中最好地执行多处理?

昝光临
2023-03-14

我有两个问题:如何简化目前的方法?它可能存在哪些陷阱?

>

  • 利用Tornado的builtin异步装饰器,它允许请求保持打开,并允许ioloop继续。

    使用Python的multiprocessing模块为“繁重的”任务生成一个单独的进程。我首先尝试使用threading模块,但无法将控制权可靠地交还给IOLOOP。另外,mutliprocessing似乎也可以利用多核。

    然后确保在回调中调用finish()以完成请求并提交答复。

    下面是展示这种方法的一些示例代码。multi_tornado.py是实现上述大纲的服务器,而call_multi.py是以两种不同方式调用服务器以测试服务器的示例脚本。这两个测试都用3个慢速get请求和20个快速get请求调用服务器。结果显示了在线程打开和不打开的情况下运行的结果。

    在“无线程”运行的情况下,3个慢速请求块(每个都需要一秒钟多一点的时间来完成)。20个快速请求中的一些挤在ioloop中的一些慢速请求之间(不完全确定这是如何发生的--但可能是我在同一台机器上同时运行服务器和客户机测试脚本的工件)。这里的重点是所有的快速请求都在不同程度上被搁置。

    有没有更简单的方法来完成这一点?在这种方法中可能潜伏着什么怪物?

    (注意:未来的折衷可能是使用反向代理运行更多的Tornado实例,比如nginx进行负载平衡。无论如何,我都将使用负载平衡器运行多个实例--但我担心只将硬件扔到这个问题上,因为硬件似乎与阻塞问题直接相关。)

    multi_tornado.py(示例服务器):

    import time
    import threading
    import multiprocessing
    import math
    
    from tornado.web import RequestHandler, Application, asynchronous
    from tornado.ioloop import IOLoop
    
    
    # run in some other process - put result in q
    def heavy_lifting(q):
        t0 = time.time()
        for k in range(2000):
            math.factorial(k)
    
        t = time.time()
        q.put(t - t0)  # report time to compute in queue
    
    
    class FastHandler(RequestHandler):
        def get(self):
            res = 'fast result ' + self.get_argument('id')
            print res
            self.write(res)
            self.flush()
    
    
    class MultiThreadedHandler(RequestHandler):
        # Note:  This handler can be called with threaded = True or False
        def initialize(self, threaded=True):
            self._threaded = threaded
            self._q = multiprocessing.Queue()
    
        def start_process(self, worker, callback):
            # method to start process and watcher thread
            self._callback = callback
    
            if self._threaded:
                # launch process
                multiprocessing.Process(target=worker, args=(self._q,)).start()
    
                # start watching for process to finish
                threading.Thread(target=self._watcher).start()
    
            else:
                # threaded = False just call directly and block
                worker(self._q)
                self._watcher()
    
        def _watcher(self):
            # watches the queue for process result
            while self._q.empty():
                time.sleep(0)  # relinquish control if not ready
    
            # put callback back into the ioloop so we can finish request
            response = self._q.get(False)
            IOLoop.instance().add_callback(lambda: self._callback(response))
    
    
    class SlowHandler(MultiThreadedHandler):
        @asynchronous
        def get(self):
            # start a thread to watch for
            self.start_process(heavy_lifting, self._on_response)
    
        def _on_response(self, delta):
            _id = self.get_argument('id')
            res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
            print res
            self.write(res)
            self.flush()
            self.finish()   # be sure to finish request
    
    
    application = Application([
        (r"/fast", FastHandler),
        (r"/slow", SlowHandler, dict(threaded=False)),
        (r"/slow_threaded", SlowHandler, dict(threaded=True)),
    ])
    
    
    if __name__ == "__main__":
        application.listen(8888)
        IOLoop.instance().start()
    
    import sys
    from tornado.ioloop import IOLoop
    from tornado import httpclient
    
    
    def run(slow):
        def show_response(res):
            print res.body
    
        # make 3 "slow" requests on server
        requests = []
        for k in xrange(3):
            uri = 'http://localhost:8888/{}?id={}'
            requests.append(uri.format(slow, str(k + 1)))
    
        # followed by 20 "fast" requests
        for k in xrange(20):
            uri = 'http://localhost:8888/fast?id={}'
            requests.append(uri.format(k + 1))
    
        # show results as they return
        http_client = httpclient.AsyncHTTPClient()
    
        print 'Scheduling Get Requests:'
        print '------------------------'
        for req in requests:
            print req
            http_client.fetch(req, show_response)
    
        # execute requests on server
        print '\nStart sending requests....'
        IOLoop.instance().start()
    
    if __name__ == '__main__':
        scenario = sys.argv[1]
    
        if scenario == 'slow' or scenario == 'slow_threaded':
            run(scenario)
    
    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow?id=1
    http://localhost:8888/slow?id=2
    http://localhost:8888/slow?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20
    
    Start sending requests....
    slow result 1 <--- 1.338 s
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    slow result 2 <--- 1.169 s
    slow result 3 <--- 1.130 s
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 13
    fast result 12
    fast result 14
    fast result 15
    fast result 16
    fast result 18
    fast result 17
    fast result 19
    fast result 20
    

    通过运行Python call_multi.py slow_threaded(所需的行为):

    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow_threaded?id=1
    http://localhost:8888/slow_threaded?id=2
    http://localhost:8888/slow_threaded?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20
    
    Start sending requests....
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 12
    fast result 13
    fast result 14
    fast result 15
    fast result 19
    fast result 20
    fast result 17
    fast result 16
    fast result 18
    slow result 2 <--- 2.485 s
    slow result 3 <--- 2.491 s
    slow result 1 <--- 2.517 s
    
  • 共有1个答案

    吕宇定
    2023-03-14

    如果您愿意使用concurrent.futures.processPoolExecutor而不是multiprocessing,这实际上非常简单。Tornado的ioloop已经支持concurrent.futures.future,所以它们可以开箱即用。concurrent.futures包含在Python 3.2+中,并已被回移植到Python 2.x中。

    这里有一个例子:

    import time
    from concurrent.futures import ProcessPoolExecutor
    from tornado.ioloop import IOLoop
    from tornado import gen
    
    def f(a, b, c, blah=None):
        print "got %s %s %s and %s" % (a, b, c, blah)
        time.sleep(5)
        return "hey there"
    
    @gen.coroutine
    def test_it():
        pool = ProcessPoolExecutor(max_workers=1)
        fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
        print("running it asynchronously")
        ret = yield fut
        print("it returned %s" % ret)
        pool.shutdown()
    
    IOLoop.instance().run_sync(test_it)
    

    产出:

    running it asynchronously
    got 1 2 3 and ok
    it returned hey there
    
     类似资料:
    • 我是web服务新手,正在阅读Martin Kalin的《Java Webservices》一书。我已经了解了它最初的基本概念,有一个问题: 假设将HTTP请求(包含SOAP消息信封)发送到JavaWeb服务()。该请求是否由Servlet内部处理,Servlet提取SOAP消息并将其转换为相应Java域对象,然后调用服务实现bean? 无论Metro和Axis等现成框架如何,这个问题都是通用的。只

    • 问题内容: 所以我用Go编写了这个RESTful后端,它将通过跨站点HTTP请求来调用,即从另一个站点(实际上,只是另一个端口,但是同源策略启动了,所以我们来了)提供的内容中调用。 在这种情况下,在某些情况下,用户代理将发送预检OPTIONS请求,以检查实际请求是否可以安全发送。 我的问题是如何在Go上下文中最好地处理和充分响应这些预检请求。我构想的方法感觉不太优雅,我想知道是否还有其他我没有想到

    • 问题内容: 在我的应用程序中,我有一些异步Web服务。服务器接受请求,返回OK响应,并开始使用AsyncTaskExecutor处理请求。我的问题是如何在此处启用请求范围,因为在此处理中,我需要获取由以下内容注释的类: 现在我得到异常: 因为它在SimpleAsyncTaskExecutor而不是在DispatcherServlet 我的请求异步处理 taskExecutor在哪里: 问题答案:

    • 问题内容: 我已经写了一个bash脚本foo.sh 我想在我的远程服务器上执行它。我尝试了,它起作用了。 之后,我像这样更改了test.sh文件 现在,我想传递一个本地参数以与我的脚本一起执行,但是当我键入它时会返回一个错误。 如何通过脚本传递参数? 问题答案: 使用该选项,该选项强制(或任何POSIX兼容外壳程序)从标准输入而不是从第一个位置参数命名的文件中读取其命令。而是将所有参数都视为脚本的

    • 在我的应用程序中,我有一些异步web服务。服务器接受请求,返回OK响应,并使用AsyncTaskExecutor开始处理请求。我的问题是如何在此处启用请求范围,因为在此处理中,我需要获取由以下注释的类: 现在我得到异常: 因为它在SimpleAsynctaskeExecutor中运行,而不是在DispatcherServlet中运行 我的异步处理请求 其中,taskExecutor是:

    • 我正在学习Executor服务,并试图了解如何与线程池中的线程共享数据列表。我的runnable方法需要从列表中读取数据并进行处理。 池中的所有线程应该只处理一次列表中的不同元素