#!/usr/bin/env python """ File: monitor_multiple.py Desc: Read all registers from a TCP MODBUS Slave """ __author__ = '0xRoM' from argparse import ArgumentParser, ArgumentTypeError, Action from pyModbusTCP.client import ModbusClient import time import sys import re class style(): BLACK = '\033[30m' RED = '\033[31m' GREEN = '\033[32m' YELLOW = '\033[33m' BLUE = '\033[34m' MAGENTA = '\033[35m' CYAN = '\033[36m' WHITE = '\033[37m' BOLD = '\33[1m' UNDERLINE = '\033[4m' RESET = '\033[0m' class InflateRange(Action): def __call__(self, parser, namespace, values, option_string=None): #print('%r %r %r' % (namespace, values, option_string)) lst = [] for string in values: #print 'in string:',string if '-' in string: m = re.match(r'(\d+)(?:-(\d+))?$', string) # ^ (or use .split('-'). anyway you like.) if not m: raise ArgumentTypeError("'" + string + "' is not a range of number. Expected forms like '0-5' or '2'.") start = m.group(1) end = m.group(2) or start lst.extend(list(range(int(start,10), int(end,10)+1))) else: lst.append(int(string)) setattr(namespace, self.dest, lst) parser = ArgumentParser() parser.add_argument('-co', '--coil', action=InflateRange, nargs='*', required=False, help='list of addresses') parser.add_argument('-ho', '--hold', action=InflateRange, nargs='*', required=False, help='list of addresses') parser.add_argument('-di', '--discrete', action=InflateRange, nargs='*', required=False, help='list of addresses') parser.add_argument('-in', '--input', action=InflateRange, nargs='*', required=False, help='list of addresses') parser.add_argument('-i','--ipaddress', action='store', type=str, required=True, help='Input IP Address') parser.add_argument('-p','--port', action='store', type=int, required=False, help='Port Number', default=502) parser.add_argument('-t','--timeout', action='store', type=float, required=False, help='request every X seconds', default=2) args = parser.parse_args() print("### watch list ###") total = 0 if args.coil: print(style.BOLD+"coils: "+style.RESET+"("+str(len(args.coil))+")"+str(args.coil)) total += len(args.coil) if args.hold: print(style.BOLD+"hold reg: "+style.RESET+"("+str(len(args.hold))+")"+str(args.hold)) total += len(args.hold) if args.discrete: print(style.BOLD+"discrete: "+style.RESET+"("+str(len(args.discrete))+")"+str(args.discrete)) total += len(args.discrete) if args.input: print(style.BOLD+"input reg: "+style.RESET+"("+str(len(args.input))+")"+str(args.input)) total += len(args.input) print(style.BOLD+"Total = "+str(total)+style.RESET) print("------------------") co_orig={} ho_orig={} di_orig={} in_orig={} co_mod={} ho_mod={} di_mod={} in_mod={} co_prev={} ho_prev={} di_prev={} in_prev={} client=ModbusClient(host=args.ipaddress,port=args.port,auto_open=True,auto_close=True,timeout=args.timeout) if args.coil: for i in args.coil: co_orig[i]=client.read_coils(i,1) co_mod[i] = style.RESET co_prev[i] = co_orig[i] if args.hold: for i in args.hold: ho_orig[i]=client.read_holding_registers(i,1) if ho_orig[i] == None: ho_orig[i] = " " ho_mod[i] = style.RESET ho_prev[i] = ho_orig[i] if args.discrete: for i in args.discrete: di_orig[i]=client.read_discrete_inputs(i,1) di_mod[i] = style.RESET di_prev[i] = di_orig[i] if args.input: for i in args.input: in_orig[i]=client.read_input_registers(i,1) if in_orig[i] == None: in_orig[i] = " " in_mod[i] = style.RESET in_prev[i] = in_orig[i] while True: co_new={} ho_new={} di_new={} in_new={} if args.coil: for i in args.coil: co_new[i]=client.read_coils(i,1) if co_new[i] == None: co_new[i] = "0" co_mod[i] = style.CYAN else: if co_new[i][0] == 1 and co_prev[i][0] == 0: co_mod[i] = style.GREEN if co_new[i][0] == 0 and co_prev[i][0] == 1: co_mod[i] = style.RED co_prev[i] = co_new[i] if args.hold: for i in args.hold: ho_new[i]=client.read_holding_registers(i,1) if ho_new[i] == None: ho_new[i] = " " else: if ho_new[i] > ho_prev[i]: ho_mod[i] = style.GREEN if ho_new[i] < ho_prev[i]: ho_mod[i] = style.RED ho_prev[i] = ho_new[i] if args.discrete: for i in args.discrete: di_new[i]=client.read_discrete_inputs(i,1) if di_new[i] == None: di_new[i] = "0" di_mod[i] = style.CYAN else: if di_new[i][0] == 1 and di_prev[i][0] == 0: di_mod[i] = style.GREEN if di_new[i][0] == 0 and di_prev[i][0] == 1: di_mod[i] = style.RED di_prev[i] = di_new[i] if args.input: for i in args.input: in_new[i]=client.read_input_registers(i,1) if in_new[i] == None: in_new[i] = " " else: if in_new[i] > in_prev[i]: in_mod[i] = style.GREEN if in_new[i] < in_prev[i]: in_mod[i] = style.RED in_prev[i] = in_new[i] sys.stdout.flush() output = '\r' if args.coil: output += '['+style.BOLD+'coils:'+style.RESET for i in co_new: output += style.RESET+'['+str(co_mod[i])+str(int(co_new[i][0]))+style.RESET+']' output += ']' if args.hold: output += '['+style.BOLD+'hold regs:'+style.RESET for i in ho_new: output += style.RESET+'['+str(ho_mod[i])+str(ho_new[i][0]).rjust(5, ' ')+style.RESET+']' output += ']' if args.discrete: output += '['+style.BOLD+'discrete:'+style.RESET for i in di_new: output += style.RESET+'['+str(di_mod[i])+str(int(di_new[i][0]))+style.RESET+']' output += ']' if args.input: output += '['+style.BOLD+'input regs:'+style.RESET for i in in_new: output += style.RESET+'['+str(in_mod[i])+str(in_new[i][0]).rjust(5, ' ')+style.RESET+']' output += ']' print output, time.sleep(args.timeout)