Newer
Older
SCADA / modbus / monitor_muiltiple.py
root on 8 May 2022 6 KB playing with modbus day #1
#!/usr/bin/env python

"""
File: monitor_multiple.py
Desc: Poll selected coils, discrete inputs, holding and input registers of a Modbus/TCP slave and highlight value changes
"""
__author__ = '0xRoM'

from argparse import ArgumentParser, ArgumentTypeError, Action
from pyModbusTCP.client import ModbusClient
import time
import sys
import re

class style:
    """ANSI escape sequences used to colour and emphasise terminal output.

    Each attribute is a ready-to-print string; concatenate one before the
    text and ``RESET`` after it.
    """

    # Foreground colours.
    BLACK = '\033[30m'
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Switches every colour/attribute above back off.
    RESET = '\033[0m'

class InflateRange(Action):
    """argparse action that expands address tokens into a flat list of ints.

    Each token is either a single non-negative integer (``'2'``) or an
    inclusive range (``'0-5'``).  All tokens given to the option are expanded
    in order and stored as one list on the namespace, e.g.
    ``['0-2', '7']`` becomes ``[0, 1, 2, 7]``.  A descending range such as
    ``'5-2'`` expands to nothing.

    Invalid tokens raise ``argparse.ArgumentError`` so that argparse reports
    them as a normal usage error instead of a traceback.
    """

    # 'start-end', both plain decimal numbers.
    _RANGE_RE = re.compile(r'(\d+)-(\d+)$')

    def __call__(self, parser, namespace, values, option_string=None):
        """Expand ``values`` and store the resulting list under ``self.dest``.

        Raises:
            argparse.ArgumentError: if a token is neither an integer nor a
                ``start-end`` range.
        """
        # Imported here because the file-level import only brings in
        # ArgumentTypeError, which argparse does not catch when it is raised
        # from inside an Action.
        from argparse import ArgumentError

        addresses = []
        for token in values:
            expanded = self._expand(token)
            if expanded is None:
                raise ArgumentError(
                    self,
                    "'" + token + "' is not a range of number. Expected forms like '0-5' or '2'.")
            addresses.extend(expanded)
        # Store once, after every token validated; an option given without
        # values therefore yields an empty list.
        setattr(namespace, self.dest, addresses)

    @classmethod
    def _expand(cls, token):
        """Return the list of ints described by ``token``, or None if invalid."""
        if '-' in token:
            match = cls._RANGE_RE.match(token)
            if not match:
                return None
            first = int(match.group(1), 10)
            last = int(match.group(2), 10)
            return list(range(first, last + 1))
        try:
            return [int(token)]
        except ValueError:
            return None

# Command-line interface: four optional address lists (one per Modbus table)
# plus the connection settings.
parser = ArgumentParser()

# The four address options are identical apart from their flags; each accepts
# any number of tokens, expanded by InflateRange.
for _flags in (('-co', '--coil'),
               ('-ho', '--hold'),
               ('-di', '--discrete'),
               ('-in', '--input')):
    parser.add_argument(*_flags, action=InflateRange, nargs='*', required=False, help='list of addresses')

parser.add_argument('-i', '--ipaddress', action='store', type=str, required=True, help='Input IP Address')
parser.add_argument('-p', '--port', action='store', type=int, required=False, default=502, help='Port Number')
# The same value is later used as the client timeout and as the pause
# between polling cycles.
parser.add_argument('-t', '--timeout', action='store', type=float, required=False, default=2, help='request every X seconds')

args = parser.parse_args()
# Echo the expanded address lists back so the user can check what will be
# polled, and count how many addresses are watched in total.
print("### watch list ###")

total = 0
for _label, _addresses in (("coils: ", args.coil),
                           ("hold reg: ", args.hold),
                           ("discrete: ", args.discrete),
                           ("input reg: ", args.input)):
    # Options that were omitted (None) or given without values are skipped.
    if not _addresses:
        continue
    _count = len(_addresses)
    print(style.BOLD + _label + style.RESET + "(" + str(_count) + ")" + str(_addresses))
    total += _count

print(style.BOLD + "Total = " + str(total) + style.RESET)
print("------------------")

def _read_initial(read, addresses, placeholder):
    """Take the first reading of every address in one Modbus table.

    Args:
        read: bound client method called as ``read(address, 1)``; the code
            treats a ``None`` result as a failed request.
        addresses: list of addresses to read, or None/empty for none.
        placeholder: value stored instead of ``None`` when a read fails, so
            later ``value[0]`` look-ups do not crash.

    Returns:
        A ``(orig, mod, prev)`` tuple of dicts keyed by address: the initial
        reading, the display colour (``style.RESET``), and the "previous"
        reading used for change detection (initially the same object as
        ``orig``).
    """
    orig, mod, prev = {}, {}, {}
    for addr in addresses or ():
        value = read(addr, 1)
        if value is None:
            value = placeholder
        orig[addr] = value
        mod[addr] = style.RESET
        prev[addr] = value
    return orig, mod, prev


