从钉钉后台获取考勤数据(其他数据获取类似)

  • Post author:
  • Post category:其他


  • 从钉钉后台获取考勤数据等

    • 下载钉钉的SDK包

    • 从开发者后台获取

      AppKey



      AppSecret

    • 上代码:

      # -*- coding: utf-8 -*-
      import requests, logging, json, openpyxl, os, time
      import dingtalk.api
      from datetime import datetime as dt
      from datetime import timedelta
      import xlsxwriter
      import configparser
      import datetime
      
      logging.basicConfig(level=logging.DEBUG,
                          format='%(asctime)s  line:%(lineno)d  %(levelname)s : %(message)s',
                          datefmt=' %Y-%m-%d %H:%M:%S',
                          filename=os.path.join(os.getcwd() + '/logs', 'aq_dingInfos.log'),
                          filemode='a')
      
      config = configparser.ConfigParser()
      config.read(os.path.join(os.getcwd(), "config.ini"), "UTF-8")
      
      data_time = str(dt.now())
      last_time = str(datetime.date.today() + datetime.timedelta(-1)) + ' 00:00:00'
      last_time1 = str(datetime.date.today() + datetime.timedelta(-1)) + ' 24:00:00'
      time1 = dt.now().strftime('%Y-%m-%d') + ' 11:00:00'
      time2 = dt.now().strftime('%Y-%m-%d') + ' 10:00:00'
      
      
      
      class Dingding():
          def __init__(self):
              self.appkey = config['AppKey']
              self.appsecret = config['AppSecret']
              self.access_token = self.getToken()
              self.userids = self.get_userid()
      
          def getToken(self):
              url = 'https://oapi.dingtalk.com/gettoken?appkey=' + self.appkey + '&appsecret=' + self.appsecret
              response = requests.get(url=url)
              result = response.json()
              try:
                  access_token = result['access_token']
                  logging.info('获取密钥成功')
              except Exception as e:
                  errmsg = result['errmsg']
                  logging.info(e, '\n', errmsg)
                  access_token = ''
              return access_token
      
          def get_userid(self):
              userids = []
              request = dingtalk.api.OapiSmartworkHrmEmployeeQueryonjobRequest("https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob")
              request.status_list = "2,3,5,-1"
              request.offset = 0
              request.size = 50
              result = request.getResponse(self.access_token)
              userids.extend(result["result"]["data_list"])
              while "next_cursor" in result["result"]:
                  request.offset = request.offset + request.size
                  result = request.getResponse(self.access_token)
                  if result['errcode'] == 9006 or result['errcode'] == 9005:
                      time.sleep(60)
                      result = request.getResponse(self.access_token)
                      userids.extend(result["result"]["data_list"])
                  elif result['errcode'] == 90018:
                      time.sleep(1)
                      result = request.getResponse(self.access_token)
                      userids.extend(result["result"]["data_list"])
                  else:
                      pass
                  userids.extend(result['result']['data_list'])
              logging.info(userids)
              logging.info('获取部门userids成功')
              return userids
      
          def get_jobnum(self):
              uid_jnum = {}  # {"员工ID":"员工工号"}
              userids = self.userids
              request = dingtalk.api.OapiUserGetDeptMemberRequest("https://oapi.dingtalk.com/user/get")
              for uid in userids:
                  try:
                      request.userid = uid
                      f = request.getResponse(self.access_token)
                      uid_jnum[f['userid']] = f['jobnumber']
                  except Exception as e:
                      logging.info(e)
                  time.sleep(1)
              logging.info('员工ID与工号对照字典已生成')
              return uid_jnum
      
          def get_attendence_listrecord(self, userids):
              day = 0
              request = dingtalk.api.OapiAttendanceListRecordRequest('https://oapi.dingtalk.com/attendance/list')
              #request.workDateFrom = str(dt.now() - day * timedelta(days=1))
              if data_time < time2:
                  request.workDateFrom = last_time
                  request.workDateTo = last_time1
              else:
                  request.workDateFrom = data_time
                  request.workDateTo = data_time
              #request.workDateTo = str(dt.now())
              logging.info(request.workDateFrom, request.workDateTo)
              request.userIdList = userids
              request.offset = 0
              request.limit = 50
              f = request.getResponse(self.access_token)
              h = []
              h.append(f)
      
              while ('hasMore', True) in f.items():
                  request.offset = request.offset + request.limit
                  # request.limit += 49
                  f = request.getResponse(self.access_token)
                  h.append(f)
              return h
      
          def get_value(self):
              TY = []
              uid_jnum = self.get_jobnum()
              userid_in = self.userids
              users = []
              if len(userid_in) > 50:
      
                  start = 0
                  end = 49
                  while end < len(userid_in):
                      users.append(userid_in[start:end])
                      start += 49
                      end = start + 50
                      users.append(userid_in[start:end])
                      #break
              for userid_in_in in users:
                  reponse_all = self.get_attendence_listrecord(userid_in_in)
                  if reponse_all[0]['errmsg'] != 'ok':
                      reponse_all = self.get_attendence_listrecord(userid_in_in)
                  for respose in reponse_all:
                      try:
                          for respose in respose['recordresult']:
                              if respose['sourceType'] != 'SYSTEM':
                                  if uid_jnum[respose['userId']]:
                                      str_time = str(dt.fromtimestamp(int(respose['userCheckTime']) / 1000)).split(' ')
                                      value = (uid_jnum[respose['userId']], str_time[0], str_time[1])
                                      TY.append(value)
                      except Exception as e:
                          logging.info(e)
              logging.info('获取部门所有考勤结果成功')
              return list(set(TY))
      
      
      kq_ding = Dingding()
      daka_result = kq_ding.get_value()
      
      shr_excel = os.path.join(os.getcwd(), 'dingdingData_aq.xlsx')
      test_book = xlsxwriter.Workbook(shr_excel)
      worksheet = test_book.add_worksheet(u'打卡记录导入')
      logging.info('数据开始写入表格中')
      row = 4
      col = 0
      worksheet.write(0, 0, u'实体名称')
      worksheet.write(0, 1, u'原始打卡记录实体')
      worksheet.write(1, 0, u'实体表')
      worksheet.write(1, 1, 'T_HR_ATS_PunchCardRecord')
      worksheet.write(2, 0, u'描述')
      worksheet.write(2, 1, u'原始打卡记录实体')
      worksheet.write(3, 0, u'考勤编号')
      worksheet.write(3, 1, u'打卡日期')
      worksheet.write(3, 2, u'打卡时间')
      worksheet.write(3, 3, u'打卡位置')
      for item in daka_result:
          if item[2] < '10:05:00':
              worksheet.write(row, col, item[0])
              worksheet.write(row, col+1, item[1])
              worksheet.write(row, col+2, item[2])
              row += 1
          else:
              if item[2] > '10:05:00':
                  worksheet.write(row, col, item[0])
                  worksheet.write(row, col+1, item[1])
                  worksheet.write(row, col+2, item[2])
                  row += 1
      test_book.close()
      logging.info('打卡记录已经写入完成')
      
      
      
      
      
      



版权声明:本文为weixin_46046193原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。