VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > temp > 简明python教程 >
  • zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自

互联网故障一般表现为丢包和时延增大。持续性故障不难排查,难的是间歇性故障或凌晨发生的故障:这类故障往往等不到我们测试就已经恢复正常,拿不到异常时刻的mtr结果,也就无法判断故障点在哪里。

因此有了根据丢包率和时延变化联动触发mtr的需求。

前段时间使用MySQL实现了这个功能,缺点是占用太多系统资源,且脚本繁重;优点是数据可复用,可以做多种形式的展示。

后续改用socket+deque实现,能耗低且轻量,也可以通过开放互联网API来做分布式监控;缺点是历史数据不留存,用完即丢。

系统环境

  Ubuntu 18.04.5 LTS+Python 3.6.9 

python库

  自带基本库,考虑到系统权限问题没有使用第三方库

 

复制代码
  1 #!/usr/bin/env python3
  2 #-*-coding:utf-8-*-
  3 from collections import deque
  4 import itertools,time
  5 import queue,json
  6 import argparse,sys,re,os,subprocess
  7 import time,socket,random,string
  8 import threading
  9 from functools import reduce
 10 import logging
 11 
# Queue of newly registered probe targets: Server.server appends to it and
# Server.create pops from it.
ipqli=deque()
# Absolute path of this script; run() and the __main__ block use it to spawn
# and to detect the background server process.
filename = os.path.realpath(sys.argv[0])
 14 def logger():
 15     dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 16     log_name = dir+'/log'
 17     logger = logging.getLogger()
 18     fh = logging.FileHandler(log_name)
 19     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
 20     fh.setFormatter(formater)
 21     logger.setLevel(logging.DEBUG)
 22     logger.addHandler(fh)
 23     return logger
# Module-wide logger used by the classes and functions below.
log = logger()
 25 #ping程序,避免系统权限问题未使用ping3
 26 class Ping:
 27     def __init__(self,ip,count=20,udp_length=64):
 28         ip = tuple(ip)
 29         self.sip,self.tip,self.type,self.port,self.inver=ip
 30         self.type = self.type.lower()
 31         self.port = int(self.port)
 32         self.count=count
 33         self.inver = float(self.inver)
 34         self.udp_length=udp_length
 35         restime_name = 'restime_deque'+''.join(ip).replace('.','')
 36         pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','')
 37         ipqevent = 'event'+''.join(ip).replace('.','')
 38         locals()[restime_name] = deque(maxlen=60)
 39         locals()[pkloss_name] = deque(maxlen=60)
 40         self.restime_deque = locals()[restime_name]
 41         self.pkloss_deque = locals()[pkloss_name]
 42         self.ret_restime_deque = globals()[restime_name]
 43         self.ret_pkloss_deque = globals()[pkloss_name]
 44         self.ipqevent = globals()[ipqevent]
 45         self.compile= r'(?<=time=)\d+\.?\d+(?= ms)'
 46     def _tcp(self):
 47             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 48             s.settimeout(1)
 49             start_time = time.time()
 50             res_count=0
 51             try:
 52                 s.bind((self.sip,0))
 53                 s.connect((self.tip, self.port))
 54                 s.shutdown(socket.SHUT_RD)
 55                 value = (time.time() - start_time)*1000  
 56                 self.restime_deque.append(value)
 57                 self.pkloss_deque.append(0)
 58                 res_count=1
 59             except socket.timeout:
 60                 self.restime_deque.append(0)
 61                 self.pkloss_deque.append(1)
 62             except OSError as e:
 63                 log.debug(e)
 64                 return 0,0
 65             usetime = time.time()-start_time
 66             sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 67             return sleep_time,res_count
 68     def _udp(self):
 69         res_count=0
 70         s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
 71         s.settimeout(1)
 72         start_time = time.time()
 73         data=''.join(random.choice(string.ascii_letters+ string.digits) for x in range(self.udp_length))
 74         try:
 75             s.sendto(data.encode('utf-8'),(self.tip,self.port))
 76             s.recv(1024)
 77             value = (time.time() - start_time)*1000
 78             self.restime_deque.append(value)
 79             self.pkloss_deque.append(0)
 80             res_count=1
 81         except socket.timeout:
 82             self.restime_deque.append(0)
 83             self.pkloss_deque.append(1)
 84         except OSError as e:
 85             log.debug(e)
 86             return 0,0
 87         usetime = time.time()-start_time
 88         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 89         return sleep_time,res_count
 90     def _icmp(self):
 91         res_count=0
 92         start_time = time.time()
 93         cmd = 'ping -i %s -c 1 -W 1 -I %s %s'%(self.inver,self.sip,self.tip)
 94         ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8')
 95         try:
 96             value=re.findall(self.compile, ret,re.S)[0]
 97             self.restime_deque.append(value)
 98             self.pkloss_deque.append(0)
 99             res_count=1
