# -*- coding:utf-8 -*-
# author: wangzhongzheng
# time: 2022/6/17 16:40
import os, sys
# os.chdir(os.path.dirname(os.path.abspath(__file__)))
from configparser import ConfigParser
import urllib.parse
from datetime import datetime
import pandas as pd
from git import Repo
import git
import requests
import shutil
import json
import ast
import re
class GitLab():
    """Keep a source and a target Git checkout current and copy shared packages across.

    All settings (credentials, repository URLs, branch names and the package
    list) are read from ``config.ini`` in the current working directory.
    """

    # Commit message of the push-back step described in ``run``; unused until
    # that step is restored (see the TODO there).
    _COMMIT_MESSAGE = '定期自动运行更新基础包程序'

    def __init__(self):
        """Load the settings from ``config.ini`` into instance attributes.

        Raises:
            KeyError: if a required section or option is missing. A missing
                ``config.ini`` surfaces the same way, because
                ``ConfigParser.read`` silently skips unreadable files.
        """
        config = ConfigParser()
        # NOTE(review): the source line was `ConfigParser()"config.ini", encoding='UTF-8')`,
        # which does not parse; the `config.read(` call is restored here.
        config.read("config.ini", encoding='UTF-8')
        self.username = config['git_user']['user']
        self.password = config['git_user']['password']
        self.source_git = config['git_info']['source_git']
        self.target_git = config['git_info']['target_git']
        self.source_branch = config['branch_info']['source_branch']
        self.target_branch = config['branch_info']['target_branch']
        # Stored as a Python literal; `run` iterates it and walks each entry as a directory path.
        self.package_list = ast.literal_eval(config['common_package']['package_list'])

    @staticmethod
    def _local_dir_name(git_path):
        """Return the checkout directory name for a repository URL.

        Only a trailing ``.git`` is removed, so a name such as
        ``my.github.io.git`` becomes ``my.github.io``.
        """
        name = git_path.rstrip("/").split("/")[-1]
        if name.endswith(".git"):
            name = name[:-len(".git")]
        return name

    def _authenticated_url(self, git_path):
        """Return ``git_path`` as an HTTPS URL with the configured credentials embedded.

        Both credentials are percent-encoded with ``quote(..., safe="")`` so
        characters such as ``@``, ``/`` or a space cannot break the URL.
        """
        user = urllib.parse.quote(self.username, safe="")
        secret = urllib.parse.quote(self.password, safe="")
        scheme = "https://"
        host_and_path = git_path[len(scheme):] if git_path.startswith(scheme) else git_path
        return f"https://{user}:{secret}@{host_and_path}"

    def gitInit(self, git_path, branch):
        """Open the local checkout of ``git_path``, cloning ``branch`` if it does not exist yet.

        Args:
            git_path: HTTPS URL of the repository.
            branch: branch to check out when a fresh clone is needed.

        Returns:
            The ``Repo`` object for the local checkout.
        """
        file_path = self._local_dir_name(git_path)
        if os.path.exists(file_path):
            return Repo(file_path)
        # No local copy yet: do the initial clone with credentials in the URL.
        return Repo.clone_from(url=self._authenticated_url(git_path), to_path=file_path, branch=branch)

    def _copy_packages(self):
        """Copy every file under each configured package into the target tree.

        The destination directory is the source directory with
        ``qhdata_etl/src/`` replaced by ``universal_processor/``.
        """
        for package in self.package_list:
            for root, _dirs, files in os.walk(package):
                file_dir = root.replace("qhdata_etl/src/", "universal_processor/")  # destination directory
                # NOTE(review): the `if not os.path.exists(file_dir):` and `if files == []:`
                # bodies were missing in the source; creating the directory is required
                # for the copy below to succeed.
                os.makedirs(file_dir, exist_ok=True)
                for file_name in files:
                    shutil.copy(os.path.join(root, file_name), os.path.join(file_dir, file_name))

    def run(self):
        """Pull both repositories, then copy the shared packages into the target tree."""
        repo_source = self.gitInit(self.source_git, self.source_branch)
        repo_target = self.gitInit(self.target_git, self.target_branch)
        # Pull first so both checkouts are up to date.
        print(self.source_git, " : ", repo_source.git.pull())
        print(self.target_git, " : ", repo_target.git.pull())
        self._copy_packages()
        # TODO(review): the source only kept comments for the push-back phase (check
        # status, stage each package path with "qhdata_etl/src/" removed, commit with
        # `_COMMIT_MESSAGE`, push). The statements that performed those git operations
        # are missing, so they are deliberately not invented here.
