Python3基础--20--Docker与python的碰撞结合

冯野
2023-12-01
# ---------------------------------------------------------
# 主扫描任务
# ---------------------------------------------------------

@dockerSec.task(bind=True)
def scan_task(self, ImageName):
    pull = "docker pull %s" % ImageName
    os.system(pull)

    analyze = "anchore analyze --image %s --imagetype base" % ImageName
    os.system(analyze)
    cve_scan = "anchore --json query --image %s cve-scan all" % ImageName
    cve = os.popen(cve_scan).readlines()
    # 删除不需要的键值,请直接删除相关源代码
    return cve


parser = reqparse.RequestParser()
parser.add_argument('imagename', type=unicode, required=True, help='imagename', location='json')


class ScanTask(Resource):
    def get(self):
        pass

    def post(self):
        args = parser.parse_args()
        ImageName = args['imagename']

        try:
            scan_start_time = time.strftime("%Y-%m-%d %H:%M:%S")
            group([scan_task.s(ImageName)])
            jobA = scan_task.apply_async(args=[ImageName])
            print jobA.id
            while True:
                status = scan_task.AsyncResult(jobA.id)
                if status.state == 'SUCCESS':
                    scan_end_time = time.strftime("%Y-%m-%d %H:%M:%S")
                    result = status.result
                    break
                time.sleep(3)
        except:
            abort(500, message="Image {} scan failed".format(ImageName))
        return scan_start_time, '', scan_end_time, '', result


# ---------------------------------------------------------
# 停止扫描任务(完成)
# ---------------------------------------------------------
@dockerSec.task
def stop_task(task_id):
    current_app.control.revoke(task_id, terminate=True, signal='SIGKILL')
    #from celery import app
    #app.control.revoke(task_id, terminate=True, signal='SIGKILL')


class StopTask(Resource):
    def get(self, task_id):
        try:
            group([stop_task.s(task_id)])
            stop_task.apply_async(args=[task_id])
        except:
            abort(500, message="Task Stop Failed")

        return {'status': 'Task Stop Success'}, 200


if __name__ == '__main__':
    '''
    api.add_resource(AllScanList, '/allscanlist')
    api.add_resource(SingleScanTask, '/singlescantask/<string:task_id>')
    '''
    api.add_resource(ScanTask, '/scantask')                   # post方法
    api.add_resource(StopTask, '/stoptask/<string:task_id>')  # get方法
    manager = Manager(app)
    manager.run()

二、Docker漏洞扫描引擎

readme.md

==============================================================
自动安装:
1、执行install.sh,自动安装所需环境(一般只执行一次即可)
注:环境安装正常情况下非常耗时
2、执行start_up.sh,启动服务(每次启动进程都要执行)
3、访问接口即可
==============================================================
手动安装:
安装运行所需的python库:
pip2 install flask
pip2 install flask_script
pip2 install flask_restful
pip2 install celery
安装Docker:
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
#shell脚本中默认是aliyun的,如需添加官方的为这个:https://download.docker.com/linux/centos/docker-ce.repo
sudo yum makecache fast
sudo yum -y install docker
安装Anchore:
sudo yum install epel-release
sudo yum install rpm-python
pip2 install docker-compose
sudo yum install dpkg
pip2 install anchore
\导入CVE指纹库:
export PATH=~/.local/bin:$PATH
anchore feeds list
anchore feeds sync
安装redis:
yum install redis
==============================================================
引擎功能:
        1、获取Docker中的镜像列表,并展示给前端页面,前端页面选择要扫描哪一个镜像
        2、拉取要扫描的镜像(可以并发)
        3、扫描镜像并生成报告到数据库,扫描过程中可以停止任务(不是暂停,也不能删除)(可以并发)
        4、从数据库中读取扫描报告
==============================================================
模块说明:
        ScanTask.py :
                扫描并在本地生成.txt格式的报告
        Report.py:
                调用ScanTask.py完成扫描,并读取本地.txt格式报告到数据库,随后删除本地报告
        BaseTask.py :
                获取镜像列表并展示给前端,让前端做选择
                拉取要选择的镜像
                停止扫描任务
        ScanAPI.py:
                API封装。所有API都在这里
        database.py:
                读取配置文件及连接数据库
        model.py:
                将数据写入数据库
        config:
                配置文件
