钉钉API考勤打卡记录获取并存入数据库(python)

司徒元明
2023-12-01

前言:


   钉钉有个开发平台,通过API可以开发自己企业内部应用,钉钉开发文档顺序写的有点乱,花了挺长时间才看懂,我写了一个python脚本来获取考勤记录,当然能做的不止这些,可以删除公司成员,修改部门,瞎发通知等等。。。(这是上路的新手,大神忽略。)
 

一、准备 

钉钉API地址: https://open-doc.dingtalk.com/docs/doc.htm?spm=0.0.0.0.L9JL3j&treeId=367&articleId=107520&docType=1

企业自建应用开发流程和企业简易开发教程,需要获取钉钉管理员权限,可问公司组织管理员。

二、python脚本

1.安装python

官网了解下:https://www.python.org/downloads/

了解完下载下

下载完安装下(路径注意不要中文)。

2.安装pymysql库

CDM命令pip install PyMySQL。如图:

:\Users\Administrator> pip install PyMySQL
Collecting PyMySQL
  Downloading PyMySQL-0.7.11-py2.py3-none-any.whl (78kB)
    51% |████████████████▋               | 
    40kB 109kB/s eta 0:0    64% |████████████████████▊       
    | 51kB 112kB/s eta    77% |█████████████████████████      | 61kB 135kB/s    90% |█████████████████████████████
    | 71kB 152    100% |████████████████████████████████| 81kB 163kB/s

Installing collected packages: PyMySQL
Successfully installed PyMySQL-0.7.11

C:\Users\Administrator>

3.脚本编写

1).先调用API获取到token

def getaccseetoken():
    request = dingtalk.api.OapiGettokenRequest("https://oapi.dingtalk.com/gettoken")
    request.corpid = "你的cropid"
    request.corpsecret = "你的corpsecret "
    f = request.getResponse()
    access_token = f['access_token']
    return access_token

2).有了token后通过API获取用户UserId

def get_userid(access_token):
    request = dingtalk.api.OapiAttendanceListscheduleRequest('https://oapi.dingtalk.com/topapi/attendance/listschedule')
    request.workDate = str(dt.strftime(dt.now(), '%Y-%m-%d %H:%M:%S'))
    f = request.getResponse(access_token)
    userid = list(map(lambda x: x["userid"], f['result']['schedules']))
    userid = list(set(userid))
    return userid

3).根据API获取到打卡详情(钉钉有人数限制,此处已做处理)

def get_attendence_listrecord(userid, access_token, day):
    request = dingtalk.api.OapiAttendanceListRecordRequest('https://oapi.dingtalk.com/attendance/list')
    request.workDateFrom = str(dt.now() - day * timedelta(days=1))
    request.workDateTo = str(dt.now())
    request.userIdList = userid
    request.offset = 0
    request.limit = 50
    f = request.getResponse(access_token)
    h = []
    h.append(f)
    while ('hasMore', True) in f.items():
        request.offset = request.offset+request.limit
        f = request.getResponse(access_token)
        h.append(f)
    return h

4).之后你就可以拿数据存入数据库了(注意处理返回system的数据)

def get_value(userid_in, access):
    l = []
    if len(userid_in)>50:
        users = []
        start = 0
        end = 49
        while end<len(userid_in):
            users.append(userid_in[start:end])
            start +=49
            end = start + 50
            users.append(userid_in[start:end])
    for userid_in_in in users:
        reponse = get_attendence_listrecord(userid_in_in, access, day=0)
        if reponse[0]['errmsg'] != 'ok':
            global access_token
            access_token = getaccseetoken()
            reponse = get_attendence_listrecord(userid_in_in, access_token, day=0)
        for respose in reponse[0]['recordresult']:
            if respose['sourceType'] != 'SYSTEM':
                value = (respose['userId'], respose['checkType'], dt.fromtimestamp(int(respose['userCheckTime']) / 1000),
                         respose['locationResult'])
                l.append(value)
    return l

db = pymysql.connect('数据库地址', '用户名', '密码', '库名', charset="utf8")
cursor = db.cursor()
query = """insert into 表名 (字段名, 字段名,字段名,字段名
              ) values (%s,%s,%s,%s)"""
valu = get_value(userid, access_token)

def insert():
    value = get_value(userid, access_token)
    for record in value:
        cursor.execute(query, record)
        db.commit()

 

 类似资料: