instrument.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import struct
  2. import time
  3. import serial
  4. from .crc import calculate_crc
  5. # Frame Synchronization
  6. FS = 0xB5
  7. ESCAPE_CHAR = 0xC5
  8. # Attributes
  9. AT_REQUEST = 0x00
  10. AT_RESPONSE = 0x01
  11. AT_ACK = 0x02
  12. # S-Bus codes
  13. SC_READ_COUNTER = 0x00
  14. SC_READ_FLAGS = 0x02
  15. SC_READ_INPUT = 0x03
  16. SC_READ_RTC = 0x04
  17. SC_READ_OUTPUT = 0x05
  18. SC_READ_REGISTER = 0x06
  19. SC_READ_TIMER = 0x07
  20. SC_READ_DATA_BLOCK = 0x96
  21. SC_WRITE_COUNTER = 0x0A
  22. SC_WRITE_FLAGS = 0x0B
  23. SC_WRITE_RTC = 0x0C
  24. SC_WRITE_OUTPUT = 0x0D
  25. SC_WRITE_REGISTER = 0x0E
  26. SC_WRITE_TIMER = 0x0F
  27. SC_WRITE_DATA_BLOCK = 0x97
  28. REGISTER_SIZE = 4
  29. MAX_REGISTERS = 10
  30. REQUEST_SIZE = 9
  31. TIMEOUT = 1.0
  32. WAIT_BETWEEN_REQUESTS = 0.1
  33. class Instrument:
  34. def __init__(self, address, serial_port, timeout=TIMEOUT, async_=False, **kwargs):
  35. self.address = address
  36. self.serial = serial.Serial(serial_port, timeout=timeout, **kwargs)
  37. self.last_request_time = 0.0
  38. def read_counter(self, register):
  39. raise NotImplementedError("Reading Counters is not yet implemented.")
  40. def read_flags(self):
  41. raise NotImplementedError("Reading Flags is not yet implemented.")
  42. def read_input(self):
  43. raise NotImplementedError("Reading Inputs is not yet implemented.")
  44. def read_rtc(self):
  45. raise NotImplementedError("Reading RTC is not yet implemented.")
  46. def read_output(self):
  47. raise NotImplementedError("Reading Outputs is not yet implemented.")
  48. def read_register(self, register_address, number_of_decimals=0, signed=False):
  49. if register_address > 255:
  50. raise ValueError("Maximum value for register is 255")
  51. message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, 0, 0, register_address]
  52. response_size = 4 + 4
  53. data = self._perform_request(message, response_size=response_size)
  54. return self._interpret_result(data, number_of_decimals, signed)
  55. def read_registers(self, register_address, number_of_registers=1, number_of_decimals=0, signed=False):
  56. if register_address + number_of_registers > 255:
  57. raise ValueError("Cannot read registers above number 255.")
  58. remaining_registers = number_of_registers
  59. results = []
  60. while remaining_registers > 0:
  61. number_of_registers = min(remaining_registers, MAX_REGISTERS)
  62. message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, number_of_registers - 1, 0, register_address]
  63. response_size = 4 + 4 * number_of_registers
  64. data = self._perform_request(message, response_size=response_size)
  65. results += self._interpret_result(data, number_of_decimals, signed, always_list=True)
  66. remaining_registers -= number_of_registers
  67. start_register += number_of_registers
  68. return results
  69. def read_timer(self, timer):
  70. raise NotImplementedError("Reading Timers is not yet implemented.")
  71. def read_data_block(self):
  72. raise NotImplementedError("Reading Data Blocks is not yet implemented.")
  73. def write_counter(self):
  74. raise NotImplementedError("Writing Counters is not yet implemented.")
  75. def write_flags(self):
  76. raise NotImplementedError("Writing Flags is not yet implemented.")
  77. def write_rtc(self):
  78. raise NotImplementedError("Writing RTC is not yet implemented.")
  79. def write_output(self):
  80. raise NotImplementedError("Writing Outputs is not yet implemented.")
  81. def write_register(self):
  82. raise NotImplementedError("Writing Registers is not yet implemented.")
  83. def write_timer(self):
  84. raise NotImplementedError("Writing Timers is not yet implemented.")
  85. def write_data_block(self):
  86. raise NotImplementedError("Writing Data Blocks is not yet implemented.")
  87. def _perform_request(self, message, response_size):
  88. self._wait_for_next_request()
  89. message = self._escape_fs_char(message)
  90. message += calculate_crc(message)
  91. self.serial.write(message)
  92. response = self.serial.read(response_size)
  93. if not response:
  94. raise TimeoutError("No response received in time")
  95. self._check_crc(response)
  96. return response
  97. def _escape_fs_char(self, message):
  98. if FS in message[1:]:
  99. original_message = message
  100. message = [original_message[0]]
  101. for char in original_message[1:]:
  102. message.append(char)
  103. if char == FS:
  104. message.append(ESCAPE_CHAR)
  105. return message
  106. def _wait_for_next_request(self):
  107. now = time.time()
  108. time_elapsed = now - self.last_request_time
  109. time.sleep(max(0, WAIT_BETWEEN_REQUESTS - time_elapsed))
  110. self.last_request_time = now
  111. def _check_crc(self, message):
  112. crc = calculate_crc(message[:-2])
  113. if crc != message[-2:]:
  114. return False
  115. return True
  116. def _interpret_result(self, message, number_of_decimals=0, signed=False, always_list=False):
  117. if message[0] != FS:
  118. raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
  119. if message[1] != AT_RESPONSE:
  120. raise ValueError(f"The response code was: {int(message[1])}, expected: {AT_RESPONSE}")
  121. register_values = []
  122. data = message[2:-2]
  123. number_of_registers = int(len(data)/4)
  124. if signed:
  125. format_code = 'l'
  126. else:
  127. format_code = 'L'
  128. if number_of_decimals == 0:
  129. register_values = [value for value in struct.unpack(f'>{number_of_registers}{format_code}', data)]
  130. else:
  131. register_values = [value / 10 ** number_of_decimals for value in struct.unpack(f'>{number_of_registers}{format_code}', data)]
  132. if not always_list and len(register_values) == 1:
  133. return register_values[0]
  134. return register_values
  135. def _chunk_data(self, data):
  136. for i in range(len(data)>>2):
  137. yield data[i*REGISTER_SIZE:(i+1)*REGISTER_SIZE]