-
从钉钉后台获取考勤数据等
-
下载钉钉的SDK包
-
从开发者后台获取 AppKey 和 AppSecret
-
上代码:
# -*- coding: utf-8 -*-
"""Export DingTalk attendance punch records to an Excel import sheet.

The script reads the application credentials from ``config.ini`` in the
current working directory, asks the DingTalk open API for the on-job
employees and their punch records, and writes the de-duplicated
``(job number, date, time)`` rows to ``dingdingData_aq.xlsx``.

Logging configuration, ``config.ini`` parsing and the date-window constants
are evaluated at import time, as before.  The network calls and the workbook
export now run only when the file is executed as a script.
"""
import configparser
import datetime
import json
import logging
import os
import time
from datetime import datetime as dt
from datetime import timedelta

import dingtalk.api
import openpyxl
import requests
import xlsxwriter

# The log directory must exist before basicConfig opens the log file;
# otherwise the very first run fails with FileNotFoundError.
LOG_DIR = os.path.join(os.getcwd(), 'logs')
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s line:%(lineno)d %(levelname)s : %(message)s',
                    datefmt=' %Y-%m-%d %H:%M:%S',
                    filename=os.path.join(LOG_DIR, 'aq_dingInfos.log'),
                    filemode='a')

config = configparser.ConfigParser()
config.read(os.path.join(os.getcwd(), "config.ini"), "UTF-8")

# Date window constants, all plain strings computed once at import time.
# NOTE(review): data_time is str(datetime.now()) and therefore normally
# carries microseconds, and last_time1 uses the hour "24:00:00"; both are
# passed to the attendance API unchanged, exactly as the original did.
# Confirm that the API accepts these formats.
data_time = str(dt.now())
last_time = str(datetime.date.today() + datetime.timedelta(-1)) + ' 00:00:00'
last_time1 = str(datetime.date.today() + datetime.timedelta(-1)) + ' 24:00:00'
time1 = dt.now().strftime('%Y-%m-%d') + ' 11:00:00'
time2 = dt.now().strftime('%Y-%m-%d') + ' 10:00:00'

# Page size / user batch size used for every paginated request.
_BATCH_SIZE = 50

# Error codes that trigger one retry, mapped to the pause (seconds) before
# the retry.  Presumably these are throttling codes - TODO confirm.
_RETRY_DELAYS = {9005: 60, 9006: 60, 90018: 1}

# Fixed header rows of the import sheet; data rows start right below them.
_HEADER_ROWS = (
    ('实体名称', '原始打卡记录实体'),
    ('实体表', 'T_HR_ATS_PunchCardRecord'),
    ('描述', '原始打卡记录实体'),
    ('考勤编号', '打卡日期', '打卡时间', '打卡位置'),
)

# Punches with exactly this time string were never written by the original
# export loop (it tested "<" and ">" but not "==").
# NOTE(review): this looks accidental; kept to preserve the output - confirm.
_EXCLUDED_PUNCH_TIME = '10:05:00'


def _read_credential(name):
    """Return the option *name* from ``config.ini`` as a string.

    ``config[name]`` yields a whole section (a ``SectionProxy``), not a
    string, so it cannot be concatenated into a URL.  The layout of
    ``config.ini`` is not visible from this file, so the option is accepted
    in the default section or in any named section (option names are
    case-insensitive in configparser).

    Raises:
        KeyError: if no section defines the option.
    """
    for section in [config.default_section] + config.sections():
        if config.has_option(section, name):
            return config.get(section, name)
    raise KeyError(name)


