Newer
Older
SCADA / modbus / misc / read_all_holding_registers.py
root on 8 May 2022 3 KB playing with modbus day #1
#!/usr/bin/env python

"""
File: read_all_holding_registers.py
Desc: Read all holding registers from a TCP MODBUS Slave
Version: 0.0.2
"""

__author__ = 'rm'

from pymodbus.client.sync import ModbusTcpClient
import argparse
import sys
import collections


class ModbusException(Exception):
    _codes = {
        1:  'ILLEGAL FUNCTION',
        2:  'ILLEGAL DATA ADDRESS',
        3:  'ILLEGAL DATA VALUE',
        4:  'SLAVE DEVICE FAILURE',
        6:  'SLAVE DEVICE BUSY'
    }

    def __init__(self, code):
        self.code = code
        self.message = ModbusException._codes[code] if ModbusException._codes.has_key(code) else 'Unknown Modbus Exception'

    def __str__(self):
        return "Modbus Error. Exception %d: %s" % (self.code, self.message)


def status(msg):
    sys.stderr.write(msg[:-1][:39].ljust(39,' ')+msg[-1:])

def validate_ipv4(s):
    pieces = s.split('.')
    if len(pieces) != 4: return False
    try: return all(0<=int(p)<256 for p in pieces)
    except ValueError: return False

def scan():
    
    parser = argparse.ArgumentParser(description = "Read all holding registries from a TCP MODBUS Slave")
    parser.add_argument("ip", help="IP address of the slave")
    parser.add_argument("-p", "--port", dest="port", help="Modbus Port. Defaults to 502", type=int, metavar="PORT", default=502)
    parser.add_argument("-u", "--uid", dest="uid", help="Modbus Unit ID. Defaults to 1", type=int, metavar="UID", default=1)
    parser.add_argument("-sa", "--start-address", dest="start_address", help="Starting Address for the scanner. Defaults to 1", type=int, metavar="START", default=1)
    parser.add_argument("-ea", "--end-address", dest="end_address", help="Ending Address for the scanner. Defaults to 65535", type=int, metavar="END", default=65535)
    
    args = parser.parse_args()
    
    try:
        ip = args.ip
    except IndexError:
        print "ERROR: No target to scan\n\n"
        parser.print_help()
        exit()

    # ip address format verification
    if not validate_ipv4(ip):
        print "ERROR: IP address is invalid\n\n"
        parser.print_help()
        exit()

    print 'Connecting to %s...' % ip,
    # connect to modbus slave
    client = ModbusTcpClient(ip, args.port)
    client.connect()
    if client.socket == None:
        print "ERROR: Could not connect to %s." %ip
        exit()
    print ' Connected.'

    # TODO add ETA mechanism
    results = {}
    addr = 1
    registers_tested = args.end_address - args.start_address + 1
    if registers_tested == 1:
        hr = client.read_holding_registers(args.start_address, 1, unit=args.uid) # unit value is device id of the slave (UID)
        if hr.function_code == 3: # if we succeed reading stuff
            results[addr] = hr.registers[0]
        # if it fails, hr.function = 131 (0x83), cf modbus doc
    else:
    	for addr in range(args.start_address, args.end_address):
            hr = client.read_holding_registers(addr, 1, unit=args.uid) # unit value is device id of the slave (UID)
            if hr.function_code == 3: # if we succeed reading stuff
                results[addr] = hr.registers[0]
            # if it fails, hr.function = 131 (0x83), cf modbus doc

    client.close()
    print 'Register scanning is finished (%d registers were tried)' % (registers_tested)
    # sorting dict for printing
    ordered_results = collections.OrderedDict(sorted(results.items()))
    for addr, value in ordered_results.iteritems():
        print 'Addr {0} \t{1}'.format(addr,value)

if __name__=="__main__":
    try:
        scan()
    except KeyboardInterrupt:
        status("Ctrl-C happened\n")