Newer
Older
SCADA / modbus / misc / mocli.py
root on 8 May 2022 5 KB playing with modbus day #1
#!/usr/bin/env python

import argparse
import functools

'''
Command Line Wrapper around pymodbus

Known Issues:
  Writing Integer Values: if the first value in a --values list is negative
  (e.g. `-v -5,10`), the argument parser mistakes it for an option flag. This
  can be avoided by attaching the list directly to the short flag (`-v-5,10`)
  or by using `=` with the long flag (`--values=-5,10`).
'''



def stringBool(val):
  """Translate a textual flag into a boolean, ignoring letter case.

  '1', 'T' and 'TRUE' map to True; '0', 'F' and 'FALSE' map to False.
  Any other text yields None so the caller can detect unrecognised input.
  """
  known_flags = {
    '1': True, 'T': True, 'TRUE': True,
    '0': False, 'F': False, 'FALSE': False,
  }
  # dict.get falls back to None for anything that is not a known spelling.
  return known_flags.get(val.upper())

def arrayStringBools(valarr):
  """Run stringBool over every entry of valarr and return the results as a list.

  Entries that stringBool does not recognise come back as None, in position.
  """
  return list(map(stringBool, valarr))

# Convert decimal text to the unsigned value of one Modbus word (2 bytes).
# Accepted input is the union of the signed word range (-32768..32767) and the
# unsigned word range (0..65535). Anything outside that union returns None.
# Negative numbers are silently converted to the equivalent unsigned word
# value (two's complement), e.g. -1 -> 0xFFFF.
def stringToInt(val):
  """Return the 16-bit register value for the decimal string val.

  Returns None when the number does not fit in a signed or unsigned word.
  Raises ValueError (from int()) when val is not an integer literal.
  """
  intval = int(val)
  # The lower bound is the smallest signed word. The previous bound of -0xFFFF
  # let values such as -40000 through, which then wrapped to an unrelated
  # positive register value instead of being rejected.
  if intval < -0x8000 or intval > 0xFFFF:
    return None
  if intval < 0:
    return 0x10000 + intval
  return intval


def arrayStringInts(valarr):
  """Run stringToInt over every entry of valarr and return the results as a list.

  Out-of-range entries come back as None, in position.
  """
  return list(map(stringToInt, valarr))

# Names accepted by -f/--function, grouped by the Modbus operation they select.
# 'read-holding' and 'write-coil' are accepted next to the historical
# 'read-holdings' and 'write_coil' spellings so that every name the parser
# offers is also recognised by the dispatch further down.
READ_COILS = ('rcs','read-coils')
READ_DISCRETES = ('rds','read-discretes')
READ_INPUTS = ('ris','read-inputs')
READ_HOLDINGS = ('rhs','read-holdings','read-holding')
WRITE_COIL = ('wc','write-coil','write_coil')
WRITE_COILS = ('wcs','write-coils')
WRITE_HOLDING = ('wh','write-holding')
WRITE_HOLDINGS = ('whs','write-holdings')
# Accepted by the parser but not implemented by this wrapper yet.
UNIMPLEMENTED = ('rwh','mwh','read-write-registers','mask-write-register')
IMPLEMENTED = (READ_COILS + READ_DISCRETES + READ_INPUTS + READ_HOLDINGS +
               WRITE_COIL + WRITE_COILS + WRITE_HOLDING + WRITE_HOLDINGS)


def _portValue(text):
  # A TCP port is numeric; a serial client needs a device path instead
  # (for example /dev/ttyUSB0 or COM3), so non-numeric text is kept as-is.
  return int(text) if text.isdigit() else text


def _failed(response):
  # True when the client handed back an error object instead of data.
  # NOTE(review): relies on pymodbus responses/exceptions exposing isError();
  # objects without it are treated as successful, as before.
  check = getattr(response, 'isError', None)
  return bool(check()) if callable(check) else False


parser = argparse.ArgumentParser(prog='ReadHTcp')

# NOTE: a positional argument without nargs is always required, so the default
# below is never actually used; it is kept for interface compatibility.
parser.add_argument('method', choices=['tcp','udp','rtu','serial'], default='tcp')
parser.add_argument('-a','--address',type=int)
parser.add_argument('-c','--count',type=int,default=1)
# Comma separated list of raw value strings; converted per function later on.
parser.add_argument('-v','--values',type=lambda s: [item.strip() for item in s.split(',')])

parser.add_argument('-i','--ip',default='127.0.0.1')
# TCP port number for tcp, serial device path for rtu.
parser.add_argument('-p','--port',default=502,type=_portValue)
parser.add_argument('-u','--unit',type=int,default=0)
# Arguments for Serial Clients
# timeout is in seconds
# '--timeout' makes argparse store the value as kargs.timeout; the old
# single-dash '-timeout' spelling is still accepted as an alias.
parser.add_argument('-t','--timeout','-timeout',type=int,default=3)
# type=int is required, otherwise the string from the command line never
# matches the integer choices.
parser.add_argument('--stopbits',type=int,choices=[0,1,2],default=1)
parser.add_argument('--bytesize',type=int,choices=[5,6,7,8],default=8)
parser.add_argument('--parity',choices=['N','E','O'],default='N')
parser.add_argument('--baudrate',type=int,default=9600)