def _batches(items, size):
    """Yield consecutive, non-overlapping slices of *items* of length *size*."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


class Dingding():
    """Thin client around the DingTalk endpoints needed for the export.

    Creating an instance performs network I/O: it fetches an access token
    and then the list of on-job employee ids.
    """

    def __init__(self):
        self.appkey = _read_credential('AppKey')
        self.appsecret = _read_credential('AppSecret')
        self.access_token = self.getToken()
        self.userids = self.get_userid()

    def getToken(self):
        """Request an access token; return ``''`` when none is returned."""
        # Let requests build the query string so the values are URL-encoded,
        # and bound the wait so a dead connection cannot hang the job.
        response = requests.get(url='https://oapi.dingtalk.com/gettoken',
                                params={'appkey': self.appkey,
                                        'appsecret': self.appsecret},
                                timeout=30)
        result = response.json()
        try:
            access_token = result['access_token']
        except KeyError:
            logging.error('获取密钥失败: %s', result.get('errmsg'))
            return ''
        logging.info('获取密钥成功')
        return access_token

    def _fetch_with_retry(self, request):
        """Send *request*; retry once after a pause on a retryable errcode."""
        result = request.getResponse(self.access_token)
        delay = _RETRY_DELAYS.get(result.get('errcode'))
        if delay is not None:
            time.sleep(delay)
            result = request.getResponse(self.access_token)
        return result

    def get_userid(self):
        """Return the ids of all on-job employees, following pagination.

        Raises:
            KeyError: if a response carries no ``"result"`` payload.
        """
        userids = []
        request = dingtalk.api.OapiSmartworkHrmEmployeeQueryonjobRequest(
            "https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob")
        request.status_list = "2,3,5,-1"
        request.offset = 0
        request.size = _BATCH_SIZE
        while True:
            result = self._fetch_with_retry(request)
            if "result" not in result:
                logging.error('获取部门userids失败: %s', result)
                raise KeyError("result")
            page = result["result"]
            # Each page is appended exactly once, also after a retry.
            userids.extend(page.get("data_list") or [])
            if "next_cursor" not in page:
                break
            request.offset = request.offset + request.size
        logging.info(userids)
        logging.info('获取部门userids成功')
        return userids

    def get_jobnum(self):
        """Return a ``{user id: job number}`` dict for ``self.userids``.

        Users whose lookup fails are logged and left out of the mapping.
        """
        uid_jnum = {}
        # NOTE(review): the request class name does not match the
        # "user/get" URL; kept as in the original - confirm it is intended.
        request = dingtalk.api.OapiUserGetDeptMemberRequest("https://oapi.dingtalk.com/user/get")
        for uid in self.userids:
            try:
                request.userid = uid
                profile = request.getResponse(self.access_token)
                uid_jnum[profile['userid']] = profile['jobnumber']
            except Exception as e:
                # Deliberate best effort: the SDK's exception types are not
                # visible here, and one failing user must not stop the run.
                logging.warning('%s: %s', uid, e)
                time.sleep(1)
        logging.info('员工ID与工号对照字典已生成')
        return uid_jnum

    def get_attendence_listrecord(self, userids):
        """Return every response page of punch records for *userids*.

        Before 10:00 (per the import-time constants) the previous day is
        queried, otherwise the current timestamp is used for both bounds.
        """
        request = dingtalk.api.OapiAttendanceListRecordRequest('https://oapi.dingtalk.com/attendance/list')
        # Both operands are strings sharing today's date prefix, so the
        # lexicographic comparison orders them by time of day.
        if data_time < time2:
            request.workDateFrom = last_time
            request.workDateTo = last_time1
        else:
            request.workDateFrom = data_time
            request.workDateTo = data_time
        logging.info('%s %s', request.workDateFrom, request.workDateTo)
        request.userIdList = userids
        request.offset = 0
        request.limit = _BATCH_SIZE
        page = request.getResponse(self.access_token)
        pages = [page]
        while page.get('hasMore') is True:
            request.offset = request.offset + request.limit
            page = request.getResponse(self.access_token)
            pages.append(page)
        return pages

    def get_value(self):
        """Return the unique ``(job number, date, time)`` punch tuples.

        Records with ``sourceType == 'SYSTEM'`` and records of users without
        a known job number are skipped.  The order of the list is arbitrary.
        """
        uid_jnum = self.get_jobnum()
        punches = set()
        # Always batch, so organisations with 50 or fewer users are queried
        # too, and no user appears in two batches.
        for batch in _batches(self.userids, _BATCH_SIZE):
            responses = self.get_attendence_listrecord(batch)
            if responses[0].get('errmsg') != 'ok':
                # One plain retry of the whole batch, as before.
                responses = self.get_attendence_listrecord(batch)
            for response in responses:
                if 'recordresult' not in response:
                    logging.warning('考勤结果缺少recordresult: %s', response)
                    continue
                for record in response['recordresult'] or []:
                    # Handle failures per record so that one malformed or
                    # unknown entry does not drop the rest of the page.
                    try:
                        if record['sourceType'] == 'SYSTEM':
                            continue
                        job_number = uid_jnum.get(record['userId'])
                        if not job_number:
                            continue
                        # userCheckTime is divided by 1000 before conversion,
                        # i.e. it is treated as a millisecond timestamp.
                        stamp = dt.fromtimestamp(int(record['userCheckTime']) / 1000)
                        str_time = str(stamp).split(' ')
                    except (KeyError, TypeError, ValueError, OverflowError, OSError) as e:
                        logging.warning('%s', e)
                        continue
                    punches.add((job_number, str_time[0], str_time[1]))
        logging.info('获取部门所有考勤结果成功')
        return list(punches)


def main():
    """Fetch the punch records and write them to ``dingdingData_aq.xlsx``."""
    kq_ding = Dingding()
    daka_result = kq_ding.get_value()
    shr_excel = os.path.join(os.getcwd(), 'dingdingData_aq.xlsx')
    test_book = xlsxwriter.Workbook(shr_excel)
    try:
        worksheet = test_book.add_worksheet('打卡记录导入')
        logging.info('数据开始写入表格中')
        for row, cells in enumerate(_HEADER_ROWS):
            for col, text in enumerate(cells):
                worksheet.write(row, col, text)
        row = len(_HEADER_ROWS)
        for item in daka_result:
            if item[2] == _EXCLUDED_PUNCH_TIME:
                continue
            for col, cell in enumerate(item):
                worksheet.write(row, col, cell)
            row += 1
    finally:
        # Close even when writing fails so the file handle is released.
        test_book.close()
    logging.info('打卡记录已经写入完成')


if __name__ == '__main__':
    main()
-
版权声明:本文为weixin_46046193原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。