在WEB业务上线前,QA测试阶段,可将QA的浏览器代理设到一个指定的代理中或测试pc拨入特定的vpn中,QA在测试功能的同时, 安全测试也会在后台同步完成,其好处不言而喻。
该类扫描器常见的有2种:
Note
本文只讲第1种,第2种的实现方式稍麻烦一些,一天半天的时间内写不出来,留在下篇文章中写。
proxy模块是在开源项目 https://github.com/senko/tornado-proxy
的基础上改的,将用户的请求与服务器的响应数据过滤后存入了mongodb中。 我新加的代码在30 - 38行之间。
class ProxyHandler(tornado.web.RequestHandler): SUPPORTED_METHODS = ['GET', 'POST', 'CONNECT'] @tornado.web.asynchronous def get(self): url_info = dict( method=self.request.method, url=self.request.uri ) self.request_info = None def handle_response(response): if (response.error and not isinstance(response.error, tornado.httpclient.HTTPError)): self.set_status(500) self.write('Internal server error:\n' + str(response.error)) else: self.set_status(response.code) for header in ('Date', 'Cache-Control', 'Server','Content-Type', 'Location'): v = response.headers.get(header) if v: self.set_header(header, v) v = response.headers.get_list('Set-Cookie') if v: for i in v: self.add_header('Set-Cookie', i) if response.body: self.write(response.body) # Insert http request and response into mongodb if self.application.scan: url = url_info.get('url') url_filter = UrlFilter(url) if url_filter.filter(): http_info = HttpInfo(url_info, self.request_info, response) values = http_info.get_info() mongodb = Mongodb(db_info) mongodb.insert(values) self.finish() body = self.request.body self.request_info = self.request if not body: body = None try: fetch_request( self.request.uri, handle_response, method=self.request.method, body=body, headers=self.request.headers, follow_redirects=False, allow_nonstandard_methods=True) except tornado.httpclient.HTTPError as e: if hasattr(e, 'response') and e.response: handle_response(e.response) else: self.set_status(500) self.write('Internal server error:\n' + str(e)) self.finish()
Note
代码比较占篇幅,这里不贴了,请参考我的github: https://github.com/netxfly/passive_scan 。
proxy有2个参数:
Note
任务分发模块会定期检查mongodb中的待扫描列表,根据status字段判断是否有扫描任务,如果有扫描任务就分发给celery的worker执行。
# -*- coding: utf-8 -*- __author__ = 'Hartnett' import time from pprint import pprint import pymongo from bson.objectid import ObjectId from config import db_info from scan_tasks import scan class Scheduler(object): def __init__(self, interval=5): self.interval = interval self.db_info = db_info # connect to database self.client = pymongo.MongoClient(self.db_info.get('host'), self.db_info.get('port')) self.client.security_detect.authenticate( self.db_info.get('username'), self.db_info.get('password'), source='passive_scan' ) self.db = self.client["passive_scan"] self.collection = self.db['url_info'] def _get_task(self): task_id = None task_info = None tasks = self.collection.find({'status' : 0}).sort("_id", pymongo.ASCENDING).limit(1) for task in tasks: url = task.get('url') task_id = task.get('_id') domain = task.get('domain') method = task.get('request').get('method') request_data = task.get('request').get('request_data') user_agent = task.get('request').get('headers').get('User-Agent') cookies = task.get('request').get('headers').get('Cookie') task_info = dict( task_id=task_id, url=url, domain=domain, method=method, request_data=request_data, user_agent=user_agent, cookies=cookies ) print("task_id : %s, \ntask_info:") % task_id pprint(task_info) return task_id, task_info # set task checking now def _set_checking(self, task_id): self.collection.update({'_id': ObjectId(task_id)}, {"$set" : {'status' : 1}}) # set task checked def _set_checked(self, task_id): self.collection.update({'_id': ObjectId(task_id)}, {"$set" : {'status' : 2}}) # distribution task def distribution_task(self): task_id, task_info = self._get_task() print "get scan task done, sleep %s second." % self.interval if task_id is not None: self._set_checking(ObjectId(task_id)) url = task_info.get('url') domain = task_info.get('domain') method=task_info.get('method') request_data=task_info.get('request_data') user_agent = task_info.get('user_agent') cookies = task_info.get('cookies') scan.apply_async((task_id,url,domain,method,request_data,user_agent,cookies,)) self._set_checked(ObjectId(task_id)) def run(self): while True: self.distribution_task() time.sleep(self.interval) if __name__ == '__main__': scheduler = Scheduler() scheduler.run()
Note
任务扫描模块是利用celery实现分布式扫描的,可以将worker部署在多台服务器中,后端的扫描器大家根据实现情况加,比如wvs,arachni,wvs或自己写的扫描器 ,这篇文章的重点在于代理扫描,我图方便就用了 arachni 。
# -*- coding:utf8 -*- __author__ = 'hartnett' from celery import Celery from arachni import arachni_console from config import BACKEND_URL, BROKER_URL, db_info from helper import Reporter, PassiveReport, TaskStatus app = Celery('task', backend=BACKEND_URL, broker=BROKER_URL) # scanning url task # -------------------------------------------------------------------- @app.task def scan(task_id, task_url,domain,method,request_data,user_agent,cookies): if task_url: print "start to scan %s, task_id: %s" % (task_url, task_id) scanner = arachni_console.Arachni_Console(task_url, user_agent, cookies,page_limit=1) report = scanner.get_report() if report: reporter = Reporter(report) value = reporter.get_value() if value: # 如果存在漏洞则记录到数据库中 scan_report = PassiveReport(db_info, value) scan_report.report() task_status = TaskStatus(db_info) # 将状态设为已扫描 task_status.set_checked(task_id)
实现这个demo用了半天时间,写web后台还要处理前端展示,比较麻烦,所以没写,只讲下基于proxy的扫描器的实现思路
选了个课题:大概是开发一个web系统,系统功能是使用分布式爬虫(这里需要用Hadoop)去爬取主机信息,包括开放的端口、存在的漏洞有哪些,什么xss呀,sql注入什么的,再添加一些用户管理扫描出来的漏洞,并且将漏洞信息可视化出来,需要用Java实现,分布式爬虫可以使用nutch框架。 问题是:完全没思路,web系统开发出来没问题,但是怎么结合nutch框架去实现爬虫,又怎么结合上Hadoop,以及
扫描有漏洞的车辆 为了找到有漏洞的车辆,你只需要在IP地址21.0.0.0/8 和 25.0.0.0/8 上扫描Sprint设备的端口6667。任何有响应的设备就是有漏洞的Uconect系统(或一个IRC服务器)。为了确定这一点,你可以尝试Telnet登陆这台设备并查找错误字符串“Unknown command”。 图-扫描设置 如果你想的话,接下来你可以与D-Bus服务交互,从而执行上述的任何操
漏洞扫描器是一种能够自动在计算机、信息系统、网络及应用软件中寻找和发现安全弱点的程序。它通过网络对目标系统进行探测,向目标系统发生数据,并将反馈数据与自带的漏洞特征库进行匹配,进而列举目标系统上存在的安全漏洞。漏洞扫描是保证系统和网络安全必不可少的手段,面对互联网入侵,如果用户能够根据具体的应用环境,尽可能早的通过网络扫描来发现安全漏洞,并及时采取适当的处理措施进行修补,就可以有效地阻止入侵事件的
是否有一个推荐的库来修复与org.apache.commons.beanutils.populate(bean,ParamMap)相关的Bean操纵漏洞?我试图编写一些自定义方法来验证参数映射,但这并没有解决问题。 问候桑杰
预备条件: 理解 glibc malloc VM 配置:Fedora 20(x86) 什么是 Off-By-One 漏洞? 在这篇文章中提到过,将源字符串复制到目标缓冲区可能造成 Off-By-One 漏洞,当源字符串的长度等于目标缓冲区长度的时候。 当源字符串的长度等于目标缓冲区长度的时候,单个 NULL 字符会复制到目标缓冲区的上方。因此由于目标缓冲区位于堆上,单个 NULL 字节会覆盖下一个
虚拟机安装:Ubuntu 12.04(x86) 什么是off by one? 将源字符串复制到目标缓冲区可能会导致off by one 1、源字符串长度等于目标缓冲区长度。 当源字符串长度等于目标缓冲区长度时,单个字节将被复制到目标缓冲区上方。这里由于目标缓冲区位于堆栈中,所以单个NULL字节可以覆盖存储在堆栈中的调用者的EBP的最低有效位(LSB),这可能导致任意的代码执行。 一如既往的充分的定
大家好,我的程序员们。 我有一些代码,spring工具套件编辑器的反应也不一样,也许你们一些聪明人知道为什么。 有人能告诉我为什么这是不可能的吗: 但这是!? 第一行代码给了我一个警告“Resource leak:unassigned closeable value is never close”,正如您所看到的,我在try/catch中使用了一个finally,如果对象不为空,它应该总是关闭对象
本文向大家介绍PHP Web木马扫描器代码分享,包括了PHP Web木马扫描器代码分享的使用技巧和注意事项,需要的朋友参考一下 不废话了,直接贴代码了。 代码如下: 以上代码就是php web木马扫描器代码分享,本文附有注释,有不明白的欢迎给我留言,相信实现方法也不止以上一种,欢迎大家多多分享不同的实现方法。