==============================================================
接口说明:
        1、获取镜像列表并展示 :/getscanlist
            请求URL如:http://127.0.0.1:5000/getscanlist
            获取docker镜像库中的镜像列表
            获取镜像列表成功返回:镜像列表
            获取镜像列表失败返回:"获取镜像列表失败",状态码:200
 
        2、之后前端做选择,选择结果形成get请求形式:/image?image=所选择的镜像名
             请求URL如:http://127.0.0.1:5000/image?image=nginx
             表明前端选择的镜像是nginx
             如果该过程请求正常:nginx将自动赋值给变量ImageName
             如果该过程爆出异常返回:"选择镜像出现异常",状态码:200
        3、拉取前端选择的镜像:/pullimage
            请求URL如:http://127.0.0.1:5000/pullimage
            从镜像库中拉取刚才选择的镜像。本例中为nginx
            该过程正常返回:"镜像拉取成功",状态码:200
            该过程异常返回:"镜像拉取失败",状态码:200
        4、停止扫描任务:/stoptask
            请求URL如:http://127.0.0.1:5000/stoptask
            停止扫描任务,本例中为停止nginx的扫描任务
            如果扫描任务停止正常返回: "扫描任务已停止",状态码:200
            如果扫描任务停止异常返回: "扫描任务停止出现异常",状态码:200
        5、开始扫描并自动写入报告到数据库:/scantask
            请求URL如:http://127.0.0.1:5000/scantask
            开始扫描并自动写入报告到数据库。本例中为扫描nginx镜像并将扫描报告写入数据库
            扫描正常结束返回:"扫描完成,请查看报告",状态码:200
            扫描出现异常返回:"扫描失败,请查看原因",状态码:200
 
        6、从数据库中读取报告:/report/所选择的镜像名 
            请求URL如:http://127.0.0.1:5000/report/nginx
            从数据库中提取所有同一镜像名的报告。本例中为读取所有镜像名为nginx的报告。
            因为扫描任务名为所选择的镜像名,当出现一模一样的镜像名扫描了两次时,就会同时读取本次扫描报告和历史扫描报告。
            读取正常返回:扫描报告内容
            读取异常返回:404,message="Image nginx dosn't scan"      //其中nginx是动态的

install.sh

#!/bin/bash
echo "安装运行所需的python库"
pip2 install flask
pip2 install flask_script
pip2 install flask_restful
pip2 install celery

echo "安装Docker"
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
sudo yum makecache fast
sudo yum -y install docker

echo "安装Anchore"
sudo yum install epel-release
sudo yum install rpm-python
pip2 install docker-compose
sudo yum install dpkg
pip2 install anchore

echo "导入CVE指纹库"
export PATH=~/.local/bin:$PATH
anchore feeds list
anchore feeds sync

echo "安装redis"
yum install redis

start_up.sh

#!/bin/bash
echo "启动Anchore服务"
export PATH=~/.local/bin:$PATH

echo "启动docker服务"
sudo systemctl start docker

echo "启动redis服务"
redis-server

echo "启动多进程并行"
celery -A ScanAPI.dockerSec worker -l INFO

echo "启动API服务"
python ScanAPI.py runserver

database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

import ConfigParser
import os

config = ConfigParser.ConfigParser()
config.read("%s/config" % os.path.dirname(os.path.abspath(__file__)))

engine = create_engine(config.get('database', 'url'), convert_unicode=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

Base = declarative_base()
Base.query = db_session.query_property()

def init_db():
    import model
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

module.py

#! /usr/bin/env python
#-*- coding:utf-8 -*-

from database import Base
from sqlalchemy import Column, Integer, String, Text
from flask_restful import fields

class DockerScan(Base):
    resource_fields = {
        'id': fields.Integer,
        'scan_name': fields.String,
        'scan_time': fields.String,
        'scan_result': fields.String,
    }
    
    __tablename__ = 'DockerScan'
    id = Column(Integer, primary_key=True)
    scan_name = Column(String(50))
    scan_time = Column(String(50))
    scan_result = Column(Text())
    
    def __init__(self, scan_name="", scan_time="", scan_result=""):
        self.scan_name = scan_name
        self.scan_time = scan_time
        self.scan_result = scan_result

BaseTask.py

#!/usr/bin/python
#-*- coding:utf-8 -*-

import os
import sys
from flask import jsonify
from itertools import islice

reload(sys)
sys.setdefaultencoding('utf-8')

# ----------------------------------------------------------
# 获取镜像列表
# ----------------------------------------------------------
def ImageTask():
    lis = ''
    DockerList = """docker images | awk '{print $1,":",$3}'"""
    list = os.popen(DockerList).readlines()
    
    # ------------------------------------------------------
    # 展示镜像列表(去掉标题和空行/空格)
    # ------------------------------------------------------
    for line in islice(list,1,None):
        data = line.strip()
        if len(data) != 0:
            l = data.replace(' ','')    # 展示举例:nginx
            lis = lis + l + ' '
    list_image = lis.split()
    return jsonify(list_image)
    
# ----------------------------------------------------------
# 拉取镜像
# ----------------------------------------------------------

def PullImage(DockerImage):
    DockerPull = "docker pull %s" % DockerImage
    os.system(DockerPull)
    
# ----------------------------------------------------------
# 停止进程。写入了一半的文件不做删除
# ----------------------------------------------------------
def StopTask():
    try:
        sys.exit()    # 停止进程。默认不停止,须手动触发
    except SystemExit, e:
        return 'ScanTask be Stoped'
        
if '__name__' == '__main__':
    from ScanAPI import Image    #循环导入,保证要在使用时再导入
    ImageTask()
    image = Image()
    PullImage(image.get())
    StopTask()

ScanTask.py

#!/usr/bin/python
#-*- coding:utf-8 -*-

import os
import sys
from BaseTask import *
from itertools import islice

reload(sys)
sys.setdefaultencoding('utf-8')

# ----------------------------------------------------------
# 报告处理。如果文件名称重复,则自动在后面添加“_数字”类推
# ----------------------------------------------------------
class Output(object):
    # 控制台内容生成txt报告
    def __init__(self, check_filename = "default.log"):
        self.terminal = sys.stdout
        self.log = open(check_filename, "w")
    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)
    def flush(self):         # 即时更新
        pass
