钉钉有个开发平台,通过API可以开发自己企业内部应用,钉钉开发文档顺序写的有点乱,花了挺长时间才看懂,我写了一个python脚本来获取考勤记录,当然能做的不止这些,可以删除公司成员,修改部门,瞎发通知等等。。。(这是上路的新手,大神忽略。)
钉钉API地址: https://open-doc.dingtalk.com/docs/doc.htm?spm=0.0.0.0.L9JL3j&treeId=367&articleId=107520&docType=1
看企业自建应用开发流程和企业简易开发教程,需要获取钉钉管理员权限,可问公司组织管理员。
官网了解下:https://www.python.org/downloads/
了解完下载下
下载完安装下(路径注意不要中文)。
CDM命令pip install PyMySQL。如图:
:\Users\Administrator> pip install PyMySQL
Collecting PyMySQL
Downloading PyMySQL-0.7.11-py2.py3-none-any.whl (78kB)
51% |████████████████▋ |
40kB 109kB/s eta 0:0 64% |████████████████████▊
| 51kB 112kB/s eta 77% |█████████████████████████ | 61kB 135kB/s 90% |█████████████████████████████
| 71kB 152 100% |████████████████████████████████| 81kB 163kB/s
Installing collected packages: PyMySQL
Successfully installed PyMySQL-0.7.11
C:\Users\Administrator>
1).先调用API获取到token
def getaccseetoken():
request = dingtalk.api.OapiGettokenRequest("https://oapi.dingtalk.com/gettoken")
request.corpid = "你的cropid"
request.corpsecret = "你的corpsecret "
f = request.getResponse()
access_token = f['access_token']
return access_token
2).有了token后通过API获取用户UserId
def get_userid(access_token):
request = dingtalk.api.OapiAttendanceListscheduleRequest('https://oapi.dingtalk.com/topapi/attendance/listschedule')
request.workDate = str(dt.strftime(dt.now(), '%Y-%m-%d %H:%M:%S'))
f = request.getResponse(access_token)
userid = list(map(lambda x: x["userid"], f['result']['schedules']))
userid = list(set(userid))
return userid
3).根据API获取到打卡详情(钉钉有人数限制,此处已做处理)
def get_attendence_listrecord(userid, access_token, day):
request = dingtalk.api.OapiAttendanceListRecordRequest('https://oapi.dingtalk.com/attendance/list')
request.workDateFrom = str(dt.now() - day * timedelta(days=1))
request.workDateTo = str(dt.now())
request.userIdList = userid
request.offset = 0
request.limit = 50
f = request.getResponse(access_token)
h = []
h.append(f)
while ('hasMore', True) in f.items():
request.offset = request.offset+request.limit
f = request.getResponse(access_token)
h.append(f)
return h
4).之后你就可以拿数据存入数据库了(注意处理返回system的数据)
def get_value(userid_in, access):
l = []
if len(userid_in)>50:
users = []
start = 0
end = 49
while end<len(userid_in):
users.append(userid_in[start:end])
start +=49
end = start + 50
users.append(userid_in[start:end])
for userid_in_in in users:
reponse = get_attendence_listrecord(userid_in_in, access, day=0)
if reponse[0]['errmsg'] != 'ok':
global access_token
access_token = getaccseetoken()
reponse = get_attendence_listrecord(userid_in_in, access_token, day=0)
for respose in reponse[0]['recordresult']:
if respose['sourceType'] != 'SYSTEM':
value = (respose['userId'], respose['checkType'], dt.fromtimestamp(int(respose['userCheckTime']) / 1000),
respose['locationResult'])
l.append(value)
return l
db = pymysql.connect('数据库地址', '用户名', '密码', '库名', charset="utf8")
cursor = db.cursor()
query = """insert into 表名 (字段名, 字段名,字段名,字段名
) values (%s,%s,%s,%s)"""
valu = get_value(userid, access_token)
def insert():
value = get_value(userid, access_token)
for record in value:
cursor.execute(query, record)
db.commit()