class GitLabApi():
    """Small client for the GitLab REST API v4 built on a ``requests`` session.

    The API root is derived from ``git_info.source_git`` in ``config.ini`` and
    every request carries the ``git_user.token`` value as ``PRIVATE-TOKEN``.
    The ``project``, ``branches`` and ``mergeRequests`` methods dispatch on a
    ``method`` string and return ``None`` for an unknown one.
    """

    _PROJECT_ID_MESSAGE = "id: The ID or URL-encoded path of the project.Not None"
    _MR_IID_MESSAGE = "merge_request_iid: The ID of a merge request.Not None"
    _BRANCH_MESSAGE = "branch: Name of the branch.Not None"
    _MR_STATES = ("all", "opened", "closed", "locked", "merged")
    _MR_STATE_MESSAGE = "state: 为返回所有all合并请求或仅返回opened, closed,locked或merged"

    def __init__(self):
        """Read ``config.ini``, open an authenticated session and print the server version."""
        config = ConfigParser()
        # NOTE(review): the source line was `ConfigParser()"config.ini", encoding='UTF-8')`,
        # which does not parse; the `config.read(` call is restored here.
        config.read("config.ini", encoding='UTF-8')
        self.api_url = self._api_root(config['git_info']['source_git'])
        self.git_token = config['git_user']['token']
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36',
            'PRIVATE-TOKEN': self.git_token,
        }
        self.session = requests.Session()
        self.session.headers.update(headers)
        # First authenticated request; doubles as a connectivity check.
        git_version = self.session.get(self.api_url + "version")
        print("gitlab version: ", git_version.text)

    @staticmethod
    def _api_root(source_git):
        """Return ``<scheme>://<host>/api/v4/`` for the repository URL ``source_git``.

        Raises:
            ValueError: if ``source_git`` has no scheme or host.
        """
        parts = urllib.parse.urlsplit(source_git)
        if not parts.scheme or not parts.netloc:
            raise ValueError("source_git must be an absolute URL, got %r" % (source_git,))
        return f"{parts.scheme}://{parts.netloc}/api/v4/"

    @staticmethod
    def _require(condition, message):
        """Raise ``AssertionError(message)`` unless ``condition`` holds.

        Unlike an ``assert`` statement this is not removed by ``python -O``,
        while callers still see the same exception type.
        """
        if not condition:
            raise AssertionError(message)

    @staticmethod
    def _report(res, ok_codes):
        """Print status and body when ``res.status_code`` is not in ``ok_codes``; return ``res``."""
        if res.status_code not in ok_codes:
            print("status_code: ", res.status_code, "message: ", res.text)
        return res

    @staticmethod
    def _frame(res):
        """Parse the JSON body of ``res`` into a ``pandas.DataFrame``."""
        return pd.DataFrame.from_dict(json.loads(res.text))

    def _project_id(self, kwargs, message=None):
        """Return ``kwargs['id']`` after checking it is a ``str`` or ``int``."""
        project_id = kwargs.get("id", None)
        self._require(isinstance(project_id, (str, int)), message or self._PROJECT_ID_MESSAGE)
        return project_id

    def get(self, url, params: dict = None):
        """GET ``url``; the response is returned as-is, non-200 statuses are printed."""
        return self._report(self.session.get(url, params=params), (200,))

    def post(self, url, data: dict):
        """POST ``data`` as a JSON body; 200 and 201 count as success, others are printed."""
        headers = {"Content-Type": "application/json"}
        # NOTE(review): the source line was `res =, data=data, headers=headers)`;
        # the `self.session.post(url` call is restored here.
        res = self.session.post(url, data=json.dumps(data), headers=headers)
        return self._report(res, (200, 201))

    def delete(self, url):
        """DELETE ``url``; 204 counts as success, other statuses are printed."""
        return self._report(self.session.delete(url), (204,))

    def put(self, url, data: dict):
        """PUT ``data`` as the request body; non-200 statuses are printed."""
        return self._report(self.session.put(url, data), (200,))

    def project(self, method="list_all_projects", **kwargs):
        """Query projects.

        Supported ``method`` values and their keyword arguments:
            * ``list_all_projects`` (``simple``, ``per_page``) -> DataFrame
            * ``list_user_projects`` (``user_id``, ``simple``, ``per_page``) -> DataFrame
            * ``get_single_project`` (``id``) -> Series
            * ``get_project_users`` (``id``) -> DataFrame

        Raises:
            AssertionError: if a required keyword argument is missing or mistyped.
        """
        if method == "list_all_projects":
            params = dict(simple=kwargs.get("simple", True), per_page=kwargs.get("per_page", 100))
            return self._frame(self.get(self.api_url + "projects", params=params))
        if method == "list_user_projects":
            user_id = kwargs.get("user_id", None)
            # The message allows "ID or username", so numeric ids are accepted as well.
            self._require(isinstance(user_id, (str, int)), "user_id: The ID or username of the user.Not None")
            params = dict(simple=kwargs.get("simple", True), per_page=kwargs.get("per_page", 100))
            return self._frame(self.get(self.api_url + f"users/{user_id}/projects", params=params))
        if method == "get_single_project":
            project_id = self._project_id(kwargs)
            res = self.get(self.api_url + f"projects/{project_id}")
            return pd.Series(json.loads(res.text))
        if method == "get_project_users":
            project_id = self._project_id(kwargs)
            return self._frame(self.get(self.api_url + f"projects/{project_id}/users"))
        return None

    def branches(self, method="list_repository_branches", **kwargs):
        """List, create or delete repository branches.

        Supported ``method`` values and their keyword arguments:
            * ``list_repository_branches`` (``id``) -> DataFrame
            * ``create_repository_branch`` (``id``, ``branch``, ``ref``) -> response
            * ``delete_repository_branch`` (``id``, ``branch``) -> response

        Raises:
            AssertionError: if a required keyword argument is missing or mistyped.
        """
        if method == "list_repository_branches":
            project_id = self._project_id(kwargs)
            return self._frame(self.get(self.api_url + f"projects/{project_id}/repository/branches"))
        if method == "create_repository_branch":
            project_id = self._project_id(kwargs)
            branch = kwargs.get("branch", None)
            ref = kwargs.get("ref", None)
            self._require(isinstance(branch, str), self._BRANCH_MESSAGE)
            self._require(isinstance(ref, str), "ref: Branch name or commit SHA to create branch from.Not None")
            url = self.api_url + f"projects/{project_id}/repository/branches"
            # NOTE(review): the source line was `res =, data=data)`; `self.post(url` is restored.
            return self.post(url, data=dict(branch=branch, ref=ref))
        if method == "delete_repository_branch":
            project_id = self._project_id(kwargs)
            branch = kwargs.get("branch", None)
            self._require(isinstance(branch, str), self._BRANCH_MESSAGE)
            # NOTE(review): `branch` is inserted verbatim; names containing "/" presumably
            # need to be passed URL-encoded by the caller — confirm.
            url = self.api_url + f"projects/{project_id}/repository/branches/{branch}"
            # The response is returned like in every other branch of this method.
            return self.delete(url)
        return None

    def mergeRequests(self, method="list_merge_requests", **kwargs):
        """List, create, update or delete merge requests.

        Supported ``method`` values and their keyword arguments:
            * ``list_merge_requests`` (``state``, ``with_labels_details``) -> DataFrame
            * ``list_project_merge_requests`` (``id``, ``state``, ``source_branch``,
              ``target_branch``) -> DataFrame
            * ``create_MR`` (``id``, ``source_branch``, ``target_branch``, ``title``) -> response
            * ``update_MR`` (``id``, ``merge_request_iid``, ``title``, ``description``,
              ``state_event``) -> response
            * ``delete_a_merge_request`` (``id``, ``merge_request_iid``) -> response;
              the original comment notes this needs master permission.

        Raises:
            AssertionError: if a required keyword argument is missing or invalid.
        """
        if method == "list_merge_requests":
            state = kwargs.get("state", "all")
            with_labels_details = kwargs.get("with_labels_details", False)
            self._require(state in self._MR_STATES, self._MR_STATE_MESSAGE)
            self._require(
                with_labels_details in (True, False),
                "with_labels_details: 如果true,响应返回标签字段中每个标签的更多详细信息::name, :color, :description, :description_html, :text_color。默认为false。",
            )
            params = dict(state=state, with_labels_details=with_labels_details)
            return self._frame(self.get(self.api_url + "merge_requests", params=params))
        if method == "list_project_merge_requests":
            project_id = self._project_id(
                kwargs,
                "id: The ID or URL-encoded path of the project owned by the authenticated user..Not None",
            )
            state = kwargs.get("state", "all")
            self._require(state in self._MR_STATES, self._MR_STATE_MESSAGE)
            params = dict(
                id=project_id,
                state=state,
                source_branch=kwargs.get("source_branch", None),
                target_branch=kwargs.get("target_branch", None),
            )
            return self._frame(self.get(self.api_url + f"projects/{project_id}/merge_requests", params=params))
        if method == "create_MR":
            project_id = self._project_id(kwargs)
            source_branch = kwargs.get("source_branch", None)
            target_branch = kwargs.get("target_branch", None)
            title = kwargs.get("title", None)
            self._require(isinstance(source_branch, str), "source_branch: The source branch.Not None")
            self._require(isinstance(target_branch, str), "target_branch: The target branch.Not None")
            self._require(isinstance(title, str), "title: Title of MR.Not None")
            url = self.api_url + f"projects/{project_id}/merge_requests"
            data = dict(source_branch=source_branch, target_branch=target_branch, title=title)
            # NOTE(review): the source line was `res =, data=data)`; `self.post(url` is restored.
            return self.post(url, data=data)
        if method == "update_MR":
            project_id = self._project_id(kwargs)
            merge_request_iid = kwargs.get("merge_request_iid", None)
            self._require(isinstance(merge_request_iid, int), self._MR_IID_MESSAGE)
            url = self.api_url + f"projects/{project_id}/merge_requests/{merge_request_iid}"
            data = dict(
                id=project_id,
                merge_request_iid=merge_request_iid,
                title=kwargs.get("title", None),  # str, title of the MR
                description=kwargs.get("description", None),  # str, limited to 1,048,576 characters
                state_event=kwargs.get("state_event", None),  # str, new state (close/reopen)
            )
            return self.put(url, data=data)
        if method == "delete_a_merge_request":
            project_id = self._project_id(kwargs)
            merge_request_iid = kwargs.get("merge_request_iid", None)
            self._require(isinstance(merge_request_iid, int), self._MR_IID_MESSAGE)
            return self.delete(self.api_url + f"projects/{project_id}/merge_requests/{merge_request_iid}")
        return None

    def gitlab_projects(self, per_page=100):
        """Return every project visible to the current account as a list of dicts.

        Pages are followed through the ``X-Next-Page`` response header. Per the
        original comment, GitLab caps ``per_page`` at 100.
        """
        projects_api = self.api_url + 'projects?simple=yes&per_page=%s' % per_page
        result = []
        next_page = "1"
        total_printed = False
        while next_page:
            projects_url = projects_api + '&page=' + str(next_page)
            response = self.session.get(projects_url)
            if not total_printed:
                # Total number of projects as reported by the server (header may be absent).
                print("工程总数:", response.headers.get('X-Total', 'unknown'))
                total_printed = True
            projects_json = json.loads(response.text)
            if not isinstance(projects_json, list):
                # NOTE(review): indentation was lost in the source; these prints are assumed
                # to be diagnostics for a non-list (error) payload.
                print("projects_url: %s" % projects_url)
                print("headers: %s" % response.headers)
                print("response: %s" % projects_json)
                break
            result.extend(projects_json)
            next_page = response.headers.get('X-Next-Page', '')
        return result

    def gitlab_project_branches(self, project_id, project_name, page=1, per_page=100, get_all_branches=False,
                                max_branch_count=100):
        """Return branch names of a project, starting at ``page``.

        Args:
            project_id: project id used in the request path.
            project_name: only used in the progress message.
            page: first page to request; a value <= 0 returns an empty list.
            per_page: page size (per the original comment GitLab caps it at 100).
            get_all_branches: when true, follow pages until none is left.
            max_branch_count: upper bound on returned names when
                ``get_all_branches`` is false. NOTE(review): the end of the
                signature was cut off in the source; the default of 100 is an
                assumption — confirm.
        """
        print('工程id是' + str(project_id) + ':', '工程名是' + str(project_name))
        next_page = page
        branch_count = max_branch_count
        branch_names = []
        while next_page > 0 and branch_count > 0:
            project_branches_api = self.api_url + 'projects/%s/repository/branches?page=%s&per_page=%s' % (
                project_id, next_page, per_page)
            response = self.get(project_branches_api)
            next_header = response.headers.get('X-Next-Page', '')
            next_page = int(next_header) if next_header else 0
            if not get_all_branches:
                branch_count = branch_count - per_page
            branches_info = json.loads(response.text)
            if not isinstance(branches_info, list):
                break  # error payload; `get` already printed it
            # NOTE(review): the loop body was missing in the source; collecting each
            # entry's 'name' is inferred from the `branch_names` result — confirm.
            branch_names.extend(v['name'] for v in branches_info)
        if get_all_branches:
            return branch_names
        return branch_names[:max_branch_count]

    def get_project_commits(self, project_id, project_branch_name, page=1, per_page=100, get_all_commits=False,
                            max_commit_count=100):
        """Return commits of one branch as a list of dicts, starting at ``page``.

        Args:
            project_id: project id used in the request path.
            project_branch_name: passed as ``ref_name``.
            page: first page to request.
            per_page: page size (per the original comment GitLab caps it at 100).
            get_all_commits: when true, follow pages until none is left.
            max_commit_count: upper bound on returned commits when
                ``get_all_commits`` is false. NOTE(review): the end of the
                signature was cut off in the source; the default of 100 is an
                assumption — confirm.
        """
        commit_all = []
        next_page = page
        commit_count = max_commit_count
        while next_page and commit_count > 0:
            if not get_all_commits:
                commit_count = commit_count - per_page
            commit_api = self.api_url + 'projects/%s/repository/commits?ref_name=%s&page=%s&per_page=%s' % (
                project_id, project_branch_name, next_page, per_page)
            response = self.get(commit_api)
            # An empty or missing header means there is no further page.
            next_page = response.headers.get("X-Next-Page", "")
            commit_info = json.loads(response.text)
            if not isinstance(commit_info, list):
                break  # error payload; `get` already printed it
            commit_all.extend(commit_info)
        print("%s分支获取的commit信息获取完成" % project_branch_name)
        if get_all_commits:
            return commit_all
        return commit_all[:max_commit_count]
