123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import struct
- import time
- import serial
- from threading import Thread
- from .crc import calculate_crc
# Version of this package.
__version__ = '0.1.0'

# Frame synchronization byte; every frame handled in this module starts
# with it.
FS = 0xB5

# Attribute values, carried in the second byte of a frame.
AT_REQUEST = 0x00
AT_RESPONSE = 0x01
AT_ACK = 0x02

# S-Bus command codes: read commands.
SC_READ_COUNTER = 0x00
SC_READ_FLAGS = 0x02
SC_READ_INPUT = 0x03
SC_READ_RTC = 0x04
SC_READ_OUTPUT = 0x05
SC_READ_REGISTER = 0x06
SC_READ_TIMER = 0x07
SC_READ_DATA_BLOCK = 0x96

# S-Bus command codes: write commands.
SC_WRITE_COUNTER = 0x0A
SC_WRITE_FLAGS = 0x0B
SC_WRITE_RTC = 0x0C
SC_WRITE_OUTPUT = 0x0D
SC_WRITE_REGISTER = 0x0E
SC_WRITE_TIMER = 0x0F
SC_WRITE_DATA_BLOCK = 0x97

# Number of bytes one register value occupies in a response.
REGISTER_SIZE = 4
# Largest number of registers requested in a single read telegram.
MAX_REGISTERS = 10
# Length in bytes of a complete register read request, CRC included.
REQUEST_SIZE = 9

