数据治理:ETL 任务监控与告警

钉钉机器人 & 邮件自动告警

监控系统

  • 监控系统,一般是对大数据整个架构、各个数据的输入输出流、中间件的稳定性、数据的准确性、资源的使用情况、任务的执行情况进行监控;
  • 一般的监控告警通过采集告警日志、错误数据、关键词匹配等,获取错误的数据进行实时展现并告警;
  • 常见的监控系统以 Grafana 为基础,主要功能是将收集存储的数据按照不同维度、不同应用、不同用户进行配置化的展示;
  • 为了保证数据安全,每个团队只能看到自己的应用数据。
  • 同时,对不同维度的数据,可以进行报警配置,根据最常用的报警方式,提供了钉钉报警、邮件报警两方式。

监控对象

  • binlog
  • Kafka
  • 同步任务
  • ETL 任务

监控方式

  • 监控库表
    • 数据总量
    • 数据增量
  • 监控字段
    • 字段空值缺失情况
      • NULL 占比超出阈值
    • 字段异常情况
      • INT 型字段值超出范围
      • 枚举 型字段值超出取值范围

监控示例

  • 以监控表数据总量为例

  • 表设计
    • 创建配置表
      • 将配置文件生成一张表,每个人可以通过数据库 insert 的操作去添加自己需要监控的表。

        CREATE TABLE `warehouse.dj_rpt_check_conf` (
        `db` varchar(8) NOT NULL COMMENT '数据库别名,例如bi,online,warehouse结果库)' ,
        `tbl` varchar(64) NOT NULL COMMENT '表名',
        `condition` varchar(256) NOT NULL COMMENT '筛选条件',
        `threshold` bigint(20) NOT NULL DEFAULT 0 COMMENT '阈值',
        `owner` varchar(16) NOT NULL default 'nobody' COMMENT '负责人:每个人自己固定用一个名字',
        `ptype` varchar(8) NOT NULL COMMENT '检查周期,例如:d(天),w(周,周一),m(月,1号)',
        unique index tbl_db (tbl,db)
        ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
          
        插入数据:
        insert into dj_rpt_check_conf values ('bi','dj_share_category_di','stat_date="${ds}"',20,'dy','d');
        
  • 监控脚本
    # coding: utf-8
      
    from djtool import *
    import pandas as pd 
    import sqlalchemy as sq
    from sqlalchemy import exc
    import sys
    import requests
    import json
    import copy
      
    owner_mobile={'gw':'123456789'}
      
    def check_table(conn,table,condition,threshold):
        try:
            cursor = conn.cursor()
            check_sql= 'select count(1) from ' + table + ' where ' + condition
            cursor.execute(check_sql)
            data = cursor.fetchone()
            if(data[0]<threshold):
                return data[0]
            else:
                return None
        except pymysql.InternalError as error:
            code, message = error.args
            print(">>>>>>>>>>>>>", code, message)
            return -1
      
    ds= sys.argv[1]
    check_conf = pd.read_sql_table('dj_rpt_check_conf',get_sqlalchemy_conn('mysql','bg'))
      
    check_conf['condition'] = check_conf.condition.str.replace('\$\{ds\}',ds)
    check_conf['real_cnt'] = 0
    check_conf['failed'] = 0
      
    for db in check_conf.db.unique():
        db_conn=get_pymysql_conn("mysql_"+db)
        for index,row in check_conf[(check_conf.db==db) & (check_conf.ptype=='d')].iterrows():
            real_cnt = check_table(db_conn,row['tbl'],row['condition'],row['threshold'])
            if(real_cnt is not None):
                 check_conf.loc[index,'real_cnt']=real_cnt
                 check_conf.loc[index,'failed']=1
      
    mail_text = '''
    配置表:`warehouse.dj_rpt_check_conf` 
    '''
    fail_conf = check_conf[(check_conf.failed==1)]
      
    if(fail_conf.shape[0]>0):
        send_mail(['data@idongjia.cn'],[],ds+'--BI任务失败列表',mail_text + fail_conf.to_html())
    else:
        send_mail(['data@idongjia.cn'],[],ds+'--已经加入监控的BI任务完成:)',mail_text)
      
    headers = {'Content-Type': 'application/json'}
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxx'
    msg={
         "msgtype": "markdown",
         "markdown": {"title":"BI任务失败了:"+ds,
    "text":"#### BI任务失败了:"+ds+"  \n @mobile 失败任务:\n- fail_task "
         },
        "at": {
            "atMobiles": [
                "88888"
            ]
        }
     }
      
    for owner in fail_conf.owner.unique():
        tmp_msg = copy.deepcopy(msg)
        tmp_msg['at']['atMobiles']=[owner_mobile[owner]]
        tmp_msg['markdown']['text']=tmp_msg['markdown']['text'].replace('mobile',owner_mobile[owner])
        tmp_msg['markdown']['text']=tmp_msg['markdown']['text'].replace('fail_task',"\n- ".join(fail_conf[(fail_conf.owner==owner)].tbl.values.tolist()))
        requests.post(ding_url, headers=headers,data=json.dumps(tmp_msg))
    
  • 告警
    • 钉钉告警
      • 钉钉开发文档:https://link.zhihu.com/?target=https%3A//ding-doc.dingtalk.com/doc%23/serverapi2/qf2nxq

    • 发邮件

      #!/usr/bin/python
      # -*- coding: UTF-8 -*-
         
      import smtplib
      import sys
      from email.mime.text import MIMEText
      from email.mime.multipart import MIMEMultipart
      from email.header import Header
      from email.utils import formataddr
      from sys import argv
        
      def main():
          sender = 'test'
          receivers = ['gaowei@test.cn']
          password = 'xxxxxxxx'
          message = MIMEMultipart()
          message['From'] = formataddr(["数据组",sender])
          message['To'] = formataddr(["数据组成员",receivers])
          subject = 'rest'
          message['Subject'] = Header(subject, 'utf-8')
          message.attach(MIMEText(sys.argv[1]+'数据见附件\n', 'plain', 'utf-8'))
          att1 = MIMEText(open("aa.csv", 'rb').read(), 'base64', 'utf-8')
          att1["Content-Type"] = 'application/octet-stream'
          att1["Content-Disposition"] = 'attachment; '+'filename='+sys.argv[1]+'.csv'
          message.attach(att1)
          try:
           server=smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
           server.login(sender, password)
           server.sendmail(sender,receivers,message.as_string())
           print "邮件发送成功"
          except smtplib.SMTPException:
           print "Error: 无法发送邮件"
        
      if __name__ == '__main__':
          main()
      
    • shell 脚本

      #!/usr/bin/env bash
      export JAVA_HOME=/opt/jdk1.8.0_121
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
      export PATH=$PATH:${JAVA_HOME}/bin:{JRE_HOME}/bin:$PATH
      export PATH=$PATH:/opt/mysql/bin
      dateStrDay=$1
      if [ -z "$1" ] ; then
              dateStrDay=`date +%Y-%m-%d`
      fi
      echo $dateStrDay
      dateStr=`date +%Y-%m-%d-%M-%S`
        
      hadoop fs -rm -r  /tmp/gaowei/test
        
      cd /opt/task/gaowei/warehouse/test
      rm *.csv
        
      spark2-submit --class cn.idongjia.data.auction.AuctionOrder --master yarn --deploy-mode cluster /opt/task/gaowei/warehouse/test/datawarehouse_2.11-1.0.jar
        
      hadoop fs -getmerge /tmp/gaowei/test/* /opt/task/gaowei/warehouse/test/bb.csv
        
      sed '1i\用户id,拍卖订单数,跑单数,异常订单数,是否禁言禁拍(0表示否,1表示是),是否在白名单(0表示否,1表示是),是否屏蔽(0表示否,1表示是,时间)' /opt/task/gaowei/warehouse/test/bb.csv > /opt/task/gaowei/warehouse/test/aa.csv
        
      python Email.py ${dateStrDay}