我正在尝试构建一个类,该类使用多处理请求来并行发出多个请求。我遇到一个问题,它只是挂起,并给我一个神秘的错误消息,我不确定的方式。
下面是我的代码,它基本上只是使用一个带回调的池将结果放入列表。我有一个要求,我需要一个"硬超时"为每个URL,即,如果一个URL需要几秒钟以上的下载其内容,我只是想跳过它。所以我使用池超时,并对尝试的URL和返回的URL内容进行了比较,尝试但未返回的URL被认为失败了。这是我的代码:
import time
import json
import requests
import sys
from urlparse import parse_qs
from urlparse import urlparse
from urlparse import urlunparse
from urllib import urlencode
from multiprocessing import Process, Pool, Queue, current_process
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
import traceback
from sets import Set
from massweb.pnk_net.pnk_request import pnk_request_raw
from massweb.targets.fuzzy_target import FuzzyTarget
from massweb.payloads.payload import Payload
class MassRequest(object):
def __init__(self, num_threads = 10, time_per_url = 10, request_timeout = 10, proxy_list = [{}]):
self.num_threads = num_threads
self.time_per_url = time_per_url
self.request_timeout = request_timeout
self.proxy_list = proxy_list
self.results = []
self.urls_finished = []
self.urls_attempted = []
self.targets_results = []
self.targets_finished = []
self.targets_attempted = []
def add_to_finished(self, x):
self.urls_finished.append(x[0])
self.results.append(x)
def add_to_finished_targets(self, x):
self.targets_finished.append(x[0])
self.targets_results.append(x)
def get_urls(self, urls):
timeout = float(self.time_per_url * len(urls))
pool = Pool(processes = self.num_threads)
proc_results = []
for url in urls:
self.urls_attempted.append(url)
proc_result = pool.apply_async(func = pnk_request_raw, args = (url, self.request_timeout, self.proxy_list), callback = self.add_to_finished)
proc_results.append(proc_result)
for pr in proc_results:
try:
pr.get(timeout = timeout)
except:
pool.terminate()
pool.join()
pool.terminate()
pool.join()
list_diff = Set(self.urls_attempted).difference(Set(self.urls_finished))
for url in list_diff:
sys.stderr.write("URL %s got timeout" % url)
self.results.append((url, "__PNK_GET_THREAD_TIMEOUT"))
if __name__ == "__main__":
f = open("out_urls_to_fuzz_1mil")
urls_to_request = []
for line in f:
url = line.strip()
urls_to_request.append(url)
mr = MassRequest()
mr.get_urls(urls_to_request)
下面是线程调用的函数:
def pnk_request_raw(url_or_target, req_timeout = 5, proxy_list = [{}]):
if proxy_list[0]:
proxy = get_random_proxy(proxy_list)
else:
proxy = {}
try:
if isinstance(url_or_target, str):
sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
r = requests.get(url_or_target, proxies = proxy, timeout = req_timeout)
return (url_or_target, r.text)
if isinstance(url_or_target, FuzzyTarget):
sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
r = requests.get(url_or_target.url, proxies = proxy, timeout = req_timeout)
return (url_or_target, r.text)
except:
#use this to mark failure on exception
traceback.print_exc()
#edit: this is the line that was breaking it all
sys.stderr.out("A request failed to URL %s\n" % url_or_target)
return (url_or_target, "__PNK_REQ_FAILED")
这似乎适用于较小的URL集,但以下是输出:
Requesting: http://www.sportspix.co.za/ with proxy {}
Requesting: http://www.sportspool.co.za/ with proxy {}
Requesting: http://www.sportspredict.co.za/ with proxy {}
Requesting: http://www.sportspro.co.za/ with proxy {}
Requesting: http://www.sportsrun.co.za/ with proxy {}
Requesting: http://www.sportsstuff.co.za/ with proxy {}
Requesting: http://sportsstuff.co.za/2011-rugby-world-cup with proxy {}
Requesting: http://www.sportstar.co.za/4-stroke-racing with proxy {}
Requesting: http://www.sportstats.co.za/ with proxy {}
Requesting: http://www.sportsteam.co.za/ with proxy {}
Requesting: http://www.sportstec.co.za/ with proxy {}
Requesting: http://www.sportstours.co.za/ with proxy {}
Requesting: http://www.sportstrader.co.za/ with proxy {}
Requesting: http://www.sportstravel.co.za/ with proxy {}
Requesting: http://www.sportsturf.co.za/ with proxy {}
Requesting: http://reimo.sportsvans.co.za/ with proxy {}
Requesting: http://www.sportsvans.co.za/4x4andmoreWindhoek.html with proxy {}
Handled exception:Traceback (most recent call last):
File "mass_request.py", line 87, in get_fuzzy_targets
pr.get(timeout = timeout)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
raise self._value
AttributeError: 'file' object has no attribute 'out'
在最后一个例外情况下,程序挂起,我必须完全杀死它。好吧,我从来没有尝试访问属性为“out”的文件对象。我的问题是。。。如何修复!?我是不是做错了什么?为什么没有更明确的例外?
我认为sys.stderr.out("请求失败到URL%s\n"%url_or_target)
应该是sys.stderr.write("请求失败到URL%s\n"%url_or_target)
问题内容: 我想转换火花数据框架以使用以下代码添加: 详细的错误消息是: 有人知道我在这里做错了吗?谢谢! 问题答案: 您无法使用数据框,但可以将数据框转换为RDD并通过映射将其映射。在Spark 2.0之前,别名为。使用Spark 2.0,您必须先明确调用。
问题内容: 我如何解决此错误,我是从GitHub下载此代码的。 引发错误 请帮我解决这个问题! 我用了: 我得到这个错误。有人帮我,我只想让它工作为什么这么难? 问题答案: 我怀疑您从中复制代码的地方启用了急切执行功能,即在程序开始时调用了该位置。 您也可以这样做。希望能有所帮助。 更新:请注意,默认情况下,TensorFlow 2.0中启用了急切执行。因此,以上答案仅适用于TensorFlow
问题内容: 下面的代码给出了错误: 码: 问题答案: 从代码中,我可以看到你希望允许用户下载pdf。 现在开始 去 http://localhost:5000
问题内容: 当我尝试时,会发生错误: 我找到了带有pyelasticsearch.py 的链接https://github.com/toastdriven/pyelasticsearch/blob/master/pyelasticsearch.py#L424-469,但我不知道它是哪个版本。无论如何,该代码中没有购买我的pyelasticsearch.py。任何人都有相同的经历吗?感谢您的
我和cloud composer一起策划了两个数据流工作,它已经工作了一个月了。突然,这两个作业停止工作,并出现以下错误消息: 在作业中,我用存储客户端从google cloud storage下载一个文件。我以为这是因为一些依赖问题。在composer环境中,我安装了google-cloud-storage,但没有指定版本。我尝试指定包的不同版本,但似乎没有任何工作。 谢了!
问题内容: 我正在使用Flask开发论坛模板。当我尝试使用表单在浏览器中创建新线程时,SQLAlchemy抛出AttributeError。当我尝试与论坛到线程实现一对多关系以及与线程到用户实现一对多关系时,出现了问题。 models.py 所有新的帖子/主题,并在views.py中处理 views.py 问题答案: 问题是这样的: 你要使用ORM对象,而不是主键列: 该错误表示整数被解释为ORM