以下代码均为适用于自己的项目,大概操作的部分基本都写了,不要全部复制粘贴,按需自取。
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from keystoneauth1 import identity
from keystoneauth1 import session
from neutronclient.v2_0 import client
from xenadmin.settings import OpenStack_username, OpenStack_password, OpenStack_project_name, \
OpenStack_project_domain_id, OpenStack_user_domain_id, OpenStack_auth_url
import hashlib
from novaclient import client as noclient
import openstack
import traceback
import logging
class OpenStackMange(object):
def __init__(self, neutron=None, conn=None, secrity_groups_id=None):
# 获取 neutron 连接
self.sess = self.get_client()
self.neutron = neutron or self.get_neutron()
# 获取 conn 连接
self.conn = conn or self.get_conn()
self.nova_client = noclient.Client('2', session=self.sess)
# 获取安全组id
self.secrity_groups_id = secrity_groups_id or self.get_secrity_groups()
def __enter__(self):
return self
def __exit__(self, *args):
return self
def get_client(self):
username = OpenStack_username
password = OpenStack_password
project_name = OpenStack_project_name
project_domain_id = OpenStack_project_domain_id
user_domain_id = OpenStack_user_domain_id
auth_url = OpenStack_auth_url
auth = identity.Password(auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
project_domain_id=project_domain_id,
user_domain_id=user_domain_id)
sess = session.Session(auth=auth)
return sess
# 获取网段neutron句柄
def get_neutron(self):
neutron = client.Client(session=self.sess)
return neutron
# 获取sdk连接
def get_conn(self):
conn = openstack.connect(cloud='test_cloud')
return conn
# 获取安全组id
def get_secrity_groups(self):
secrity_groups = self.neutron.list_security_groups()
logging.error('安全组id ==: {}'.format(secrity_groups))
id = None
try:
id = secrity_groups['security_groups'][0]['id']
except Exception:
traceback.print_exc()
logging.error(id)
return [id]
# 网段及子网sdk
def network_list(self):
network_id, subnet_id = None, None
try:
# list all networks
networks = self.neutron.list_networks()
logging.error("networks: %s" % networks)
# create a network
filters = {}
filters["id"] = network_id
logging.error(network_id, subnet_id)
networks = self.neutron.list_networks(**filters)
logging.error("-----networks: %s" % networks)
except Exception:
traceback.print_exc()
# 网段及子网sdk
def network_api(self, network_name='mynetwork-test', network_ip='192.0.0.0/8', gateway_ip='192.0.0.2'):
network_id, subnet_id = None, None
try:
# list all networks
# networks = self.neutron.list_networks()
# logging.error("networks: %s" % networks)
# create a network
network = {'name': network_name, 'admin_state_up': True, 'region_name': 'RegionOne',
'ipam_vpc_prefix': network_ip}
new_network = self.neutron.create_network({'network': network})
logging.error("network: %s" % new_network)
net_dict = new_network['network']
network_id = net_dict['id']
logging.error("network_id: %s" % network_id)
# create a subnet
subnet_body = {
'subnets': [{'cidr': network_ip, 'ip_version': 4, 'network_id': network_id, 'gateway_ip': gateway_ip,
"ipam_site_name": "mg"}]}
new_subnet = self.neutron.create_subnet(subnet_body)
subnet_id = new_subnet['subnets'][0]['id']
logging.error("subnet: %s" % new_subnet)
except Exception:
traceback.print_exc()
return network_id, subnet_id
# 删除子网及网段
def delete_network(self, network_id=None):
logging.error("Delete Network:")
status = False
if not network_id:
return status
try:
# networks = self.neutron.list_networks()
# port list
port_list_del = []
network_port = {}
network_port["network_id"] = network_id
list_ports = self.neutron.list_ports(**network_port)
logging.error('------- list_ports : {}'.format(list_ports))
for port in list_ports.get('ports', []):
if not port.get('device_id', ''):
self.delete_port(port.get('id', ''))
else:
port_list_del.append(port.get('id', ''))
# 判断是否只有初始化的端口
if len(port_list_del) == 1:
if self.check_port_vm(port_list_del[0]):
self.delete_port(port_list_del[0])
port_list_del = []
if port_list_del:
return False
filters = {}
filters["id"] = network_id
networks = self.neutron.list_networks(**filters)
logging.error("-----networks: %s" % networks)
logging.error('*******************************')
# subnets = self.neutron.list_subnets()
# logging.error("subnets: %s" % subnets)
for network_item in networks['networks']:
if network_id == network_item.get('id', ''):
subnet_ids = network_item.get('subnets', [])
logging.error('*******************************')
for example_subnet in subnet_ids:
self.neutron.delete_subnet(example_subnet)
self.neutron.delete_network(network_item['id'])
networks = self.neutron.list_networks(**filters)
logging.error("+++++networks: %s" % networks)
logging.error('*******************************')
status = True
except Exception:
traceback.print_exc()
return status
# 获取端口
def get_list_ports(self, id=None):
logging.error("List Ports:")
if id:
filters = {}
filters["id"] = id
list_ports = self.neutron.list_ports(**filters)
logging.error('------- list_ports : {}'.format(list_ports))
else:
list_ports = self.conn.list_ports()
for port in list_ports:
logging.error(port)
return list_ports
# 创建端口(具体ip)
def create_port(self, name="Specified-test-IP", network_id=None, subnet_id=None, ip_address=None):
port_id = ''
try:
# alloc ip address by user Specified
port_body = {"port": {"name": name, "network_id": network_id, "security_groups": self.secrity_groups_id,
"fixed_ips": [{"subnet_id": subnet_id, "ip_address": ip_address}]}}
new_port = self.neutron.create_port(port_body)
port_id = new_port.get('port', {}).get('id', '')
mac_address = new_port.get('port', {}).get('mac_address', '')
fixed_ips = new_port.get('port', {}).get('fixed_ips', [])
ip_addr = ''
if fixed_ips:
ip_addr = fixed_ips[0].get('ip_address')
logging.error('id {} , ip :{} , mac_address : {}'.format(port_id, ip_addr, mac_address))
#if not mac_address:
# self.delete_port(port_id)
# port_id = ''
except Exception:
traceback.print_exc()
return mac_address, port_id
# 创建端口(随机ip)
def range_create_port(self, name="Specified-test-IP", network_id=None):
port_id = ''
ip_addr = ''
mac_address = ''
try:
port_body = {"port": {"name": name, "network_id": network_id}}
new_port = self.neutron.create_port(port_body)
port_id = new_port.get('port', {}).get('id', '')
mac_address = new_port.get('port', {}).get('mac_address', '')
fixed_ips = new_port.get('port', {}).get('fixed_ips', [])
ip_addr = ''
if fixed_ips:
ip_addr = fixed_ips[0].get('ip_address')
logging.error('id {} , ip :{} , mac_address : {}'.format(port_id, ip_addr, mac_address))
if not mac_address:
self.delete_port(port_id)
port_id = ''
ip_addr = ''
mac_address = ''
except Exception:
traceback.print_exc()
return port_id, ip_addr, mac_address
# 删除端口(具体ip)
def delete_port(self, id=None):
try:
if not id:
return
port_list = []
filters = {}
filters["id"] = id
new_port = self.neutron.list_ports()
logging.error('*****-port delete {}'.format(new_port))
new_port = self.neutron.list_ports(**filters)
logging.error('--port delete {}'.format(new_port))
if new_port:
port_list = new_port['ports']
for port_item in port_list:
self.neutron.delete_port(port_item['id'])
new_port = self.neutron.list_ports(**filters)
logging.error('++port delete {}'.format(new_port))
except Exception:
traceback.print_exc()
# 获取镜像列表
def list_image(self):
try:
logging.error("List Images:")
for image in self.conn.image.images():
logging.error(image)
except Exception:
traceback.print_exc()
# 根据镜像名称获取镜像id
def get_id_by_name(self, image_id_or_name=None):
image = None
try:
image = self.conn.image.find_image(image_id_or_name)
logging.error('image === {}'.format(list))
except Exception:
traceback.print_exc()
return image
# 上传镜像
def upload_image_from_file(self, name='analysis-test1', file_name='/root/analysis.qcow2',
container_format="bare", disk_format="qcow2"):
image_id = None
try:
logging.error("Upload Image:")
# Load fake image data for the example.
# data = '/root/zy/CentOS-7-x86_64-GenericCloud.qcow2'
# Build the image attributes and upload the image.
# image_attrs = {
# 'name': "test-upload-image-1",
# 'data': data,
# 'disk_format': 'qcow2',
# 'container_format': 'bare',
# 'visibility': 'public',
# }
# conn.image.upload_image(**image_attrs)
# name = 'hanyuyang-test1'
# file_name = '/root/analysis.qcow2'
image = self.conn.image.create_image(name=name, filename=file_name,
container_format=container_format,
disk_format=disk_format, wait=True, timeout=7200)
logging.error(image.id)
image_id = image.id
except Exception:
traceback.print_exc()
return image_id
# 下载镜像
def download_image_stream(self, image_id, file_name):
# download_image_stream("6d127c72-6a39-46b1-84a2-eb652b186ee1",
# "./6d127c72-6a39-46b1-84a2-eb652b186ee1.image.qcow2")
logging.error("Download Image via streaming:")
# Find the image you would like to download.
image = self.conn.image.find_image(image_id)
# As the actual download now takes place outside of the library
# and in your own code, you are now responsible for checking
# the integrity of the data. Create an MD5 has to be computed
# after all of the data has been consumed.
md5 = hashlib.md5()
with open(file_name, "wb") as local_image:
response = self.conn.image.download_image(image, stream=True)
# Read only 1024 bytes of memory at a time until
# all of the image data has been consumed.
for chunk in response.iter_content(chunk_size=1024):
# With each chunk, add it to the hash to be computed.
md5.update(chunk)
local_image.write(chunk)
# Now that you've consumed all of the data the response gave you,
# ensure that the checksums of what the server offered and
# what you downloaded are the same.
if response.headers["Content-MD5"] != md5.hexdigest():
raise Exception("Checksum mismatch in downloaded content")
# 此种下载方式不建议使用 ,OpenStack允许名称重复
def download_image(self, name="myimage"):
logging.error("Download Image:")
# Find the image you would like to download.
image = self.conn.image.find_image(name)
with open("{}.qcow2".format(name), "w") as local_image:
response = self.conn.image.download_image(image)
# Response will contain the entire contents of the Image.
local_image.write(response)
# 删除镜像
def delete_image(self, id='50e74765-784a-4b9d-873c-625bdf55f70b'):
status = False
logging.error("delete Image: {}".format(id))
try:
# Find the image you would like to download.
# example_image = self.conn.image.find_image(id)
example_image = self.get_id_by_name(id)
self.conn.image.delete_image(example_image, ignore_missing=False)
status = True
except Exception:
traceback.print_exc()
return status
# 查询套餐
def find_flavor(self, flavor_id=None):
try:
logging.error("Find Flavor:")
flavor = self.conn.compute.find_flavor(flavor_id)
logging.error(flavor)
return flavor
except Exception:
traceback.print_exc()
# 创建套餐
def create_flavor(self, name="test-flavor-2", ram=2048, vcpus=1, disk=10):
flavor_id = None
try:
logging.error("Create Flavor:")
flavor = self.conn.compute.create_flavor(name=name, ram=ram, vcpus=vcpus, disk=disk)
logging.error('套餐 {}'.format(flavor))
logging.error('套餐 {}'.format(flavor.id))
flavor_id = flavor.id
except Exception:
traceback.print_exc()
return flavor_id
# 删除套餐
def delete_flavor(self, id):
try:
logging.error("delete Flavor:")
flavor = self.find_flavor(id)
logging.error('delete before套餐 {}'.format(flavor))
self.conn.compute.delete_flavor(flavor)
flavor = self.find_flavor(id)
logging.error('delete after套餐 {}'.format(flavor))
status = True
except Exception:
traceback.print_exc()
status = False
return status
# 虚机状态
def kvm_status(self, instance_id):
server = self.conn.compute.get_server(instance_id)
return server.status
# 开启虚机
def start_kvm(self, instance_id):
server = self.conn.compute.start_server(instance_id)
return server
# 关闭虚机
def stop_kvm(self, instance_id):
server = self.conn.compute.stop_server(instance_id)
return server
# 创建虚机
def create_kvm(self, name, image_id, flavor_id, metadata, nics):
nova_client = self.nova_client
instance = nova_client.servers.create(name, image=image_id,
meta=metadata,
flavor=flavor_id,
nics=nics)
return instance.id
def delete_kvm(self, instance_id):
server = self.conn.compute.delete_server(instance_id)
return server
# 修改虚机名称
def update_kvm(self, instance_id, name):
nova_client = self.nova_client
try:
server = nova_client.servers.update(instance_id, name)
except Exception:
traceback.print_exc()
return server
# 获取虚拟机列表
def get_kvm_list(self, image_id=None):
try:
servers = self.nova_client.servers.list()
# image_id = "8c9bd0b7-5b3a-42a3-9b41-501dfdc4b837"
for k in servers:
logging.error(k.image.get('id'))
if k.image.get('id') == image_id:
return False
except Exception:
traceback.print_exc()
return False
return True
# 匹配虚拟机名称
def get_kvm_name(self, name='InternalManagerVM-'):
try:
servers = self.nova_client.servers.list()
# image_id = "8c9bd0b7-5b3a-42a3-9b41-501dfdc4b837"
for k in servers:
if name in k.name:
logging.error(k.__dict__)
return k.id
except Exception:
return ''
# 为默认虚拟机添加port
def set_managervm(self, network_id=None):
port_id, ip_addr, mac_address = '', '', ''
try:
import uuid
name = uuid.uuid1()
port_id, ip_addr, mac_address = self.range_create_port(name=name, network_id=network_id)
if not port_id:
return False, '', '', ''
server_id = self.get_kvm_name()
if not server_id:
return False, '', '', ''
new_server = self.nova_client.servers.get(server_id)
logging.error("server: %s => %s" % (new_server.id, new_server.status))
if new_server.status != "ACTIVE":
logging.error("server is not active: %s" % new_server)
res = self.nova_client.servers.interface_attach(server=server_id, port_id=port_id,
net_id=None, fixed_ip=None)
status = True
logging.error(res)
except Exception:
status = False
traceback.print_exc()
return status, port_id, ip_addr, mac_address
# 删除子网及网段及接口
def delete_network_subnet_port(self, network_id=None):
logging.error("Delete Network:")
status = False
try:
# networks = self.neutron.list_networks()
# port list
# port_list_del = []
network_port = {}
network_port["network_id"] = network_id
list_ports = self.neutron.list_ports(**network_port)
logging.error('------- list_ports : {}'.format(list_ports))
for port in list_ports.get('ports', []):
self.delete_port(port.get('id', ''))
filters = {}
filters["id"] = network_id
networks = self.neutron.list_networks(**filters)
logging.error("-----networks: %s" % networks)
logging.error('*******************************')
# subnets = self.neutron.list_subnets()
# logging.error("subnets: %s" % subnets)
for network_item in networks['networks']:
if network_id == network_item.get('id', ''):
subnet_ids = network_item.get('subnets', [])
logging.error('*******************************')
for example_subnet in subnet_ids:
self.neutron.delete_subnet(example_subnet)
self.neutron.delete_network(network_item['id'])
networks = self.neutron.list_networks(**filters)
logging.error("+++++networks: %s" % networks)
logging.error('*******************************')
status = True
except Exception:
traceback.print_exc()
return status
# 判断路由是否为初始化的虚拟机分配的接口
def check_port_vm(self, id):
status = False
try:
posts = self.get_list_ports(id)
kvm_id = self.get_kvm_name()
post_device_id = posts.get('ports', [])[0].get('device_id', '') if posts.get('ports', []) else ''
if kvm_id == post_device_id:
status = True
except Exception:
traceback.print_exc()
return status
def main():
# Initialize and turn on debug logging
openstack.enable_logging(debug=False)
# Cloud configs are read with openstack.config
try:
with OpenStackMange() as c:
import socket
a = socket.gethostname()
logging.error(a)
# import sys
# logging.error(sys.argv[0])
# import os
# logging.error(os.getcwd()) # 获取当前工作目录路径
#
# logging.error('网段列表')
# c.network_list()
# logging.error('镜像列表')
# c.list_image()
# logging.error('测试镜像查询')
# c.get_id_by_name()
# logging.error('套餐列表')
# c.find_flavor()
# logging.error('端口列表')
# c.get_list_ports()
# logging.error('创建、网段列表')
# c.network_api()
# logging.error('虚拟机列表')
# c.get_kvm_list()
# logging.error('创建端口')
# c.create_port(name="delete--1", network_id='039896b3-9756-4918-b973-492bf8d123c4',
# subnet_id='38c35cea-bbe9-4fc3-aeca-1960703558bf', ip_address='117.0.0.0')
# logging.error('创建随机端口')
# c.range_create_port(name="hanyuyang--0888", network_id='039896b3-9756-4918-b973-492bf8d123c4')
# logging.error('创建镜像')
# c.upload_image_from_file()
# logging.error('创建套餐')
# c.create_flavor()
# logging.error('删除镜像')
# c.delete_image()
# logging.error('删除端口')
# c.delete_port(id='15f5af16-f8f4-4deb-a7b7-29ad5e729611')
# logging.error('删除网段')
# c.delete_network(network_id='cdf5863e-d403-4195-a2e2-f75721180795')
# logging.error('删除套餐')
# c.delete_flavor('c84a8e6b-42bd-4a63-8140-02eeb36ff0da')
logging.error('为默认虚拟机分配ip')
c.set_managervm(network_id='039896b3-9756-4918-b973-492bf8d123c4')
# logging.error('匹配虚拟机名称')
# c.get_kvm_name()
except Exception:
raise
if __name__ == "__main__":
main()