# Datastore Arguments
parser.add_argument('--zeromode',action='store_true')

parser.add_argument('-r','--repeat',type=int,default=1)
parser.add_argument('-f','--function',choices=list(IMPLEMENTED + UNIMPLEMENTED))

parser.add_argument('-d','--debug',action='store_true')

kargs, uargs = parser.parse_known_args()

# Unrecognised arguments are collected as key=value pairs. partition() keeps
# this from crashing on an argument with no '=' (value becomes '') or with
# several '=' (everything after the first one is the value).
duargs = {}

for s in uargs:
  key, _, val = s.partition('=')
  duargs[key] = val

if kargs.debug:
  print('dict',kargs.__dict__)
  print('debug', kargs.debug)
  print('Known: ', kargs)
  print('Unknown: ',uargs)
  print('Unknown Dict: ',duargs)
  raise SystemExit(0)

# Validate everything that can be validated before a connection is opened, so
# bad input produces a usage error rather than a traceback or a bogus write.
if kargs.function in IMPLEMENTED and kargs.address is None:
  parser.error('-a/--address is required for function "%s"' % kargs.function)

if kargs.method == 'tcp' and not isinstance(kargs.port, int):
  parser.error('-p/--port must be a TCP port number for method "tcp"')

coil_values = None
register_values = None
raw_values = kargs.values or []
if kargs.function in WRITE_COIL + WRITE_HOLDING:
  # Single writes only ever use the first value; extra values stay ignored.
  raw_values = raw_values[:1]

if kargs.function in WRITE_COIL + WRITE_COILS:
  coil_values = arrayStringBools(raw_values)
  if not coil_values or any(item is None for item in coil_values):
    parser.error('-v/--values must be a comma separated list of 1/0, T/F or TRUE/FALSE')

elif kargs.function in WRITE_HOLDING + WRITE_HOLDINGS:
  try:
    register_values = arrayStringInts(raw_values)
  except ValueError:
    # Non-numeric text; reported through the same usage error below.
    register_values = []
  if not register_values or any(item is None for item in register_values):
    parser.error('-v/--values must be a comma separated list of integers that fit in one Modbus register')

client = None

try:
  if kargs.method == 'tcp':
    from pymodbus.client.sync import ModbusTcpClient

    client = ModbusTcpClient(kargs.ip,port=kargs.port,unit_id=kargs.unit)

  elif kargs.method == 'rtu':
    from pymodbus.client.sync import ModbusSerialClient

    client = ModbusSerialClient(method=kargs.method, port=kargs.port, stopbits=kargs.stopbits,bytesize=kargs.bytesize,parity=kargs.parity,baudrate=kargs.baudrate,timeout=kargs.timeout)

  else:
    # 'udp' and 'serial' are accepted by the parser but have no client yet;
    # say so instead of silently doing nothing.
    print('Method "%s" is not yet implemented in this wrapper. Exiting' % kargs.method)
    raise SystemExit(1)

  if client is not None:
    for iteration in range(kargs.repeat):
      # payload_attr names the attribute holding read data; None marks a
      # write, whose response object is printed as-is.
      payload_attr = None

      if kargs.function in READ_COILS:
        response = client.read_coils(kargs.address,kargs.count,unit=kargs.unit)
        payload_attr = 'bits'
        display_prefix = 'Read Coils'

      elif kargs.function in READ_DISCRETES:
        response = client.read_discrete_inputs(kargs.address,kargs.count,unit=kargs.unit)
        payload_attr = 'bits'
        display_prefix = 'Read Discretes'

      elif kargs.function in READ_INPUTS:
        response = client.read_input_registers(kargs.address,kargs.count,unit=kargs.unit)
        payload_attr = 'registers'
        display_prefix = 'Read Input Registers'

      elif kargs.function in READ_HOLDINGS:
        response = client.read_holding_registers(kargs.address,kargs.count,unit=kargs.unit)
        payload_attr = 'registers'
        display_prefix = 'Read Holding'

      elif kargs.function in WRITE_COIL:
        response = client.write_coil(kargs.address,coil_values[0],unit=kargs.unit)
        display_prefix = 'Write Coil'

      elif kargs.function in WRITE_COILS:
        response = client.write_coils(kargs.address,coil_values,unit=kargs.unit)
        display_prefix = 'Write Coils'

      elif kargs.function in WRITE_HOLDING:
        response = client.write_register(kargs.address,register_values[0],unit=kargs.unit)
        display_prefix = 'Write Holding Register'

      elif kargs.function in WRITE_HOLDINGS:
        response = client.write_registers(kargs.address,register_values,unit=kargs.unit)
        display_prefix = 'Write Holding Registers'

      else:
        print('Function "%s" is not yet implemented in this wrapper. Exiting' % kargs.function)
        # Non-zero exit status so calling scripts can see the failure; the
        # finally clause below still closes the client.
        raise SystemExit(1)

# Display results
      label = display_prefix + ' #%s' % iteration
      if payload_attr is None or _failed(response):
        # Writes, and reads that came back as an error object, are shown by
        # printing the response itself.
        print(label, response)
      else:
        payload = getattr(response, payload_attr)
        # Slice to the requested count: more items than asked for may be
        # returned. join() also copes with an empty result.
        print(label, ','.join(str(item) for item in payload[:kargs.count]))

finally:
  if client is not None:
    client.close()