if __name__ == "__main__":
    # Manual smoke test against project 208: close the open MR from user_01 to
    # master, then recreate it, edit its description and delete it again.
    gl = GitLab()
    gli = GitLabApi()
    mr_target = dict(id=208, source_branch="user_01", target_branch="master")

    def _open_mr_iid():
        """Return the iid of the open MR for ``mr_target``, creating the MR if none is open."""
        mr_open = gli.mergeRequests(method="list_project_merge_requests", state="opened", **mr_target)
        if mr_open.empty:
            gli.mergeRequests(method="create_MR", title="定期自动运行更新基础包程序", **mr_target)
            # Query again: the frame fetched above is empty and has no "iid"
            # column, so reading the iid from it would raise KeyError.
            mr_open = gli.mergeRequests(method="list_project_merge_requests", state="opened", **mr_target)
        if mr_open.empty:
            raise RuntimeError("no open merge request found and creating one failed")
        return int(mr_open["iid"].iloc[0])

    merge_request_iid = _open_mr_iid()
    gli.mergeRequests(method="update_MR", id=208, merge_request_iid=merge_request_iid, description="测试修改描述同时将状态改为关闭", state_event="close")
    # The MR was just closed, so this call normally creates a fresh one.
    merge_request_iid = _open_mr_iid()
    gli.mergeRequests(method="update_MR", id=208, merge_request_iid=merge_request_iid, description="测试修改描述")
    gli.mergeRequests(method="delete_a_merge_request", id=208, merge_request_iid=merge_request_iid)