互联网故障一般表现为丢包和时延增大。持续性故障不难排查，难的是间歇性或凌晨发生的故障：后者往往还没等我们测试就已经恢复正常，拿不到故障发生时的mtr结果，就无法判断故障点在哪里。
因此有了根据丢包率和时延的变化自动联动mtr的需求。
前段时间使用MySQL实现了这个功能，缺点是占用系统资源较多、脚本繁重，优点是数据可复用，可以做多种形式的展示。
后续改用socket+deque实现，资源消耗低且轻量，也可以通过开放互联网API来做分布式监控；缺点是历史数据不留存，用完即丢。
系统环境
Ubuntu 18.04.5 LTS+Python 3.6.9
python库
主脚本仅使用Python自带的标准库，考虑到系统权限问题没有使用第三方库
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""pingd: lightweight latency / packet-loss monitor that triggers mtr.

Client mode (default): validate the arguments, start the background server if
it is not running, send ``[ip, item]`` to it on 127.0.0.1:6590 and print the
answer (average latency in ms for ``restime``, loss percentage for ``pkloss``).

Server mode (``-S server``): keep one probing worker per probe spec
``(sip, tip, type, port, inver)``. When a returned latency exceeds the
previous buffered result by more than 20 ms, or loss is between 20% and 100%
(exclusive), run ``mtr.py`` from the script directory.
"""
from collections import deque
import itertools, time
import queue, json
import argparse, sys, re, os, subprocess
import time, socket, random, string
import threading
from functools import reduce
import logging

ipqli = deque()
filename = os.path.realpath(sys.argv[0])
# Address of the local server; client and server must agree on it.
SERVER_ADDRESS = ('127.0.0.1', 6590)


def logger():
    """Return the root logger with a DEBUG handler writing to ``<script dir>/log``.

    Falls back to stderr if the log file cannot be opened (e.g. the directory
    is read-only), so the tool does not crash just because logging fails.
    """
    log_name = os.path.join(os.path.dirname(filename), 'log')
    root = logging.getLogger()
    try:
        handler = logging.FileHandler(log_name)
    except OSError:
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    return root


log = logger()


def _ip_key(ip):
    """Return a hashable key for a probe spec ``(sip, tip, type, port, inver)``.

    ``sip`` is None when ``-s`` was not given; it maps to '' so building the
    key cannot raise. A tuple (rather than a dot-stripped joined string) keeps
    specs such as '10.0.0.1'+'1.1.1.1' and '10.0.0.11'+'1.1.1' distinct.
    """
    return tuple('' if part is None else str(part) for part in ip)


class _Channel:
    """Results shared between one probing worker and the request handler."""

    def __init__(self, maxlen=30):
        # (value, round_seconds) per probing round, newest on the right.
        self.restime = deque(maxlen=maxlen)
        self.pkloss = deque(maxlen=maxlen)
        # Set by Ping.slow_ping when a paced round starts (>= 2 results are
        # buffered); Server.sendvalue waits on it while results are short.
        self.event = threading.Event()


# Probe-spec key -> _Channel; filled by Server.server, read by Ping/Server.
_channels = {}


# Prober. ping3 is deliberately not used, to avoid system permission issues.
class Ping:
    """Probe one target via ICMP/TCP/UDP and summarise latency and loss.

    ``ip`` is the probe spec ``(sip, tip, type, port, inver)``; the matching
    channel must already exist in ``_channels``.
    """

    # RTT in ``ping`` output, e.g. "time=0.045 ms" or "time=123 ms".
    RTT_PATTERN = r'(?<=time=)\d+(?:\.\d+)?(?= ms)'

    def __init__(self, ip, count=20, udp_length=64):
        ip = tuple(ip)
        self.sip, self.tip, self.type, self.port, self.inver = ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count = count
        self.inver = float(self.inver)
        self.udp_length = udp_length
        # Raw per-probe samples of the current round, private to this Ping.
        self.restime_deque = deque(maxlen=60)
        self.pkloss_deque = deque(maxlen=60)
        channel = _channels[_ip_key(ip)]
        # Averaged results consumed by the server; their length decides between
        # fast (parallel) and slow (paced) rounds.
        self.ret_restime_deque = channel.restime
        self.ret_pkloss_deque = channel.pkloss
        self.ipqevent = channel.event
        self.compile = self.RTT_PATTERN

    def _record(self, rtt_ms):
        """Store one probe result; ``None`` means the probe timed out (lost)."""
        if rtt_ms is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
        else:
            self.restime_deque.append(rtt_ms)
            self.pkloss_deque.append(0)

    def _sleep_time(self, start_time):
        """Return how long to sleep so probes are spaced ``inver`` seconds apart."""
        used = time.time() - start_time
        return self.inver - used if used < self.inver else self.inver

    def _tcp(self):
        """One TCP-connect probe. Returns ``(sleep_time, received)``; (0, 0) on socket errors."""
        res_count = 0
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    s.bind((self.sip, 0))
                s.connect((self.tip, self.port))
                s.shutdown(socket.SHUT_RD)
                self._record((time.time() - start_time) * 1000)
                res_count = 1
            except socket.timeout:
                self._record(None)
            except OSError as e:
                # Refused/unreachable etc.: no sample is recorded for this probe.
                log.debug(e)
                return 0, 0
        return self._sleep_time(start_time), res_count

    def _udp(self):
        """One UDP echo probe (needs an echo server on the target). Returns ``(sleep_time, received)``."""
        res_count = 0
        payload = ''.join(random.choice(string.ascii_letters + string.digits)
                          for _ in range(self.udp_length)).encode('utf-8')
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    s.bind((self.sip, 0))
                s.sendto(payload, (self.tip, self.port))
                s.recv(1024)
                self._record((time.time() - start_time) * 1000)
                res_count = 1
            except socket.timeout:
                self._record(None)
            except OSError as e:
                log.debug(e)
                return 0, 0
        return self._sleep_time(start_time), res_count

    def _icmp(self):
        """One ICMP probe via the system ``ping`` binary. Returns ``(sleep_time, received)``."""
        res_count = 0
        start_time = time.time()
        cmd = ['ping', '-i', str(self.inver), '-c', '1', '-W', '1']
        if self.sip:
            cmd += ['-I', self.sip]
        cmd.append(self.tip)
        try:
            out = subprocess.run(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.DEVNULL).stdout.decode('utf8', 'replace')
        except OSError as e:
            # e.g. ping binary missing: count as loss, like an empty reply.
            log.debug(e)
            out = ''
        match = re.search(self.compile, out)
        if match:
            self._record(float(match.group()))
            res_count = 1
        else:
            self._record(None)
        return self._sleep_time(start_time), res_count

    def fastping(self):
        """Run a single probe of the configured type (used by parallel rounds)."""
        getattr(self, '_' + self.type)()

    def slow_ping(self):
        """Run up to ``count`` paced probes; stop early once results run short.

        Returns ``(probes_run, probes_answered)``.
        """
        index = 0
        res_count = 0
        self.ipqevent.set()
        while index < self.count:
            sleep_time, count = getattr(self, '_' + self.type)()
            index += 1
            res_count += count
            # A client drained the results: switch back to fast rounds.
            if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
                break
            time.sleep(sleep_time)
        return index, res_count

    def ping_value(self):
        """Run one probing round and return ``(avg_rtt_ms, loss_percent, seconds)``.

        While fewer than two results are buffered, ``count`` probes run in
        parallel so a waiting client gets data quickly; otherwise probes are
        paced by slow_ping. The average RTT is taken over answered probes only.
        Returns (0, 0, 0) when no probe produced a sample.
        """
        start_time = time.time()
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            threads = [threading.Thread(target=self.fastping) for _ in range(self.count)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        else:
            self.slow_ping()
        use_time = round(time.time() - start_time, 4)
        # Drain exactly the samples recorded this round; probes that failed with
        # a non-timeout OSError recorded nothing, so there may be fewer than count.
        times = list(self.restime_deque)
        self.restime_deque.clear()
        losses = list(self.pkloss_deque)
        self.pkloss_deque.clear()
        if not losses:
            return 0, 0, 0
        try:
            lost = sum(int(x) for x in losses)
            received = len(losses) - lost
            # Lost probes stored 0 ms, so the plain sum equals the sum over answers.
            restime = sum(float(x) for x in times) / received if received else 0
            pkloss = lost / len(losses) * 100
            return round(restime, 2), round(pkloss, 2), use_time
        except (TypeError, ValueError) as e:
            log.debug(e)
            return 0, 0, 0


# Server side.
class Server():
    """Local TCP server that owns the probing workers and answers clients."""

    # Max seconds a request waits for fresh results (the client gives up after 3 s).
    WAIT_TIMEOUT = 3

    def __init__(self, sock):
        self.ipqli = ipqli      # probe specs waiting for a worker thread
        self.thli = []
        self.ipli = []          # keys of probe specs that currently have a worker
        self.sock = sock
        self.basedir = os.path.dirname(filename)
        self.env = threading.Event()

    @classmethod
    def start(cls):
        """Bind SERVER_ADDRESS and run the accept and worker-spawning threads forever."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(SERVER_ADDRESS)
        s.listen(100)
        obj = cls(s)
        for target in (obj.server, obj.create):
            t = threading.Thread(target=target)
            t.start()
            obj.thli.append(t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept loop: one request per connection; a bad request never stops the loop."""
        while True:
            conn, addr = self.sock.accept()
            try:
                conn.settimeout(5)  # a silent client must not block the server
                self._handle(conn)
            except Exception as e:
                log.debug('request from %s failed: %s' % (addr, e))
            finally:
                conn.close()

    def _handle(self, conn):
        """Decode ``[ip, item]``, register the probe spec if new, then answer."""
        ip, item = json.loads(conn.recv(1024).decode('utf-8'))
        key = _ip_key(ip)
        if key not in self.ipli:
            _channels[key] = _Channel()
            self.ipqli.append(ip)
            self.ipli.append(key)
            log.debug('create ipdeque %s' % (key,))
            self.env.set()
        self.sendvalue(conn, ip, item)

    def create(self):
        """Start a makevalue worker thread for every newly registered probe spec."""
        while True:
            self.env.wait()
            # Clear before draining so a set() that races with the drain is not lost.
            self.env.clear()
            while self.ipqli:
                ip = self.ipqli.popleft()
                log.debug('create %s' % (ip,))
                try:
                    threading.Thread(target=self.makevalue, args=(ip,)).start()
                except Exception as a:
                    log.debug(str(a))

    def makevalue(self, ip):
        """Probe ``ip`` repeatedly, buffering results until nobody consumes them."""
        key = _ip_key(ip)
        channel = _channels[key]
        try:
            obj = Ping(ip)
            # Once the buffers are full nobody is polling, so the worker retires.
            while (len(channel.restime) < channel.restime.maxlen
                   or len(channel.pkloss) < channel.pkloss.maxlen):
                restime, pkloss, use_time = obj.ping_value()
                channel.restime.append((restime, use_time))
                channel.pkloss.append((pkloss, use_time))
        except Exception:
            log.exception('probe worker for %s failed' % (key,))
        finally:
            # Unregister even on failure so the next request restarts a worker.
            self.ipli.remove(key)
            log.debug('delete ipdeque %s' % (key,))

    @staticmethod
    def _run_mtr(cmd):
        """Launch mtr.py in the background; a launch failure is only logged."""
        try:
            os.makedirs(os.path.dirname(cmd[2]), exist_ok=True)
            subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug(e)

    def sendvalue(self, conn, ip, item):
        """Send the newest result for ``item`` and trigger mtr on anomalies.

        On any error the exception text is sent instead of a value.
        """
        _, tip, *_rest = ip
        channel = _channels[_ip_key(ip)]
        mtr_log = os.path.join(self.basedir, 'mtr_log',
                               tip + '-' + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        mtr_cmd = [os.path.join(self.basedir, 'mtr.py'), tip, mtr_log]
        if len(channel.restime) < 2 or len(channel.pkloss) < 2:
            channel.event.clear()
        try:
            channel.event.wait(timeout=self.WAIT_TIMEOUT)
            if item == 'restime':
                ret, use_time = channel.restime.pop()
                hisret, _ = channel.restime[-1]
                if ret - hisret > 20:
                    self._run_mtr(mtr_cmd)
            elif item == 'pkloss':
                ret, use_time = channel.pkloss.pop()
                if 100 > ret > 20:
                    self._run_mtr(mtr_cmd)
            else:
                raise ValueError('unknown item %r' % (item,))
        except Exception as a:
            ret = a
            log.debug(str(ret))
        conn.sendall(str(ret).encode())


# Validation of user-supplied arguments.
class Ipcheck():
    """Validate item, probe type, interval and IP format before querying the server."""

    IP_PATTERN = r'^\d+\.\d+\.\d+\.\d+$'

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        self.inver = float(inver)

    def check(self):
        """Return True when every argument is acceptable (interval >= 0.2 s)."""
        if (self.item not in ['restime', 'pkloss']
                or self.type not in ['icmp', 'tcp', 'udp']
                or self.inver < 0.2):
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True if a single octet is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True if tip (and sip, when given) are dotted quads with octets < 256."""
        addresses = [self.tip] + ([self.sip] if self.sip else [])
        try:
            if not all(re.match(self.IP_PATTERN, a) for a in addresses):
                return False
            octets = [o for a in addresses for o in a.split('.')]
            return self.checkiplength(octets)
        except (TypeError, ValueError):
            return False

    def checkiplength(self, check_ipli):
        """Return True if every octet passes check_fun."""
        return all(self.check_fun(o) for o in check_ipli)


def run():
    """Start this script as a background server (``-S server``)."""
    # DEVNULL: pipes that nobody reads would eventually block the server.
    subprocess.Popen([sys.executable, filename, '-S', 'server'],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


# Client side: ask the server for a value and print it for the user.
def socket_client(ip, item):
    """Send ``[ip, item]`` to the local server and print its reply."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            s.connect(SERVER_ADDRESS)
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        sys.exit(0)
    except Exception as e:
        print('server will start')
        log.debug(str(e))
        sys.exit(0)


USAGE = '''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
'''


def main():
    """Parse arguments, ensure the server runs, then act as server or client."""
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-S', action='store', dest='server')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    parser.add_argument('-i', action='store', dest='inver', default='1')
    parser.add_argument('-T', action='store', dest='ping_type', default='icmp')
    parser.add_argument('-p', action='store', dest='port', default='0')
    args = parser.parse_args()
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16" % filename
    server_status = subprocess.Popen(server_status_cmd, shell=True, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)
    try:
        tip = socket.gethostbyname(args.tip)
        int(args.port)  # the server would otherwise fail later on a bad port
        ip = (args.sip, tip, args.ping_type, args.port, args.inver)
        valid = Ipcheck(args.sip, tip, args.item, args.ping_type, args.inver).check()
    except (TypeError, ValueError, OSError):
        # Missing/unresolvable -t, non-numeric -i/-p: previously fell through to a NameError.
        print('format error')
        valid = False
    if not valid:
        print(USAGE)
        sys.exit(0)
    socket_client(ip, args.item)


if __name__ == '__main__':
    main()
mtr.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Run mtr towards an IP, log the report, and e-mail it once per log file.

Usage: mtr.py <ip> <log_file>. The e-mail is sent only when <log_file> does
not exist yet; pingd puts the date in the file name, so that means at most one
mail per target per day. Only the standard library is used.
"""
import sys, logging, os, subprocess
import email
import json
import smtplib
import urllib.request
from email.header import Header
from email.utils import formataddr
from email.mime.text import MIMEText

# SMTP / recipient settings; override via environment instead of editing the script.
SMTP_HOST = os.environ.get('MTR_SMTP_HOST', 'smtp.qq.com')
SMTP_USER = os.environ.get('MTR_SMTP_USER', '976584601@qq.com')
SMTP_PASSWORD = os.environ.get('MTR_SMTP_PASSWORD', 'password')
MAIL_LIST = os.environ.get('MTR_MAIL_TO', 'cs11241991@163.com').split(',')


def logger(ip, log_name):
    """Return the root logger with a DEBUG file handler on ``log_name``.

    The parent directory is created if missing. ``ip`` is unused and kept for
    backward compatibility.
    """
    log_dir = os.path.dirname(log_name)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    root = logging.getLogger()
    fh = logging.FileHandler(log_name)
    fh.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)
    return root


def _format_location(info):
    """Return 'region city' from an ip-api.com JSON reply, or '' if the lookup failed."""
    try:
        return info['regionName'] + ' ' + info['city']
    except (KeyError, TypeError):
        return ''


def ip_search(ip, timeout=5):
    """Look up the region and city of ``ip`` via ip-api.com; return '' on any failure."""
    url = 'http://ip-api.com/json/%s?lang=zh-CN' % ip
    try:
        with urllib.request.urlopen(url, timeout=timeout) as r:
            info = json.loads(r.read().decode('utf-8'))
    except (OSError, ValueError):
        return ''
    return _format_location(info)


class sendemail:
    """Plain-text mail sender using the configured SMTP account."""

    def __init__(self, email_list, content, subject):
        self.email_list = email_list
        self.content = content
        self.subject = subject

    def sendemail(self):
        """Send the message; the SMTP session is closed even if sending fails."""
        msg = MIMEText(self.content, 'plain', 'utf-8')
        msg['from'] = formataddr(('dark', SMTP_USER))
        msg['to'] = ','.join(self.email_list)
        msg['subject'] = self.subject
        with smtplib.SMTP(SMTP_HOST) as service:
            service.login(SMTP_USER, SMTP_PASSWORD)
            service.sendmail(SMTP_USER, self.email_list, msg.as_string())


def mtr(ip, log_name):
    """Run one mtr report to ``ip``, mail it if ``log_name`` is new, and log it."""
    cmd = ['mtr', '-r', '-n', '-c', '1', '-w', '-b', ip]
    try:
        data = subprocess.run(cmd, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE).stdout.decode('utf8', 'replace')
    except OSError as e:  # e.g. mtr not installed
        data = 'mtr failed: %s' % e
    # Must be decided before logger() creates the file.
    first_report = not os.path.exists(log_name)
    mail_error = None
    if first_report:
        title = 'Telia到 %s %s 线路异常' % (ip_search(ip), ip)
        try:
            sendemail(MAIL_LIST, data, title).sendemail()
        except (smtplib.SMTPException, OSError) as e:
            # Keep going: losing the mtr report because mail failed is worse.
            mail_error = e
    log = logger(ip, log_name)
    if mail_error is not None:
        log.error('send mail failed: %s', mail_error)
    log.debug(data)


if __name__ == '__main__':
    ip = sys.argv[1]
    log_name = sys.argv[2]
    mtr(ip, log_name)
UDP探测需要在目标服务器上运行回显(echo)程序并开放对应端口：
#!/usr/bin/env python3
"""Minimal UDP echo server used as the far end of pingd's UDP probes.

Usage: udp_echo.py <bind-address> <port>
"""
import socket
import sys


def serve(host, port):
    """Echo every received datagram back to its sender, forever.

    The socket is created and bound once; re-creating it per datagram (as
    before) drops packets that arrive between iterations.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((host, port))
        while True:
            data, addr = sock.recvfrom(65535)
            sock.sendto(data, addr)


if __name__ == '__main__':
    serve(sys.argv[1], int(sys.argv[2]))
也可以使用socat，但实际测试发现socat会引入额外开销，导致时延测量不准确：
# Alternative UDP echo listener on port 4000 (pair with pingd -T udp -p 4000).
# Measured to add extra overhead, so latency readings are less accurate than with the Python echo script.
# NOTE(review): the -v flag and per-peer fork may contribute to that overhead — unverified.
socat -v UDP-LISTEN:4000,fork PIPE
效果
MySQL 版