Monitoring System
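- A monitoring system generally covers the whole big-data architecture: the input and output streams of each data source, the stability of middleware, the accuracy of the data, resource usage, and task execution.
- Typical monitoring and alerting collects alert logs and error data, uses keyword matching and similar techniques to pick out errors, then displays them in real time and raises alerts.
- Common monitoring systems are built on Grafana. Its main job here is to display the collected and stored data in configurable views, broken down by dimension, application, and user.
- For data security, each team can only see its own applications' data.
- Alerts can also be configured on each dimension of the data. DingTalk alerts and email alerts are provided, since these are the most commonly used channels.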
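The keyword-matching approach described above can be sketched in a few lines of Python. This is a minimal illustration, not the monitoring system's actual code: the keyword list `ALERT_KEYWORDS`, the function `scan_log_lines`, and the "number of matching lines" threshold are all hypothetical.

```python
import re
from collections import Counter

# Hypothetical keyword list; a real deployment would load this from configuration.
ALERT_KEYWORDS = ("ERROR", "Exception", "OutOfMemory")
PATTERN = re.compile("|".join(re.escape(k) for k in ALERT_KEYWORDS))


def scan_log_lines(lines, threshold=1):
    """Count keyword hits in log lines and decide whether to alert.

    Returns (counts, should_alert): counts maps keyword -> number of lines
    containing it; should_alert is True once the number of matching lines
    reaches threshold.
    """
    counts = Counter()
    matched_lines = 0
    for line in lines:
        hits = set(PATTERN.findall(line))
        if hits:
            matched_lines += 1
            counts.update(hits)
    return dict(counts), matched_lines >= threshold


if __name__ == "__main__":
    sample = [
        "2021-01-01 10:00:00 INFO job started",
        "2021-01-01 10:05:00 ERROR write to Kafka failed",
        "2021-01-01 10:06:00 ERROR java.lang.OutOfMemoryError",
    ]
    print(scan_log_lines(sample, threshold=2))
```

Counting matching lines rather than raw hits keeps one noisy line with several keywords from triggering an alert on its own.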
Monitored objects
- binlog
- Kafka
- Sync tasks
- ETL tasks
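Monitoring methods
- Monitor databases and tables
  - Total row count
  - Incremental row count
- Monitor fields
  - Missing (NULL) values
    - NULL ratio exceeds a threshold
  - Abnormal values
    - INT field value out of range
    - Enum field value outside its allowed set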
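The table- and field-level checks listed above reduce to simple predicates. The following is a minimal pure-Python sketch under stated assumptions: a column arrives as a list with `None` for NULL, "incremental row count" is taken to mean today's total minus yesterday's, and the names `null_ratio`, `check_field` and `check_increment` are hypothetical. In practice the same checks would usually run as SQL aggregates against the monitored table.

```python
def null_ratio(values):
    """Share of None entries in a column (0.0 for an empty column)."""
    values = list(values)
    if not values:
        return 0.0
    return sum(v is None for v in values) / len(values)


def check_field(values, max_null_ratio=None, int_range=None, allowed=None):
    """Return a list of human-readable problems found in one column."""
    values = list(values)
    problems = []
    ratio = null_ratio(values)
    if max_null_ratio is not None and ratio > max_null_ratio:
        problems.append(f"NULL ratio {ratio:.2%} exceeds {max_null_ratio:.2%}")
    present = [v for v in values if v is not None]
    if int_range is not None:
        lo, hi = int_range
        bad = [v for v in present if not lo <= v <= hi]
        if bad:
            problems.append(f"{len(bad)} value(s) outside [{lo}, {hi}]")
    if allowed is not None:
        bad = sorted({v for v in present if v not in allowed})
        if bad:
            problems.append(f"unexpected enum value(s): {bad}")
    return problems


def check_increment(today_total, yesterday_total, min_increment):
    """Table-level check: did the row count grow by at least min_increment?"""
    return today_total - yesterday_total >= min_increment


if __name__ == "__main__":
    print(check_field([5, None, 200], max_null_ratio=0.1, int_range=(0, 150)))
    print(check_increment(1200, 1000, 100))
```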
Monitoring example
- Example: monitoring the total row count of a table
- Table design
- Create the configuration table
- Store the monitoring configuration as a table, so that everyone can add the tables they want monitored with a database INSERT.
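```sql
-- Quote schema and table separately; `warehouse.dj_rpt_check_conf` in one pair
-- of backticks would create a table literally named "warehouse.dj_rpt_check_conf".
CREATE TABLE `warehouse`.`dj_rpt_check_conf` (
  `db`        varchar(8)   NOT NULL COMMENT 'Database alias, e.g. bi, online, warehouse (result DB)',
  `tbl`       varchar(64)  NOT NULL COMMENT 'Table name',
  `condition` varchar(256) NOT NULL COMMENT 'Filter condition; ${ds} is replaced with the batch date',
  `threshold` bigint(20)   NOT NULL DEFAULT 0 COMMENT 'Threshold: alert when the row count is below it',
  `owner`     varchar(16)  NOT NULL DEFAULT 'nobody' COMMENT 'Owner: each person always uses the same name',
  `ptype`     varchar(8)   NOT NULL COMMENT 'Check period, e.g. d (daily), w (weekly, Monday), m (monthly, 1st)',
  UNIQUE INDEX tbl_db (tbl, db)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
```

Insert a row:

```sql
INSERT INTO `warehouse`.`dj_rpt_check_conf`
VALUES ('bi', 'dj_share_category_di', 'stat_date="${ds}"', 20, 'dy', 'd');
```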
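To see the config-driven check end to end without a MySQL server, the sketch below uses an in-memory SQLite database as a stand-in (an assumption for illustration only; column types are simplified). It creates the configuration table plus a hypothetical `dj_share_category_di` table with three rows, fills in `${ds}` as the monitoring script in this section does, and reports every table whose count falls below its threshold. The condition uses single quotes here because in SQLite double quotes denote identifiers; `run_checks` is a hypothetical name.

```python
import sqlite3

# In-memory SQLite stands in for MySQL so the example runs anywhere.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dj_rpt_check_conf (
    db TEXT NOT NULL,
    tbl TEXT NOT NULL,
    "condition" TEXT NOT NULL,
    threshold INTEGER NOT NULL DEFAULT 0,
    owner TEXT NOT NULL DEFAULT 'nobody',
    ptype TEXT NOT NULL,
    UNIQUE (tbl, db)
);
-- Hypothetical monitored table: 3 rows for 2021-01-01.
CREATE TABLE dj_share_category_di (stat_date TEXT, category TEXT);
INSERT INTO dj_share_category_di VALUES
    ('2021-01-01', 'a'), ('2021-01-01', 'b'), ('2021-01-01', 'c');
INSERT INTO dj_rpt_check_conf VALUES
    ('bi', 'dj_share_category_di', 'stat_date=''${ds}''', 20, 'dy', 'd');
""")


def run_checks(conn, ds, ptype="d"):
    """Return (tbl, owner, count) for each config row whose count is below its threshold."""
    failures = []
    rows = conn.execute(
        'SELECT tbl, "condition", threshold, owner FROM dj_rpt_check_conf WHERE ptype = ?',
        (ptype,),
    ).fetchall()
    for tbl, condition, threshold, owner in rows:
        where = condition.replace("${ds}", ds)  # fill in the batch date
        # tbl and condition come from the trusted config table, so they are
        # concatenated into the SQL rather than bound as parameters.
        (cnt,) = conn.execute(f"SELECT COUNT(1) FROM {tbl} WHERE {where}").fetchone()
        if cnt < threshold:
            failures.append((tbl, owner, cnt))
    return failures


print(run_checks(conn, "2021-01-01"))  # [('dj_share_category_di', 'dy', 3)]
```

Three rows against a threshold of 20 is reported as a failure, which is exactly the case the daily check is meant to catch.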
- Monitoring script
```python
# coding: utf-8
import copy
import json
import sys

import pandas as pd
import pymysql
import requests

# djtool is an internal helper module; it provides get_sqlalchemy_conn,
# get_pymysql_conn and send_mail used below.
from djtool import *

# Owner name -> mobile number used for DingTalk @-mentions.
owner_mobile = {'gw': '123456789'}


def check_table(conn, table, condition, threshold):
    """Return the row count if it is below threshold, None if OK, -1 on SQL error."""
    try:
        with conn.cursor() as cursor:
            check_sql = 'select count(1) from ' + table + ' where ' + condition
            cursor.execute(check_sql)
            data = cursor.fetchone()
        if data[0] < threshold:
            return data[0]
        return None
    except pymysql.MySQLError as error:
        # Missing tables, bad conditions, etc. are reported as failures.
        print(">>>>>>>>>>>>>", error.args)
        return -1


ds = sys.argv[1]  # batch date
check_conf = pd.read_sql_table('dj_rpt_check_conf', get_sqlalchemy_conn('mysql', 'bg'))
# Replace the ${ds} placeholder literally; regex=False avoids escaping $, { and }.
check_conf['condition'] = check_conf['condition'].str.replace('${ds}', ds, regex=False)
check_conf['real_cnt'] = 0
check_conf['failed'] = 0

# Only daily ('d') checks are run here.
for db in check_conf.db.unique():
    db_conn = get_pymysql_conn("mysql_" + db)
    try:
        daily = check_conf[(check_conf.db == db) & (check_conf.ptype == 'd')]
        for index, row in daily.iterrows():
            real_cnt = check_table(db_conn, row['tbl'], row['condition'], row['threshold'])
            if real_cnt is not None:
                check_conf.loc[index, 'real_cnt'] = real_cnt
                check_conf.loc[index, 'failed'] = 1
    finally:
        db_conn.close()

mail_text = '''
Config table: `warehouse.dj_rpt_check_conf`
'''
fail_conf = check_conf[check_conf.failed == 1]
if fail_conf.shape[0] > 0:
    send_mail(['data@idongjia.cn'], [], ds + '--BI task failure list', mail_text + fail_conf.to_html())
else:
    send_mail(['data@idongjia.cn'], [], ds + '--All monitored BI tasks finished :)', mail_text)

headers = {'Content-Type': 'application/json'}
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxx'
msg = {
    "msgtype": "markdown",
    "markdown": {
        "title": "BI tasks failed: " + ds,
        "text": "#### BI tasks failed: " + ds + " \n @mobile failed tasks:\n- fail_task ",
    },
    "at": {"atMobiles": ["88888"]},  # placeholder, replaced per owner below
}
# One DingTalk message per owner, @-mentioning them and listing their tables.
for owner in fail_conf.owner.unique():
    # Fall back to the owner name so unknown owners do not raise KeyError.
    mobile = owner_mobile.get(owner, owner)
    tmp_msg = copy.deepcopy(msg)
    tmp_msg['at']['atMobiles'] = [mobile]
    tmp_msg['markdown']['text'] = tmp_msg['markdown']['text'].replace('mobile', mobile)
    tables = fail_conf[fail_conf.owner == owner].tbl.values.tolist()
    tmp_msg['markdown']['text'] = tmp_msg['markdown']['text'].replace('fail_task', "\n- ".join(tables))
    requests.post(ding_url, headers=headers, data=json.dumps(tmp_msg))
```
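The DingTalk part of the script can be isolated into a pure function, which makes it testable without sending anything. This sketch (the function name `build_owner_messages` is hypothetical) groups failed tables by owner and builds one markdown payload per owner, in the same `msgtype`/`markdown`/`at` shape the script posts; owners without a known mobile number get a message with no @-mention instead of a `KeyError`.

```python
import json


def build_owner_messages(failures, owner_mobile, ds):
    """Group failed tables by owner and build one DingTalk markdown payload each.

    failures: iterable of (owner, table) pairs.
    owner_mobile: owner name -> mobile number; owners missing from it get
    a message without an @-mention.
    """
    by_owner = {}
    for owner, tbl in failures:
        by_owner.setdefault(owner, []).append(tbl)
    payloads = []
    for owner, tables in by_owner.items():
        mobile = owner_mobile.get(owner)
        mention = f"@{mobile}" if mobile else owner
        text = (f"#### BI tasks failed: {ds}\n{mention} failed tasks:\n"
                + "\n".join(f"- {t}" for t in tables))
        payloads.append({
            "msgtype": "markdown",
            "markdown": {"title": f"BI tasks failed: {ds}", "text": text},
            "at": {"atMobiles": [mobile] if mobile else [], "isAtAll": False},
        })
    return payloads


if __name__ == "__main__":
    msgs = build_owner_messages(
        [("gw", "t1"), ("gw", "t2"), ("dy", "t3")], {"gw": "123456789"}, "2021-01-01")
    print(json.dumps(msgs, ensure_ascii=False, indent=2))
```

Each payload could then be sent with `requests.post(ding_url, json=payload)`, which serializes the body and sets the JSON content type.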
- Alerting
- DingTalk alerts
- DingTalk developer documentation: https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq
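If the DingTalk robot is configured with the signature (加签) security setting, each webhook request must also carry `timestamp` and `sign` query parameters. A minimal sketch, assuming that setting is enabled: the signature is the URL-encoded Base64 of an HMAC-SHA256 over `"<timestamp>\n<secret>"` keyed with the secret. The function name `signed_webhook_url` and the `SECxxxxxxx` secret are placeholders.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def signed_webhook_url(webhook_url, secret, timestamp_ms=None):
    """Return webhook_url with DingTalk's timestamp and sign query parameters appended.

    string_to_sign = "<timestamp in ms>\\n<secret>"; the sign is the
    URL-encoded Base64 of HMAC-SHA256(key=secret, msg=string_to_sign).
    """
    if timestamp_ms is None:
        timestamp_ms = int(round(time.time() * 1000))
    string_to_sign = f"{timestamp_ms}\n{secret}"
    digest = hmac.new(secret.encode("utf-8"), string_to_sign.encode("utf-8"),
                      hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest))
    return f"{webhook_url}&timestamp={timestamp_ms}&sign={sign}"


if __name__ == "__main__":
    # "SECxxxxxxx" is a placeholder for the secret shown in the robot's security settings.
    print(signed_webhook_url(
        "https://oapi.dingtalk.com/robot/send?access_token=xxxxxxx", "SECxxxxxxx"))
```

The resulting URL replaces `ding_url` in the `requests.post` call; the message body stays the same.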
- Sending email (Email.py)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import smtplib
import sys
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr


def main():
    date_str = sys.argv[1]  # batch date passed in by the shell script
    sender = 'test'  # placeholder: must be the full sender mailbox address
    receivers = ['gaowei@test.cn']
    password = 'xxxxxxxx'

    message = MIMEMultipart()
    message['From'] = formataddr(("Data Team", sender))
    # formataddr takes a single address, so join the receiver list instead.
    message['To'] = ', '.join(receivers)
    message['Subject'] = Header('rest', 'utf-8')
    message.attach(MIMEText(date_str + ' data: see attachment\n', 'plain', 'utf-8'))

    # Attach the CSV produced by the shell script as <date>.csv.
    with open('aa.csv', 'rb') as f:
        att1 = MIMEApplication(f.read())  # application/octet-stream
    att1.add_header('Content-Disposition', 'attachment', filename=date_str + '.csv')
    message.attach(att1)

    try:
        server = smtplib.SMTP_SSL('smtp.exmail.qq.com', 465)
        server.login(sender, password)
        server.sendmail(sender, receivers, message.as_string())
        server.quit()
        print('Email sent successfully')
    except smtplib.SMTPException as e:
        print('Error: unable to send email:', e)


if __name__ == '__main__':
    main()
```
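The monitoring script calls `send_mail(to, cc, subject, html)` from the internal `djtool` module, whose code is not shown. One way such a helper could assemble an HTML alert mail with the standard library is sketched below; `build_alert_mail` and `send` are hypothetical names, and the SMTP host and credentials must be supplied by the caller.

```python
import smtplib
from email.message import EmailMessage


def build_alert_mail(sender, to, cc, subject, html):
    """Build a multipart/alternative mail with a plain-text fallback and an HTML body."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(to)
    if cc:
        msg["Cc"] = ", ".join(cc)
    msg["Subject"] = subject
    msg.set_content("This alert contains an HTML table; open it in an HTML-capable client.")
    msg.add_alternative(html, subtype="html")
    return msg


def send(msg, host, user, password, port=465):
    """Send over implicit TLS (SMTPS); recipients are taken from To/Cc."""
    with smtplib.SMTP_SSL(host, port) as server:
        server.login(user, password)
        server.send_message(msg)
```

`send_message` reads the recipients from the `To` and `Cc` headers, so CC addresses do not need to be passed separately, and the HTML from `fail_conf.to_html()` can be used directly as the `html` argument.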
- Shell script (runs the Spark job, builds aa.csv and calls Email.py)
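```bash
#!/usr/bin/env bash
export JAVA_HOME=/opt/jdk1.8.0_121
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:/opt/mysql/bin

# Batch date: first argument, defaulting to today.
dateStrDay=$1
if [ -z "$1" ]; then
  dateStrDay=$(date +%Y-%m-%d)
fi
echo "$dateStrDay"
dateStr=$(date +%Y-%m-%d-%H-%M-%S)

# Clear the previous HDFS output and local CSV files (-f: no error if absent).
hadoop fs -rm -r -f /tmp/gaowei/test
cd /opt/task/gaowei/warehouse/test || exit 1
rm -f ./*.csv

# Run the Spark job; its output is expected under /tmp/gaowei/test.
spark2-submit --class cn.idongjia.data.auction.AuctionOrder --master yarn --deploy-mode cluster \
  /opt/task/gaowei/warehouse/test/datawarehouse_2.11-1.0.jar

# Merge the HDFS part files into one local file, then prepend a header row.
# The glob is quoted so that HDFS, not the local shell, expands it.
hadoop fs -getmerge '/tmp/gaowei/test/*' /opt/task/gaowei/warehouse/test/bb.csv
sed '1i\User ID,Auction orders,Unpaid (abandoned) orders,Abnormal orders,Muted/banned from bidding (0=no 1=yes),Whitelisted (0=no 1=yes),Blocked (0=no 1=yes; time)' \
  /opt/task/gaowei/warehouse/test/bb.csv > /opt/task/gaowei/warehouse/test/aa.csv

# Mail aa.csv; Email.py reads it from the current directory.
python3 Email.py "${dateStrDay}"
```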
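If the part files are already on local disk, the merge-and-header step at the end of the shell script can also be done in Python. A minimal sketch under assumptions: `merge_with_header` is a hypothetical name, and the `part-*` pattern assumes Spark's usual output file naming. It concatenates the files in name order and writes the header through `csv.writer`, which quotes a header cell containing a comma instead of letting it split into two columns.

```python
import csv
import glob
import os
import tempfile


def merge_with_header(part_dir, out_path, header, pattern="part-*"):
    """Concatenate local part files in name order and prepend one CSV header row.

    A local stand-in for `hadoop fs -getmerge` followed by `sed '1i ...'`;
    csv.writer quotes any header cell that itself contains a comma.
    """
    with open(out_path, "w", newline="", encoding="utf-8") as out:
        csv.writer(out, lineterminator="\n").writerow(header)
        for part in sorted(glob.glob(os.path.join(part_dir, pattern))):
            with open(part, encoding="utf-8") as src:
                for line in src:
                    # Make sure each part file's last line is newline-terminated.
                    out.write(line if line.endswith("\n") else line + "\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        for name, body in [("part-00000", "1,2\n"), ("part-00001", "3,4")]:
            with open(os.path.join(d, name), "w", encoding="utf-8") as f:
                f.write(body)
        out = os.path.join(d, "aa.csv")
        merge_with_header(d, out, ["user_id", "blocked (0=no, 1=yes)"])
        with open(out, encoding="utf-8") as f:
            print(f.read())
```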