client = ModbusClient(host=args.ipaddress, port=args.port, auto_open=True, auto_close=True, timeout=args.timeout)

# Bit tables use "0" and register tables use " " as the failed-read
# placeholder, matching the placeholders used while polling.  Guarding the
# bit tables too keeps a failed first read from leaving None in *_prev.
co_orig, co_mod, co_prev = _read_initial(client.read_coils, args.coil, "0")
ho_orig, ho_mod, ho_prev = _read_initial(client.read_holding_registers, args.hold, " ")
di_orig, di_mod, di_prev = _read_initial(client.read_discrete_inputs, args.discrete, "0")
in_orig, in_mod, in_prev = _read_initial(client.read_input_registers, args.input, " ")

def _is_reading(value):
    """Return True for a real reading (a sequence), False for None or a text placeholder."""
    return isinstance(value, (list, tuple))


def _poll_bits(read, addresses, prev, mod):
    """Read one bit table (coils or discrete inputs) and update colours.

    Args:
        read: bound client method called as ``read(address, 1)``; a ``None``
            result is treated as a failed request.
        addresses: list of addresses to poll, or None/empty for none.
        prev: dict of the last successful reading per address (updated in place).
        mod: dict of the display colour per address (updated in place).

    Returns:
        Dict mapping each address to this cycle's reading, or to the
        placeholder ``"0"`` when the read failed.
    """
    current = {}
    for addr in addresses or ():
        reading = read(addr, 1)
        if reading is None:
            # Failed read: show 0 in cyan and keep the last good value.
            current[addr] = "0"
            mod[addr] = style.CYAN
            continue
        last = prev.get(addr)
        # Only compare against a real earlier reading; the previous value may
        # be None or a placeholder if the very first read failed.
        if _is_reading(last):
            if reading[0] == 1 and last[0] == 0:
                mod[addr] = style.GREEN
            elif reading[0] == 0 and last[0] == 1:
                mod[addr] = style.RED
        prev[addr] = reading
        current[addr] = reading
    return current


def _poll_registers(read, addresses, prev, mod):
    """Read one register table (holding or input) and update colours.

    Same contract as ``_poll_bits``, except that a failed read is shown as
    the placeholder ``" "`` without a colour change, and a value is coloured
    green when it rose and red when it fell since the last good reading.
    """
    current = {}
    for addr in addresses or ():
        reading = read(addr, 1)
        if reading is None:
            # Keep the last good value in prev: storing the placeholder there
            # would make the next cycle compare a list against a string.
            current[addr] = " "
            continue
        last = prev.get(addr)
        if _is_reading(last):
            if reading[0] > last[0]:
                mod[addr] = style.GREEN
            elif reading[0] < last[0]:
                mod[addr] = style.RED
        prev[addr] = reading
        current[addr] = reading
    return current


def _render_group(title, current, mod, is_bit):
    """Format one table as ``[title[v][v]...]`` with a colour per cell.

    Bit values are shown as ``0``/``1``; register values are right-justified
    to five characters.
    """
    parts = ['[' + style.BOLD + title + style.RESET]
    for addr in current:
        first = current[addr][0]
        text = str(int(first)) if is_bit else str(first).rjust(5, ' ')
        parts.append(style.RESET + '[' + str(mod[addr]) + text + style.RESET + ']')
    parts.append(']')
    return ''.join(parts)


try:
    while True:
        # Poll every table first, in the same order on each cycle.
        co_new = _poll_bits(client.read_coils, args.coil, co_prev, co_mod)
        ho_new = _poll_registers(client.read_holding_registers, args.hold, ho_prev, ho_mod)
        di_new = _poll_bits(client.read_discrete_inputs, args.discrete, di_prev, di_mod)
        in_new = _poll_registers(client.read_input_registers, args.input, in_prev, in_mod)

        # The leading carriage return redraws the status line in place.
        output = '\r'
        if args.coil:
            output += _render_group('coils:', co_new, co_mod, True)
        if args.hold:
            output += _render_group('hold regs:', ho_new, ho_mod, False)
        if args.discrete:
            output += _render_group('discrete:', di_new, di_mod, True)
        if args.input:
            output += _render_group('input regs:', in_new, in_mod, False)

        # write() works on both Python 2 and 3 and emits no newline; flush
        # afterwards so the line appears now rather than one cycle late.
        sys.stdout.write(output)
        sys.stdout.flush()
        time.sleep(args.timeout)
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop: leave the status line intact
    # and finish quietly instead of printing a traceback.
    sys.stdout.write('\n')