'''

# 判断文件名是否存在
def check_filename(filename):
    n = [2]
    def check_meta(file_name):
        file_name_new = file_name
        if os.path.isfile(file_name):
            file_name_new = file_name[:file_name.rfind('.')]+'_'+str(n[0])+file_name[file_name.rfind('.'):]
            n[0] += 1
        if os.path.isfile(file_name_new):
            file_name_new = check_meta(file_name)
        return file_name_new
    return_name = check_meta(filename)
    print return_name
    return return_name
'''

# ----------------------------------------------------------
# 镜像扫描
# ----------------------------------------------------------
def ScanTask(DockerImage):
    # name = check_filename('%s.txt'% DockerImage)
    sys.stdout = Output('%s.txt'% DockerImage)
    # print "本次扫描的镜像是: %s" % DockerImage
    
    # CVE漏洞扫描
    cve_scan = "anchore query --image %s cve-scan all" % DockerImage
    cve = os.popen(cve_scan).readlines()
    print "CVE漏洞扫描结果:\n------------------------------------------"
    for line in islice(cve,0,None):
        cve_data = line.strip()
        if len(cve_data) != 0:
            print cve_data
            
    # 镜像常规分析
    analysis = "anchore analyze --image %s --imagetype base" % DockerImage
    os.popen(analysis).readlines()
    analysis_scan = "anchore gate --image %s" % DockerImage
    analysis_result = os.popen(analysis_scan).readlines()
    print "\n镜像常规分析结果:\n------------------------------------------"
    for line in islice(analysis_result,0,None):
        analysis_result_data = line.strip()
        if len(analysis_result_data) != 0:
            print analysis_result_data
            
    # 扫描与纯净镜像的区别
    pure_scan = "anchore query --image %s show-file-diffs base" % DockerImage
    pure = os.popen(pure_scan).readlines()
    print "\n提取与纯净镜像的对比区别:\n------------------------------------------"
    for line in islice(pure,0,None):
        pure_data = line.strip()
        if len(pure_data) != 0:
            print pure_data
            
    # 镜像特征提取
    feature_scan = "anchore toolbox --image %s show" % DockerImage
    feature = os.popen(feature_scan).readlines()
    print "\n镜像特征提取结果:\n------------------------------------------"
    for line in islice(feature,0,None):
        feature_data = line.strip()
        if len(feature_data) != 0:
            print feature_data
            
if __name__ == "__main__":
    from ScanAPI import Image
    image = Image()
    ScanTask(image.get())

Report.py

#!/usr/bin/python
#-*- coding:utf-8 -*-

import os
import sys
import time
from database import db_session
from flask_restful import marshal_with
from model import DockerScan

reload(sys)
sys.setdefaultencoding('utf-8')

# ----------------------------------------------------------
# 报告处理。
# ----------------------------------------------------------
@marshal_with(DockerScan.resource_fields)
def Scan_Report():
    Report_List = "python ScanTask.py"
    os.system(Report_List)
    
    scan_time = time.strftime("%Y-%m-%d %H:%M:%S")     # scan_time是写入数据库之一
    
    from ScanAPI import Image
    image = Image()
    scan_name = image.get()
    
    file_a = open('%s.txt' % scan_name, 'r')      # 支持所有换行符
    
    lis = ''
    for line in file_a:          # 遍历文件,构造scan_result
        lis = lis + line + '\t'
    list1 = lis.split('\t')      # 指定分隔符切片
    scan_result = "".join(list1)  # 转换为字符串
    
    args = {
        'scan_name': scan_name,
        'scan_time': scan_time,
        'scan_result': scan_result
    }
    dockerscan = DockerScan(**args)
    
    try:
        db_session.add(dockerscan)
        db_session.commit()
    except Exception,e:
        print e
        
    os.remove('%s.txt'% scan_name)
    