100         except:
101             self.pkloss_deque.append(1)
102             self.restime_deque.append(0)
103         usetime = time.time()-start_time
104         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
105         return sleep_time,res_count
106     def fastping(self):
107         getattr(self, '_'+self.type)()
108     def slow_ping(self):
109         index = 0
110         res_count=0
111         self.ipqevent.set()
112         while index<self.count:
113             sleep_time,count=getattr(self, '_'+self.type)()
114             index+=1
115             res_count+=count
116             if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2 :
117                 break
118             time.sleep(sleep_time)
119         return index,res_count
120     def ping_value(self):
121         start_time = time.time()
122         count = self.count
123         rescount = self.count
124         if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2:
125             fastli=[]
126             for x in range(self.count):
127                 t = threading.Thread(target=self.fastping)
128                 t.start()
129                 fastli.append(t)
130             for th in fastli:
131                 th.join()
132         else:
133             count,rescount = self.slow_ping()
134             rescount=count if rescount==0 else rescount
135         use_time = round(time.time()-start_time,4)
136         li = [self.restime_deque.pop() for x in range(count)]
137         pkli = [self.pkloss_deque.pop() for x in range(count)]
138         try:
139             restime = reduce(lambda x ,y :round(float(x)+float(y),2), li)/rescount if len(li) >1 else round(float(li[0]),2)
140             pkloss= reduce(lambda x ,y :int(x)+int(y), pkli)/count*100
141             return (round(restime,2),round(pkloss,2),use_time)   
142         except Exception as e:
143             log.debug(e)
144             return 0,0,0
# Server side: answers queries on a local TCP socket and runs one probing
# thread per registered target.
class Server():
    """Local query server that owns the per-target result queues.

    For every target it creates three module globals named after the target:
    a response-time deque, a packet-loss deque and a ``threading.Event``.
    ``Ping`` looks the same names up in module globals.
    """

    def __init__(self, sock):
        self.ipqli = ipqli   # shared queue of targets waiting for a thread
        self.thli = []       # service threads started by start()
        self.ipli = []       # targets that currently have a probing thread
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.env = threading.Event()  # set when ipqli has new work

    @classmethod
    def start(cls):
        """Bind 127.0.0.1:6590, start the service threads and block on them."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        address = ('127.0.0.1', 6590)
        s.bind(address)
        obj = cls(s)
        ping_server = threading.Thread(target=obj.server)
        ping_server.start()
        obj.thli.append(ping_server)
        create_t = threading.Thread(target=obj.create)
        create_t.start()
        obj.thli.append(create_t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept client connections forever, handling one request each."""
        self.sock.listen(100)
        while True:
            conn, addr = self.sock.accept()
            try:
                self._serve_one(conn)
            except Exception as e:
                # A malformed request must not take the accept loop down.
                log.debug('bad request from %s: %s' % (addr, e))
            finally:
                conn.close()

    def _serve_one(self, conn):
        """Read one JSON ``[ip, item]`` request, register the target if it is
        new, and send the reply."""
        data = json.loads(conn.recv(1024).decode('utf-8'))
        ip, item = data
        suffix = ''.join(ip).replace('.', '')
        restime_ipq = 'restime_deque' + suffix
        pkloss_ipq = 'pkloss_deque' + suffix
        ipqevent = 'event' + suffix
        if ip not in self.ipli:
            globals()[restime_ipq] = deque(maxlen=30)
            globals()[pkloss_ipq] = deque(maxlen=30)
            globals()[ipqevent] = threading.Event()
            self.ipqli.append(ip)
            self.ipli.append(ip)
            log.debug('create ipdeque %s %s' % (restime_ipq, pkloss_ipq))
            self.env.set()
        self.sendvalue(conn, ip, item)

    def create(self):
        """Start a probing thread for every target queued in ``ipqli``."""
        while True:
            self.env.wait()
            # Clear before draining: a target queued while we drain sets the
            # event again, so its wake-up cannot be lost.
            self.env.clear()
            while True:
                try:
                    ip = self.ipqli.pop()
                except IndexError:
                    break
                log.debug('create %s' % ip)
                threading.Thread(target=self.makevalue, args=(ip,)).start()

    def makevalue(self, ip):
        """Probe ``ip`` until both of its result queues are full (30 entries).

        The target is unregistered when the loop ends, or when probing
        fails, so that a later request starts a fresh thread.
        """
        suffix = ''.join(ip).replace('.', '')
        restime_name = 'restime_deque' + suffix
        pkloss_name = 'pkloss_deque' + suffix
        restime_ipq = globals()[restime_name]
        pkloss_ipq = globals()[pkloss_name]
        try:
            obj = Ping(ip)
            while len(restime_ipq) < 30 or len(pkloss_ipq) < 30:
                restime, pkloss, use_time = obj.ping_value()
                restime_ipq.append((restime, use_time))
                pkloss_ipq.append((pkloss, use_time))
        except Exception as e:
            log.debug('probe thread for %s failed: %s' % (ip, e))
        finally:
            if ip in self.ipli:
                self.ipli.remove(ip)
            log.debug('delete ipdeque %s %s' % (restime_name, pkloss_name))

    def _launch_mtr(self, mtr_cmd):
        """Start mtr.py in the background without waiting for it."""
        try:
            # Output is discarded: nobody reads the child's pipes.
            subprocess.Popen(mtr_cmd, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug('cannot start mtr: %s' % e)

    def sendvalue(self, conn, ip, item):
        """Send the newest value of ``item`` for target ``ip`` over ``conn``.

        ``item`` is ``'restime'`` or ``'pkloss'``.  mtr.py is launched when
        the response time rose by more than 20 compared with the previous
        value, or when the loss percentage is strictly between 20 and 100.
        On any error the text of the exception is sent instead of a value.
        """
        ret = 'unsupported item: %s' % item
        try:
            fromat_ip = ''.join(ip).replace('.', '')
            _, tip, *arg = ip
            restime_ipq = globals()['restime_deque' + fromat_ip]
            pkloss_ipq = globals()['pkloss_deque' + fromat_ip]
            ipqevent = globals()['event' + fromat_ip]
            mtr_dir = (self.basedir + '/mtr_log/' + tip + '-'
                       + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
            mtr_cmd = [self.basedir + '/mtr.py', tip, mtr_dir]
            # Same readiness rule as Ping.slow_ping: wait until both queues
            # hold at least two entries.
            if len(restime_ipq) < 2 or len(pkloss_ipq) < 2:
                ipqevent.clear()
            ipqevent.wait()
            if item == 'restime':
                ret, use_time = restime_ipq.pop()
                hisret, _ = restime_ipq[-1]
                if ret - hisret > 20:
                    self._launch_mtr(mtr_cmd)
            elif item == 'pkloss':
                ret, use_time = pkloss_ipq.pop()
                if 100 > ret > 20:
                    self._launch_mtr(mtr_cmd)
        except Exception as a:
            ret = a
            log.debug(str(ret))
        conn.sendall(str(ret).encode())
247 
# Validates the user-supplied parameters: addresses, item, probe type and
# interval.
class Ipcheck():
    """Validate the command-line parameters of one probe request.

    Args:
        sip: source IPv4 address, or a false value when none was given.
        tip: target IPv4 address.
        item: requested metric, ``'restime'`` or ``'pkloss'``.
        ping_type: ``'icmp'``, ``'tcp'`` or ``'udp'`` (case-insensitive).
        inver: probe interval in seconds; must be at least 0.2.
    """

    # Dotted-quad shape, ASCII digits only.  It is applied with fullmatch so
    # that a trailing newline is rejected.
    _IPV4_SHAPE = re.compile(r'\d+\.\d+\.\d+\.\d+', re.ASCII)

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            # A non-numeric interval is reported by check(), not raised.
            self.inver = None

    def check(self):
        """Return True when every parameter is acceptable."""
        if self.item not in ('restime', 'pkloss'):
            return False
        if self.type not in ('icmp', 'tcp', 'udp'):
            return False
        # Written as "not >=" so that a missing or NaN interval also fails.
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True when the octet string ``ip`` is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True when the target, and the source if one was given, are
        well-formed dotted-quad addresses with every octet below 256."""
        addresses = [self.tip]
        if self.sip:
            addresses.append(self.sip)
        octets = []
        for address in addresses:
            if not isinstance(address, str):
                return False
            if not self._IPV4_SHAPE.fullmatch(address):
                return False
            octets.extend(address.split('.'))
        return self.checkiplength(octets)

    def checkiplength(self, check_ipli):
        """Return True when every octet in ``check_ipli`` is below 256."""
        return all(self.check_fun(octet) for octet in check_ipli)
def run():
    """Start this script as a background server process (``-S server``)."""
    # Argument list instead of a shell string, so a script path containing
    # spaces or shell metacharacters is passed through intact.  The child's
    # output is discarded because nothing ever reads it.
    subprocess.Popen(['python3', filename, '-S', 'server'],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Client side: request one value from the local server and print it for the
# caller.
def socket_client(ip,item):
    """Send ``[ip, item]`` as JSON to 127.0.0.1:6590 and print the reply.

    On a timeout the error is logged; on any other failure
    ``'server will start'`` is printed and the error logged.  Both failure
    paths end the process with exit status 0.
    """
    try:
        # The context manager closes the socket on every path, including
        # connection errors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            s.connect(('127.0.0.1', 6590))
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        sys.exit(0)
    except Exception as e:
        print('server will start')
        log.debug(str(e))
        sys.exit(0)
if __name__ == '__main__':
    # Entry point.  With "-S server" the process becomes the background
    # server; otherwise it validates the arguments and queries the server,
    # starting one first if none is running.
    usage = '''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
        '''
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-S', action='store', dest='server')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    parser.add_argument('-i', action='store', dest='inver', default='1')
    parser.add_argument('-T', action='store', dest='ping_type', default='icmp')
    parser.add_argument('-p', action='store', dest='port', default='0')
    args = parser.parse_args()
    # Look for a running "<script> -S server" process; start one if absent.
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16"%filename
    server_status = subprocess.Popen(server_status_cmd, shell=True,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)
    try:
        tip = socket.gethostbyname(args.tip)
    except (OSError, TypeError, UnicodeError):
        # Missing or unresolvable target: stop here instead of continuing
        # with undefined variables.
        print('format error')
        print(usage)
        sys.exit(0)
    sip = args.sip
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    ip = (sip, tip, ping_type, port, inver)
    check = Ipcheck(sip, tip, item, ping_type, inver)
    if not check.check():
        print(usage)
        sys.exit(0)
    socket_client(ip, item)
复制代码

mtr.py

复制代码
 1 #!/usr/bin/env python3
 2 #-*-coding:utf-8-*-
 3 import sys,logging,os,subprocess,requests
 4 import email
 5 import smtplib
 6 from email.header import Header
 7 from email.utils import formataddr
 8 from email.mime.text import MIMEText
 9 def logger(ip,log_name):
10     logger = logging.getLogger()
11     fh = logging.FileHandler(log_name)
12     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
13     fh.setFormatter(formater)
14     logger.setLevel(logging.DEBUG)
15     logger.addHandler(fh)
16     return logger
def ip_search(ip):
    """Return ``"<regionName> <city>"`` for ``ip`` from the ip-api.com JSON API.

    Returns an empty string when the request fails, times out, or the reply
    cannot be parsed, so that a failed lookup cannot abort the report.
    """
    try:
        # Bounded wait: without a timeout the request can block indefinitely.
        r = requests.get('http://ip-api.com/json/%s?lang=zh-CN'%ip, timeout=5)
        ret = r.json()
    except (requests.RequestException, ValueError):
        return ''
    if not isinstance(ret, dict):
        return ''
    # A reply may lack these keys; fall back to empty strings.
    return ('%s %s' % (ret.get('regionName', ''), ret.get('city', ''))).strip()
class sendemail:
    """Plain-text e-mail sent through smtp.qq.com.

    Args:
        email_list: list of recipient addresses.
        content: message body (sent as UTF-8 plain text).
        subject: message subject.
    """

    # Account used both as the sender address and as the SMTP login.
    _SENDER = '976584601@qq.com'

    def __init__(self,email_list,content,subject):
        self.email_list = email_list
        self.content = content
        self.subject = subject

    def sendemail(self):
        """Build the message and send it to every address in ``email_list``."""
        msg = MIMEText(self.content,'plain','utf-8')
        msg['from'] = formataddr(['dark', self._SENDER])
        msg['to'] = ','.join(self.email_list)
        msg['subject'] = self.subject
        # The context manager ends the SMTP session even when login or
        # sendmail raises.
        with smtplib.SMTP('smtp.qq.com') as service:
            service.login(self._SENDER,'password')
            service.sendmail(self._SENDER,self.email_list,msg.as_string())
def mtr(ip,log_name):
    """Run one mtr report against ``ip`` and append it to ``log_name``.

    When no file with the same name as ``log_name`` exists yet in the
    ``mtr_log`` directory next to this script, the report is also e-mailed.
    A failure to send the mail is logged and does not prevent the report
    from being written.
    """
    mtr_log_dir = os.path.dirname(os.path.realpath(sys.argv[0]))+'/mtr_log'
    # Create the directory on first use; listing it would fail otherwise.
    os.makedirs(mtr_log_dir, exist_ok=True)
    cmd = ['mtr', '-r', '-n', '-c', '1', '-w', '-b', ip]
    try:
        data = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE).communicate()[0].decode('utf8', 'replace')
    except OSError as e:
        data = 'mtr could not be started: %s' % e
    # Must be evaluated before logger() creates the log file.
    first_report = os.path.basename(log_name) not in os.listdir(mtr_log_dir)
    log = logger(ip,log_name)
    if first_report:
        try:
            ip_city = ip_search(ip)
            title = 'Telia到 %s %s 线路异常'%(ip_city,ip)
            mail_list = ['cs11241991@163.com']
            mail = sendemail(mail_list,data,title)
            mail.sendemail()
        except Exception as e:
            log.debug('alert mail not sent: %s' % e)
    log.debug(data)
if __name__ =='__main__':
    # Usage: mtr.py <target ip> <log file path>
    ip = sys.argv[1]
    log_name = sys.argv[2]
    mtr(ip,log_name)
复制代码

 

 

 

UDP探测需要在被探测的服务器端开启对应端口:

复制代码
#!/usr/bin/env python3
# Minimal UDP echo service: sends every datagram back to its sender.
import socket
# Create and bind the socket once, outside the loop.
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# Replace 'ipaddress' and port with the address and UDP port to listen on.
sock.bind(('ipaddress',port))
while True:
    data,addr = sock.recvfrom(65535)
    sock.sendto(data,addr)
复制代码

也可以使用socat,但实际测试发现socat会引入额外开销,导致时延不准确:

socat -v UDP-LISTEN:4000,fork PIPE

 

效果

 

 

 

 

 

Mysql 版

https://www.cnblogs.com/darkchen/p/14744242.html

以驱魔为理想,为生计而奔波]]

原文:https://www.cnblogs.com/darkchen/p/15524856.html

相关教程