import os
import time
import json
import hashlib
import hmac
import requests
import execjs
import uuid
import urllib3
import sys
# 控制台地址
CONSOLE_ADDRESS = 'https://ip:4433'
# Open API ID
API_ID = 'xxx'
# Open API 密钥
API_SECRET = 'xxxxxx'
def cal_signature(key: str, url: str, query=None, body=None) -> str:
"""
计算请求体的签名:
签名字符串分为两个部分:1. 路径部分path;2. 参数部分query, body
path路径,例如: /api/v1/admin/login
query参数,对key按ascii排序后拼接的字符串,例如:password=123&username=sf
body参数,通常认为是一个json字符串,为了跨语言兼容,这里必须是严格的json格式,逗号和分号后不能有空格和换行,如:{"key1":"value1"}
路径与参数之间用'?'拼接, 参数与参数之间拼接用'&',例如:
签名字符串为:sign_str = ${path}?${query}&${body},其中${xx}表示变量
"""
# 从url中解析path路径
path = urllib3.util.url.parse_url(url).path
# 按规则拼接query参数的kv_str("k=v")
params_list = []
if query is not None:
sorted_query = sorted(query.items(), key=lambda x: x[0])
for k, v in sorted_query:
params_list.append(k + "=" + v)
# 准备body参数,严格json字符串
if body is not None:
params_list.append(json.dumps(body, ensure_ascii=False, separators=(',', ':')))
if len(params_list):
str_to_sign = path + "?" + "&".join(params_list)
else:
# 如果不存在参数,以路径作为被签名的内容
str_to_sign = path
# 签名
sig = hmac.new(key=key.encode('UTF-8'), digestmod=hashlib.sha256)
sig.update(str_to_sign.encode('UTF-8'))
return sig.hexdigest()
def prepare_auth_headers(url: str, query=None, body=None) -> dict:
"""
准备Open API的请求鉴权头部:
x-ca-key: API ID,从控制台获取
x-ca-sign: 请求签名,用API密钥等拼接出的字符串对请求体内容计算出的签名
x-ca-timestamp: 时间戳,长度为10的整数字符串
x-ca-nonce: 随机数,数字字母加横线的字符串,长度2~128位
"""
app_id = API_ID
app_secret = API_SECRET
# 获取十位时间戳
timestamp = str(int(round(time.time())))
# 数字字母加横线的字符串,长度2~128位,这里我们使用uuid/v4生成这样一个字符串
nonce = str(uuid.uuid4())
# 按照下面规则拼接签名密钥
key = "appId=%s&appSecret=%s×tamp=%s&nonce=%s" % (app_id, app_secret, timestamp, nonce)
sign = cal_signature(key, url, query, body)
return {
"x-ca-key": app_id,
"x-ca-timestamp": timestamp,
"x-ca-nonce": nonce,
"x-ca-sign": sign,
}
def post(path: str, body=None, headers=None) -> requests.Response:
"""
post请求接口,该接口封装了openAPI鉴权的请求头部
"""
if headers is None:
headers = dict()
url = CONSOLE_ADDRESS + path
auth_headers = prepare_auth_headers(url, body=body)
headers.update(auth_headers)
print(auth_headers)
return requests.post(url=url, headers=headers, json=body, verify=False)
def pretty_print_json(data=None, msg=None, quiet=None):
"""
漂亮打印json数据
"""
if msg is None:
print(json.dumps(data))
else:
print("%s\n%s" % (msg, json.dumps(data, indent=2, ensure_ascii=False)))
def create_external_user(name, group_path, **kwargs):
path = "/api/v1/externalUser/create"
body = {'name': name, 'path': group_path }
body['userDirectoryName'] = kwargs.get('user_directory_name')
body['ext'] = kwargs.get('ext')
body['bandIdList'] = kwargs.get('bandIdList')
# body['bandNameList'] = kwargs.get('bandNameList')
print(body)
res = post(path, body)
print(res,'res')
if res.status_code == 200 and res.json()["code"] == 0:
pretty_print_json(res.json(), "新增外部用户(%s)成功:" % name)
return res.json()
else:
raise Exception('新增外部用户失败,原因:%s, 错误码:%s' % (res.json()["msg"], res.json()["code"]))
ext = {
'fieldDataSource': {
'expireTime': 'server',
}
}
bandIdList=["0522f4cb-46b3-11ec-a808-815d5c5c9ca8","0522f4b8-46b3-11ec-a808-815d5c5c9ca8"]
# bandNameList=['VPN_地图中心']
name = "工号"
group_path = "/xxx/公司/部门/xx部/xxx组"
user_directory_name = "LDAP" # 用户目录名
#
#
create_external_user(name, group_path=group_path, user_directory_name=user_directory_name,ext=ext,bandIdList=bandIdList)
# create_external_user(name, group_path=group_path, user_directory_name=user_directory_name,ext=ext,bandNameList=bandNameList)