# Default serial read timeout, in seconds.
TIMEOUT = 1.0
# Minimum pause between two consecutive requests, in seconds.
WAIT_BETWEEN_REQUESTS = 0.1
class Instrument:
    """Client that reads values from an S-Bus device over a serial port.

    Every request is framed as ``FS, AT_REQUEST, address, command, ...``
    followed by the bytes produced by ``calculate_crc``.  Only register
    reads are implemented; the other ``read_*``/``write_*`` methods raise
    ``NotImplementedError``.

    The instance can be used as a context manager, in which case the
    serial port is closed on exit.
    """

    # Bytes in a response that are not register data: FS, the attribute
    # byte and the two trailing CRC bytes.
    _FRAME_OVERHEAD = 4

    def __init__(self, address, serial_port, timeout=TIMEOUT, **kwargs):
        """Open the serial port and remember the device address.

        Args:
            address: Value placed in the address byte of every request.
            serial_port: Port identifier handed to ``serial.Serial``.
            timeout: Read timeout in seconds for the serial port.
            **kwargs: Additional keyword arguments for ``serial.Serial``.
        """
        self.address = address
        self.serial = serial.Serial(serial_port, timeout=timeout, **kwargs)
        # Timestamp (``time.monotonic``) at which the last request went out.
        self.last_request_time = 0.0

    def close(self):
        """Close the underlying serial port."""
        self.serial.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def read_counter(self, register):
        raise NotImplementedError("Reading Counters is not yet implemented.")

    def read_flags(self):
        raise NotImplementedError("Reading Flags is not yet implemented.")

    def read_input(self):
        raise NotImplementedError("Reading Inputs is not yet implemented.")

    def read_rtc(self):
        raise NotImplementedError("Reading RTC is not yet implemented.")

    def read_output(self):
        raise NotImplementedError("Reading Outputs is not yet implemented.")

    def read_register(self, register, num_decimals=0, signed=True):
        """Read a single register.

        Args:
            register: Register number, at most 255.
            num_decimals: The raw value is divided by ``10 ** num_decimals``.
            signed: Interpret the raw 32-bit value as signed when true.

        Returns:
            The scaled register value.

        Raises:
            ValueError: If ``register`` exceeds 255 or the reply is invalid.
            TimeoutError: If no complete reply arrives in time.
        """
        if register > 255:
            raise ValueError("Maximum value for register is 255")
        message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, 0, 0, register]
        response_size = self._FRAME_OVERHEAD + REGISTER_SIZE
        data = self._perform_request(message, response_size=response_size)
        return self._interpret_result(data, num_decimals, signed)

    def read_multiple_registers(self, start_register, num_registers=1, num_decimals=0, signed=True):
        """Read a run of consecutive registers.

        The run is split into requests of at most ``MAX_REGISTERS``
        registers each.

        Args:
            start_register: Number of the first register to read.
            num_registers: How many consecutive registers to read.
            num_decimals: Raw values are divided by ``10 ** num_decimals``.
            signed: Interpret the raw 32-bit values as signed when true.

        Returns:
            A list with one scaled value per register, in register order.

        Raises:
            ValueError: If the run reaches beyond register 255 or a reply
                is invalid.
            TimeoutError: If no complete reply arrives in time.
        """
        # The last register touched is start + count - 1; register 255
        # itself is a valid target, exactly as in read_register().
        if start_register + num_registers - 1 > 255:
            raise ValueError("Cannot read registers above number 255.")
        remaining_registers = num_registers
        results = []
        while remaining_registers > 0:
            count = min(remaining_registers, MAX_REGISTERS)
            # The count byte is sent as "number of registers minus one".
            message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, count - 1, 0, start_register]
            response_size = self._FRAME_OVERHEAD + REGISTER_SIZE * count
            data = self._perform_request(message, response_size=response_size)
            results += self._interpret_result(data, num_decimals, signed, always_list=True)
            remaining_registers -= count
            start_register += count
        return results

    def read_register_list(self, registers_list, num_decimals=0, signed=True):
        """Read an arbitrary collection of registers, one request each.

        Args:
            registers_list: Iterable of register numbers.
            num_decimals: Scaling applied to every value.
            signed: Signedness applied to every value.

        Returns:
            A list of scaled values in the order of ``registers_list``.
        """
        return [
            self.read_register(register, num_decimals, signed)
            for register in registers_list
        ]

    def read_mixed_registers_list(self, registers_list, num_decimals_list, signed_list):
        """Read registers that each have their own scaling and signedness.

        Args:
            registers_list: Sequence of register numbers.
            num_decimals_list: ``num_decimals`` for each register.
            signed_list: ``signed`` flag for each register.

        Returns:
            A list of scaled values in the order of ``registers_list``.

        Raises:
            ValueError: If the three sequences differ in length.
        """
        registers = list(registers_list)
        decimals = list(num_decimals_list)
        signs = list(signed_list)
        if not len(registers) == len(decimals) == len(signs):
            raise ValueError("registers_list, num_decimals_list and signed_list must have equal length")
        return [
            self.read_register(register, num_decimals, signed)
            for register, num_decimals, signed in zip(registers, decimals, signs)
        ]

    def read_timer(self, timer):
        raise NotImplementedError("Reading Timers is not yet implemented.")

    def read_data_block(self):
        raise NotImplementedError("Reading Data Blocks is not yet implemented.")

    def write_counter(self):
        raise NotImplementedError("Writing Counters is not yet implemented.")

    def write_flags(self):
        raise NotImplementedError("Writing Flags is not yet implemented.")

    def write_rtc(self):
        raise NotImplementedError("Writing RTC is not yet implemented.")

    def write_output(self):
        raise NotImplementedError("Writing Outputs is not yet implemented.")

    def write_register(self):
        raise NotImplementedError("Writing Registers is not yet implemented.")

    def write_timer(self):
        raise NotImplementedError("Writing Timers is not yet implemented.")

    def write_data_block(self):
        raise NotImplementedError("Writing Data Blocks is not yet implemented.")

    def _perform_request(self, message, response_size):
        """Send ``message`` with its CRC appended and return the reply.

        Args:
            message: Sequence of byte values without the CRC.
            response_size: Number of bytes expected for a data response.

        Returns:
            The received frame as ``bytes``, CRC included.

        Raises:
            TimeoutError: If nothing, or only part of a frame, arrives
                before the serial timeout expires.
            ValueError: If the frame does not start with ``FS`` or its
                CRC does not match.
        """
        self._wait_for_next_request()
        # Work on a copy so the caller's list is left untouched.
        frame = list(message)
        frame += calculate_crc(frame)
        self.serial.write(frame)
        response = self.serial.read(response_size)
        if not response:
            raise TimeoutError("No response received in time")
        if response[0] != FS:
            raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
        # Frames with another attribute may legitimately be shorter than a
        # data response, so only data responses must have the full length.
        truncated = len(response) < self._FRAME_OVERHEAD or (
            response[1] == AT_RESPONSE and len(response) < response_size
        )
        if truncated:
            raise TimeoutError(
                f"Incomplete response: expected {response_size} bytes, got {len(response)}"
            )
        if not self._check_crc(response):
            raise ValueError("CRC mismatch in response")
        return response

    def _wait_for_next_request(self):
        """Sleep until ``WAIT_BETWEEN_REQUESTS`` has passed since the last request."""
        # A monotonic clock keeps the interval immune to wall-clock jumps.
        time_elapsed = time.monotonic() - self.last_request_time
        remaining = WAIT_BETWEEN_REQUESTS - time_elapsed
        if remaining > 0:
            time.sleep(remaining)
        # Record the moment after sleeping, which is when the request
        # actually goes out.
        self.last_request_time = time.monotonic()

    def _check_crc(self, message):
        """Return True when the last two bytes of ``message`` match its CRC."""
        # Both sides are normalised to bytes so the comparison does not
        # depend on the container type calculate_crc returns (assumed to be
        # a sequence of byte values, as it is appended to frames elsewhere).
        expected = bytes(calculate_crc(message[:-2]))
        return expected == bytes(message[-2:])

    def _interpret_result(self, message, num_decimals=0, signed=True, always_list=False):
        """Decode the register values carried in a response frame.

        Args:
            message: Complete frame including ``FS``, attribute and CRC.
            num_decimals: Raw values are divided by ``10 ** num_decimals``.
            signed: Decode the 32-bit big-endian values as signed when true.
            always_list: Return a list even for a single value.

        Returns:
            A single value when the frame holds exactly one register and
            ``always_list`` is false, otherwise a list of values.

        Raises:
            ValueError: If the frame does not start with ``FS`` or is not
                a data response.
        """
        if message[0] != FS:
            raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
        if message[1] != AT_RESPONSE:
            return self._interpret_error(message)
        data = bytes(message[2:-2])
        num_registers = len(data) // REGISTER_SIZE
        format_code = 'l' if signed else 'L'
        raw_values = struct.unpack(f'>{num_registers}{format_code}', data)
        register_values = [value / 10 ** num_decimals for value in raw_values]
        if not always_list and len(register_values) == 1:
            return register_values[0]
        return register_values

    def _interpret_error(self, message):
        """Raise ``ValueError`` describing a frame that is not a data response."""
        payload = bytes(message[2:-2]).hex()
        raise ValueError(
            f"Device did not answer with a data response "
            f"(attribute 0x{message[1]:02X}, payload 0x{payload})"
        )

    def _chunk_data(self, data):
        """Yield consecutive ``REGISTER_SIZE``-byte slices of ``data``.

        A trailing slice shorter than ``REGISTER_SIZE`` is not yielded.
        """
        for index in range(len(data) // REGISTER_SIZE):
            yield data[index * REGISTER_SIZE:(index + 1) * REGISTER_SIZE]
class DeviceEmulator:
    """Minimal emulated S-Bus device that answers register read requests.

    A background thread reads request frames from the serial port and
    replies with the values stored in ``registers`` (register number to
    integer value); registers that are not set read as zero.
    """

    def __init__(self, address, serial_port, **kwargs):
        """Open the serial port used by the emulator.

        Args:
            address: Address this emulator answers to.
            serial_port: Port identifier handed to ``serial.Serial``.
            **kwargs: Additional keyword arguments for ``serial.Serial``.
                ``timeout`` defaults to ``TIMEOUT`` so the worker loop
                regularly gets a chance to notice a stop request.
        """
        kwargs.setdefault("timeout", TIMEOUT)
        self.address = address
        self.serial = serial.Serial(serial_port, **kwargs)
        self.registers = {}
        self.thread = None
        self._running = False

    def start(self):
        """Start the worker thread unless it is already running."""
        if self.thread is not None and self.thread.is_alive():
            return
        self._running = True
        # Daemon thread: a forgotten emulator must not block interpreter exit.
        self.thread = Thread(target=self._run, daemon=True)
        self.thread.start()

    def stop(self):
        """Ask the worker thread to finish and wait for it.

        Calling this before ``start()`` is a no-op.
        """
        self._running = False
        worker = self.thread
        if worker is None:
            return
        # Abort a pending blocking read where the serial backend supports it.
        cancel_read = getattr(self.serial, "cancel_read", None)
        if cancel_read is not None:
            cancel_read()
        worker.join()
        self.thread = None

    def _run(self):
        """Worker loop: wait for ``FS``, read one request and answer it."""
        while self._running:
            byte = self.serial.read(1)
            # read() returns bytes (empty on timeout), so compare the value
            # of its single element with the integer FS.
            if not byte or byte[0] != FS:
                continue
            request = self._read_message()
            if request is None:
                continue
            status, data = self._handle_message(request)
            response = [FS, status] + list(data)
            response += calculate_crc(response)
            self.serial.write(response)

    def _read_message(self):
        """Read the rest of a request whose ``FS`` byte was already consumed.

        Returns:
            The complete request as ``bytes`` (``FS`` included), or ``None``
            when the frame is incomplete, is not a request, or is addressed
            to another device.
        """
        rest = self.serial.read(REQUEST_SIZE - 1)
        if len(rest) < REQUEST_SIZE - 1:
            return None
        message = bytes([FS]) + rest
        if message[1] != AT_REQUEST or message[2] != self.address:
            # The whole fixed-size frame has been consumed already, so
            # there is nothing left to skip.
            return None
        return message

    def _check_crc(self, message):
        """Raise ``ValueError`` when the trailing two CRC bytes do not match."""
        # Normalise both sides to bytes so the comparison does not depend on
        # the container type calculate_crc returns.
        expected = bytes(calculate_crc(message[:-2]))
        if expected != bytes(message[-2:]):
            raise ValueError("CRC Mismatch")

    def _skip_message(self, size):
        """Read and discard ``size`` bytes, ignoring serial port errors."""
        try:
            self.serial.read(size)
        except serial.SerialException:
            pass

    def _handle_message(self, message):
        """Build the reply for one request frame.

        Args:
            message: Complete request frame, CRC included.

        Returns:
            A ``(attribute, payload)`` tuple, where ``payload`` is a list of
            byte values.
        """
        # NOTE(review): the original code referenced an undefined AT_ERROR
        # constant here.  AT_ACK is the only other non-response attribute
        # this module defines, and Instrument treats every frame that is
        # not AT_RESPONSE as an error.  The payload is kept as written -
        # confirm attribute and payload against the S-Bus specification.
        error_reply = (AT_ACK, [0, 0, 0, 1])
        try:
            self._check_crc(message)
        except ValueError:
            return error_reply
        if message[3] == SC_READ_REGISTER:
            # The count byte carries "number of registers minus one".
            num_registers = message[4] + 1
            start_register = int.from_bytes(message[5:7], 'big')
            data = []
            for register in range(start_register, start_register + num_registers):
                data.extend(self.get_register_value(register))
            return AT_RESPONSE, data
        # Commands other than register reads are not emulated.
        return error_reply

    def get_register_value(self, register):
        """Return the four big-endian bytes of ``register`` as a list.

        Registers missing from ``self.registers`` read as zero.  Negative
        values are encoded in two's complement.
        """
        value = self.registers.get(register, 0)
        return list(value.to_bytes(4, 'big', signed=value < 0))
|