# ---------------------------------------------------------
# 主扫描任务
# ---------------------------------------------------------
@dockerSec.task(bind=True)
def scan_task(self, ImageName):
pull = "docker pull %s" % ImageName
os.system(pull)
analyze = "anchore analyze --image %s --imagetype base" % ImageName
os.system(analyze)
cve_scan = "anchore --json query --image %s cve-scan all" % ImageName
cve = os.popen(cve_scan).readlines()
# 删除不需要的键值,请直接删除相关源代码
return cve
parser = reqparse.RequestParser()
parser.add_argument('imagename', type=unicode, required=True, help='imagename', location='json')
class ScanTask(Resource):
def get(self):
pass
def post(self):
args = parser.parse_args()
ImageName = args['imagename']
try:
scan_start_time = time.strftime("%Y-%m-%d %H:%M:%S")
group([scan_task.s(ImageName)])
jobA = scan_task.apply_async(args=[ImageName])
print jobA.id
while True:
status = scan_task.AsyncResult(jobA.id)
if status.state == 'SUCCESS':
scan_end_time = time.strftime("%Y-%m-%d %H:%M:%S")
result = status.result
break
time.sleep(3)
except:
abort(500, message="Image {} scan failed".format(ImageName))
return scan_start_time, '', scan_end_time, '', result
# ---------------------------------------------------------
# 停止扫描任务(完成)
# ---------------------------------------------------------
@dockerSec.task
def stop_task(task_id):
current_app.control.revoke(task_id, terminate=True, signal='SIGKILL')
#from celery import app
#app.control.revoke(task_id, terminate=True, signal='SIGKILL')
class StopTask(Resource):
def get(self, task_id):
try:
group([stop_task.s(task_id)])
stop_task.apply_async(args=[task_id])
except:
abort(500, message="Task Stop Failed")
return {'status': 'Task Stop Success'}, 200
if __name__ == '__main__':
'''
api.add_resource(AllScanList, '/allscanlist')
api.add_resource(SingleScanTask, '/singlescantask/<string:task_id>')
'''
api.add_resource(ScanTask, '/scantask') # post方法
api.add_resource(StopTask, '/stoptask/<string:task_id>') # get方法
manager = Manager(app)
manager.run()
readme.md
==============================================================
自动安装:
1、执行install.sh,自动安装所需环境(一般只执行一次即可)
注:环境安装正常情况下非常耗时
2、执行start_up.sh,启动服务(每次启动进程都要执行)
3、访问接口即可
==============================================================
手动安装:
安装运行所需的python库:
pip2 install flask
pip2 install flask_script
pip2 install flask_restful
pip2 install celery
安装Docker:
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
#shell脚本中默认是aliyun的,如需添加官方的为这个:https://download.docker.com/linux/centos/docker-ce.repo
sudo yum makecache fast
sudo yum -y install docker
安装Anchore:
sudo yum install epel-release
sudo yum install rpm-python
pip2 install docker-compose
sudo yum install dpkg
pip2 install anchore
\导入CVE指纹库:
export PATH=~/.local/bin:$PATH
anchore feeds list
anchore feeds sync
安装redis:
yum install redis
==============================================================
引擎功能:
1、获取Docker中的镜像列表,并展示给前端页面,前端页面选择要扫描哪一个镜像
2、拉取要扫描的镜像(可以并发)
3、扫描镜像并生成报告到数据库,扫描过程中可以停止任务(不是暂停,也不能删除)(可以并发)
4、从数据库中读取扫描报告
==============================================================
模块说明:
ScanTask.py :
扫描并在本地生成.txt格式的报告
Report.py:
调用ScanTask.py完成扫描,并读取本地.txt格式报告到数据库,随后删除本地报告
BaseTask.py :
获取镜像列表并展示给前端,让前端做选择
拉取要选择的镜像
停止扫描任务
ScanAPI.py:
API封装。所有API都在这里
database.py:
读取配置文件及连接数据库
model.py:
将数据写入数据库
config:
配置文件
==============================================================
接口说明:
1、获取镜像列表并展示 :/getscanlist
请求URL如:http://127.0.0.1:5000/getscanlist
获取docker镜像库中的镜像列表
获取镜像列表成功返回:镜像列表
获取镜像列表失败返回:"获取镜像列表失败",状态码:200
2、之后前端做选择,选择结果形成get请求形式:/image?image=所选择的镜像名
请求URL如:http://127.0.0.1:5000/image?image=nginx
表明前端选择的镜像是nginx
如果该过程请求正常:nginx将自动赋值给变量ImageName
如果该过程爆出异常返回:"选择镜像出现异常",状态码:200
3、拉取前端选择的镜像:/pullimage
请求URL如:http://127.0.0.1:5000/pullimage
从镜像库中拉取刚才选择的镜像。本例中为nginx
该过程正常返回:"镜像拉取成功",状态码:200
该过程异常返回:"镜像拉取失败",状态码:200
4、停止扫描任务:/stoptask
请求URL如:http://127.0.0.1:5000/stoptask
停止扫描任务,本例中为停止nginx的扫描任务
如果扫描任务停止正常返回: "扫描任务已停止",状态码:200
如果扫描任务停止异常返回: "扫描任务停止出现异常",状态码:200
5、开始扫描并自动写入报告到数据库:/scantask
请求URL如:http://127.0.0.1:5000/scantask
开始扫描并自动写入报告到数据库。本例中为扫描nginx镜像并将扫描报告写入数据库
扫描正常结束返回:"扫描完成,请查看报告",状态码:200
扫描出现异常返回:"扫描失败,请查看原因",状态码:200
6、从数据库中读取报告:/report/所选择的镜像名
请求URL如:http://127.0.0.1:5000/report/nginx
从数据库中提取所有同一镜像名的报告。本例中为读取所有镜像名为nginx的报告。
因为扫描任务名为所选择的镜像名,当出现一模一样的镜像名扫描了两次时,就会同时读取本次扫描报告和历史扫描报告。
读取正常返回:扫描报告内容
读取异常返回:404,message="Image nginx dosn't scan" //其中nginx是动态的
install.sh
#!/bin/bash
echo "安装运行所需的python库"
pip2 install flask
pip2 install flask_script
pip2 install flask_restful
pip2 install celery
echo "安装Docker"
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
sudo yum makecache fast
sudo yum -y install docker
echo "安装Anchore"
sudo yum install epel-release
sudo yum install rpm-python
pip2 install docker-compose
sudo yum install dpkg
pip2 install anchore
echo "导入CVE指纹库"
export PATH=~/.local/bin:$PATH
anchore feeds list
anchore feeds sync
echo "安装redis"
yum install redis
start_up.sh
#!/bin/bash
echo "启动Anchore服务"
export PATH=~/.local/bin:$PATH
echo "启动docker服务"
sudo systemctl start docker
echo "启动redis服务"
redis-server
echo "启动多进程并行"
celery -A ScanAPI.dockerSec worker -l INFO
echo "启动API服务"
python ScanAPI.py runserver
database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import ConfigParser
import os
config = ConfigParser.ConfigParser()
config.read("%s/config" % os.path.dirname(os.path.abspath(__file__)))
engine = create_engine(config.get('database', 'url'), convert_unicode=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
def init_db():
import model
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
module.py
#! /usr/bin/env python
#-*- coding:utf-8 -*-
from database import Base
from sqlalchemy import Column, Integer, String, Text
from flask_restful import fields
class DockerScan(Base):
resource_fields = {
'id': fields.Integer,
'scan_name': fields.String,
'scan_time': fields.String,
'scan_result': fields.String,
}
__tablename__ = 'DockerScan'
id = Column(Integer, primary_key=True)
scan_name = Column(String(50))
scan_time = Column(String(50))
scan_result = Column(Text())
def __init__(self, scan_name="", scan_time="", scan_result=""):
self.scan_name = scan_name
self.scan_time = scan_time
self.scan_result = scan_result
BaseTask.py
#!/usr/bin/python
#-*- coding:utf-8 -*-
import os
import sys
from flask import jsonify
from itertools import islice
reload(sys)
sys.setdefaultencoding('utf-8')
# ----------------------------------------------------------
# 获取镜像列表
# ----------------------------------------------------------
def ImageTask():
lis = ''
DockerList = """docker images | awk '{print $1,":",$3}'"""
list = os.popen(DockerList).readlines()
# ------------------------------------------------------
# 展示镜像列表(去掉标题和空行/空格)
# ------------------------------------------------------
for line in islice(list,1,None):
data = line.strip()
if len(data) != 0:
l = data.replace(' ','') # 展示举例:nginx
lis = lis + l + ' '
list_image = lis.split()
return jsonify(list_image)
# ----------------------------------------------------------
# 拉取镜像
# ----------------------------------------------------------
def PullImage(DockerImage):
DockerPull = "docker pull %s" % DockerImage
os.system(DockerPull)
# ----------------------------------------------------------
# 停止进程。写入了一半的文件不做删除
# ----------------------------------------------------------
def StopTask():
try:
sys.exit() # 停止进程。默认不停止,须手动触发
except SystemExit, e:
return 'ScanTask be Stoped'
if '__name__' == '__main__':
from ScanAPI import Image #循环导入,保证要在使用时再导入
ImageTask()
image = Image()
PullImage(image.get())
StopTask()
ScanTask.py
#!/usr/bin/python
#-*- coding:utf-8 -*-
import os
import sys
from BaseTask import *
from itertools import islice
reload(sys)
sys.setdefaultencoding('utf-8')
# ----------------------------------------------------------
# 报告处理。如果文件名称重复,则自动在后面添加“_数字”类推
# ----------------------------------------------------------
class Output(object):
# 控制台内容生成txt报告
def __init__(self, check_filename = "default.log"):
self.terminal = sys.stdout
self.log = open(check_filename, "w")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self): # 即时更新
pass
'''
# 判断文件名是否存在
def check_filename(filename):
n = [2]
def check_meta(file_name):
file_name_new = file_name
if os.path.isfile(file_name):
file_name_new = file_name[:file_name.rfind('.')]+'_'+str(n[0])+file_name[file_name.rfind('.'):]
n[0] += 1
if os.path.isfile(file_name_new):
file_name_new = check_meta(file_name)
return file_name_new
return_name = check_meta(filename)
print return_name
return return_name
'''
# ----------------------------------------------------------
# 镜像扫描
# ----------------------------------------------------------
def ScanTask(DockerImage):
# name = check_filename('%s.txt'% DockerImage)
sys.stdout = Output('%s.txt'% DockerImage)
# print "本次扫描的镜像是: %s" % DockerImage
# CVE漏洞扫描
cve_scan = "anchore query --image %s cve-scan all" % DockerImage
cve = os.popen(cve_scan).readlines()
print "CVE漏洞扫描结果:\n------------------------------------------"
for line in islice(cve,0,None):
cve_data = line.strip()
if len(cve_data) != 0:
print cve_data
# 镜像常规分析
analysis = "anchore analyze --image %s --imagetype base" % DockerImage
os.popen(analysis).readlines()
analysis_scan = "anchore gate --image %s" % DockerImage
analysis_result = os.popen(analysis_scan).readlines()
print "\n镜像常规分析结果:\n------------------------------------------"
for line in islice(analysis_result,0,None):
analysis_result_data = line.strip()
if len(analysis_result_data) != 0:
print analysis_result_data
# 扫描与纯净镜像的区别
pure_scan = "anchore query --image %s show-file-diffs base" % DockerImage
pure = os.popen(pure_scan).readlines()
print "\n提取与纯净镜像的对比区别:\n------------------------------------------"
for line in islice(pure,0,None):
pure_data = line.strip()
if len(pure_data) != 0:
print pure_data
# 镜像特征提取
feature_scan = "anchore toolbox --image %s show" % DockerImage
feature = os.popen(feature_scan).readlines()
print "\n镜像特征提取结果:\n------------------------------------------"
for line in islice(feature,0,None):
feature_data = line.strip()
if len(feature_data) != 0:
print feature_data
if __name__ == "__main__":
from ScanAPI import Image
image = Image()
ScanTask(image.get())
Report.py
#!/usr/bin/python
#-*- coding:utf-8 -*-
import os
import sys
import time
from database import db_session
from flask_restful import marshal_with
from model import DockerScan
reload(sys)
sys.setdefaultencoding('utf-8')
# ----------------------------------------------------------
# 报告处理。
# ----------------------------------------------------------
@marshal_with(DockerScan.resource_fields)
def Scan_Report():
Report_List = "python ScanTask.py"
os.system(Report_List)
scan_time = time.strftime("%Y-%m-%d %H:%M:%S") # scan_time是写入数据库之一
from ScanAPI import Image
image = Image()
scan_name = image.get()
file_a = open('%s.txt' % scan_name, 'r') # 支持所有换行符
lis = ''
for line in file_a: # 遍历文件,构造scan_result
lis = lis + line + '\t'
list1 = lis.split('\t') # 指定分隔符切片
scan_result = "".join(list1) # 转换为字符串
args = {
'scan_name': scan_name,
'scan_time': scan_time,
'scan_result': scan_result
}
dockerscan = DockerScan(**args)
try:
db_session.add(dockerscan)
db_session.commit()
except Exception,e:
print e
os.remove('%s.txt'% scan_name)
if __name__ == "__main__":
Scan_Report()
ScanAPI.py
主引擎-API封装功能
#!/usr/bin/python
#-*- coding:utf-8 -*-
from BaseTask import StopTask
from Report import Scan_Report
import sys
from celery import Celery
from celery import group
from flask_script import Manager
from flask import Flask, jsonify, request
from flask_restful import Resource, Api, abort
from flask_restful import marshal_with
from model import DockerScan
from database import db_session
from sqlalchemy import and_
reload(sys)
sys.setdefaultencoding('utf-8')
app = Flask(__name__)
api = Api(app)
er = 'redis://127.0.0.1:6379/5'
end = 'redis://127.0.0.1:6379/6'
dockerSec = Celery('ScanAPI', broker=er, backend=end)
# 并行任务设定
@dockerSec.task
def pull_image():
from BaseTask import PullImage
image = Image()
try:
PullImage(image.get()) # 传入参数,拉取镜像
except:
return {'status': '拉取镜像失败'}, 200
else:
return {'status': '拉取镜像成功'}, 200
@dockerSec.task
def scan_task():
try:
Scan_Report() # 扫描并生成报告
except:
return {'status': '扫描失败,请查看原因'}, 200
else:
return {'status': '扫描完成,请查看报告'}, 200
@dockerSec.task
def stop_task():
try:
StopTask()
except:
return {'status': '扫描任务停止出现异常'}, 200
else:
return {'status': '扫描任务已停止'}, 200 # 停止任务
# 获取镜像列表(完成)
class GetScanList(Resource): # 无需并行
def get(self):
from BaseTask import ImageTask
try:
return ImageTask()
except:
return {'status': '获取镜像列表失败'}, 200
# 前端选择
class Image(Resource): #选择镜像
def get(self):
try:
ImageName = request.args.get('image') # "nginx"
except:
return {'status': '选择镜像出现异常'}, 200
else:
return ImageName
# 拉取选定的镜像(完成)
class pullImage(Resource):
def get(self):
jobB = group([pull_image.s()]) # group为一组,该组任务并发执行
jobB.apply_async() # apply_async是进程池
return jobB
# 停止扫描任务(完成)
class stopTask(Resource):
def get(self):
jobC = group([stop_task.s()]) # group为一组,该组任务并发执行
jobC.apply_async()
return jobC
# 镜像扫描开始(完成)
class scanTask(Resource):
def get(self):
jobD = group([scan_task.s()]) # 扫描,并同时将报告写入数据库
jobD.apply_async()
return jobD
# 报告(完成)
class report(Resource):
@marshal_with(DockerScan.resource_fields)
def get(self, ScanName):
scanName = db_session.query(DockerScan).filter(and_(DockerScan.scan_name == ScanName)).all()
if scanName:
return scanName
else:
abort(404, message="Image {} doesn't scan".format(ScanName))
# 主页。不用每次都看代码再输入地址测试(完成)
class index(Resource):
def get(self):
list_a = """该页面为主页。请键入具体接口地址:
-----------------
/getscanlist //展示镜像列表
/image?image=选定镜像名 //选择了要扫描的镜像
/pullimage //拉取镜像
/stoptask //停止扫描
/scantask //开始扫描并生成报告
/report/所选择的镜像名 //查看扫描报告
"""
list_b = list_a.split('\n')
return jsonify(list_b)
if __name__ == '__main__':
# app.run(debug=True)
api.add_resource(GetScanList, '/getscanlist')
api.add_resource(pullImage, '/pullimage')
api.add_resource(stopTask, '/stoptask')
api.add_resource(scanTask, '/scantask')
api.add_resource(report, '/report/<string:ScanName>')
api.add_resource(Image, '/image') # 浏览器输入的地址是要和类中的操作做拼接的,所以比这个要复杂
api.add_resource(index, '/')
manager = Manager(app)
manager.run()
config
[database]
url: mysql+pymysql://root:root123456@localhost:3306/DockerScan?charset=utf8
[celery]
broker_url: redis://127.0.0.1:6379