if __name__ == "__main__":
    Scan_Report()

ScanAPI.py
主引擎-API封装功能

#!/usr/bin/python
#-*- coding:utf-8 -*-

from BaseTask import StopTask
from Report import Scan_Report

import sys
from celery import Celery
from celery import group
from flask_script import Manager
from flask import Flask, jsonify, request
from flask_restful import Resource, Api, abort
from flask_restful import marshal_with

from model import DockerScan
from database import db_session
from sqlalchemy import and_

reload(sys)
sys.setdefaultencoding('utf-8')

app = Flask(__name__)
api = Api(app)

er = 'redis://127.0.0.1:6379/5'
end = 'redis://127.0.0.1:6379/6'
dockerSec = Celery('ScanAPI', broker=er, backend=end)

# 并行任务设定
@dockerSec.task
def pull_image():
    from BaseTask import PullImage
    image = Image()
    
    try:
        PullImage(image.get())       # 传入参数,拉取镜像
    except:
        return {'status': '拉取镜像失败'}, 200
    else:
        return {'status': '拉取镜像成功'}, 200
        
@dockerSec.task
def scan_task():
    try:
        Scan_Report()      # 扫描并生成报告
    except:
        return {'status': '扫描失败,请查看原因'}, 200
    else:
        return {'status': '扫描完成,请查看报告'}, 200
        
@dockerSec.task
def stop_task():
    try:
        StopTask()
    except:
        return {'status': '扫描任务停止出现异常'}, 200
    else:
        return {'status': '扫描任务已停止'}, 200      # 停止任务
        
# 获取镜像列表(完成)
class GetScanList(Resource):    # 无需并行
    def get(self):
        from BaseTask import ImageTask
        
        try:
            return ImageTask()
        except:
            return {'status': '获取镜像列表失败'}, 200
            
# 前端选择
class Image(Resource):       #选择镜像
    def get(self):
        try:
            ImageName = request.args.get('image')   # "nginx"
        except:
            return {'status': '选择镜像出现异常'}, 200
        else:
            return ImageName
            
# 拉取选定的镜像(完成)
class pullImage(Resource):
    def get(self):
        jobB = group([pull_image.s()])      # group为一组,该组任务并发执行
        jobB.apply_async()                  # apply_async是进程池
        return jobB
        
# 停止扫描任务(完成)
class stopTask(Resource):
    def get(self):
        jobC = group([stop_task.s()])      # group为一组,该组任务并发执行
        jobC.apply_async()
        return jobC
        
# 镜像扫描开始(完成)
class scanTask(Resource):
    def get(self):
        jobD = group([scan_task.s()])      # 扫描,并同时将报告写入数据库
        jobD.apply_async()
        return jobD
        
# 报告(完成)
class report(Resource):
    @marshal_with(DockerScan.resource_fields)
    def get(self, ScanName):
        scanName = db_session.query(DockerScan).filter(and_(DockerScan.scan_name == ScanName)).all()
        if scanName:
            return scanName
        else:
            abort(404, message="Image {} doesn't scan".format(ScanName))
            
# 主页。不用每次都看代码再输入地址测试(完成)
class index(Resource):
    def get(self):
        list_a = """该页面为主页。请键入具体接口地址:
-----------------
        /getscanlist                 //展示镜像列表
        /image?image=选定镜像名      //选择了要扫描的镜像
        /pullimage                 //拉取镜像
        /stoptask                 //停止扫描
        /scantask                //开始扫描并生成报告
        /report/所选择的镜像名    //查看扫描报告
        """
        
        list_b = list_a.split('\n')
        return jsonify(list_b)
        
if __name__ == '__main__':
    # app.run(debug=True)
    api.add_resource(GetScanList, '/getscanlist')
    api.add_resource(pullImage, '/pullimage')
    api.add_resource(stopTask, '/stoptask')
    api.add_resource(scanTask, '/scantask')
    api.add_resource(report, '/report/<string:ScanName>')
    api.add_resource(Image, '/image')   # 浏览器输入的地址是要和类中的操作做拼接的,所以比这个要复杂
    api.add_resource(index, '/')
    manager = Manager(app)
    manager.run() 

config

[database]
url: mysql+pymysql://root:root123456@localhost:3306/DockerScan?charset=utf8

[celery]
broker_url: redis://127.0.0.1:6379
 类似资料: