instrument.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import struct
  2. import time
  3. import serial
  4. from threading import Thread
  5. from .crc import calculate_crc
  # --- Framing ---------------------------------------------------------------
  # Frame synchronisation byte: every request built in this module starts with
  # it, and responses are rejected unless their first byte equals it.
  FS = 0xB5
  # Byte inserted after any FS value that occurs past the first position of an
  # outgoing message.
  ESCAPE_CHAR = 0xC5

  # --- Attributes (second byte of a frame) -----------------------------------
  AT_REQUEST = 0x00
  AT_RESPONSE = 0x01
  AT_ACK = 0x02

  # --- S-Bus codes: read commands --------------------------------------------
  SC_READ_COUNTER = 0x00
  SC_READ_FLAGS = 0x02
  SC_READ_INPUT = 0x03
  SC_READ_RTC = 0x04
  SC_READ_OUTPUT = 0x05
  SC_READ_REGISTER = 0x06
  SC_READ_TIMER = 0x07
  SC_READ_DATA_BLOCK = 0x96

  # --- S-Bus codes: write commands -------------------------------------------
  SC_WRITE_COUNTER = 0x0A
  SC_WRITE_FLAGS = 0x0B
  SC_WRITE_RTC = 0x0C
  SC_WRITE_OUTPUT = 0x0D
  SC_WRITE_REGISTER = 0x0E
  SC_WRITE_TIMER = 0x0F
  SC_WRITE_DATA_BLOCK = 0x97

  # --- Sizes and timing ------------------------------------------------------
  # Number of bytes occupied by one register value in a response.
  REGISTER_SIZE = 4
  # Largest number of registers requested in a single frame; longer reads are
  # split into several requests.
  MAX_REGISTERS = 10
  # Not referenced in this file; presumably the length in bytes of a complete
  # request frame (7 message bytes + 2 CRC bytes) -- TODO confirm.
  REQUEST_SIZE = 9
  # Default timeout handed to the serial port (seconds).
  TIMEOUT = 1.0
  # Minimum pause enforced between two consecutive requests (seconds).
  WAIT_BETWEEN_REQUESTS = 0.1
  class Instrument:
      """Client for a single S-Bus station reachable over a serial port.

      A request is the byte sequence
      ``[FS, AT_REQUEST, address, command, count - 1, 0, register_address]``
      followed by the bytes returned by ``calculate_crc``.  A data response is
      expected to be ``FS``, ``AT_RESPONSE``, ``REGISTER_SIZE`` bytes per
      register and two trailing CRC bytes.

      Only reading registers is implemented; every other read/write method
      raises ``NotImplementedError``.
      """

      def __init__(self, address, serial_port, timeout=TIMEOUT, async_=False, **kwargs):
          """Open the serial port used to talk to the station.

          Args:
              address: Station address placed in the third byte of each request.
              serial_port: Port name handed to ``serial.Serial``.
              timeout: Read timeout handed to ``serial.Serial``.
              async_: Accepted for backward compatibility and currently unused.
                  The parameter used to be called ``async``, which has been a
                  reserved keyword since Python 3.7 and made the module fail
                  to import.
              **kwargs: Forwarded unchanged to ``serial.Serial``.
          """
          # A caller may still supply the legacy name via ``**{"async": ...}``;
          # drop it so it is not forwarded to ``serial.Serial``.
          kwargs.pop("async", None)
          self.address = address
          self.serial = serial.Serial(serial_port, timeout=timeout, **kwargs)
          # ``time.time()`` value recorded when the last request was released.
          self.last_request_time = 0.0

      def read_counter(self, register):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Counters is not yet implemented.")

      def read_flags(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Flags is not yet implemented.")

      def read_input(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Inputs is not yet implemented.")

      def read_rtc(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading RTC is not yet implemented.")

      def read_output(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Outputs is not yet implemented.")

      def read_register(self, register_address, number_of_decimals=0, signed=False):
          """Read one register and return its value.

          Args:
              register_address: Register number, at most 255.
              number_of_decimals: The raw integer is divided by
                  ``10 ** number_of_decimals`` when this is non-zero.
              signed: Decode the register as a signed instead of an unsigned
                  32-bit big-endian integer.

          Returns:
              The register value: an ``int`` when ``number_of_decimals`` is 0,
              otherwise a ``float``.

          Raises:
              ValueError: If ``register_address`` exceeds 255, or the response
                  is malformed or fails its CRC check.
              TimeoutError: If no response arrives in time.
          """
          if register_address > 255:
              raise ValueError("Maximum value for register is 255")
          # Count byte 0 requests a single register (the count is sent as n - 1).
          message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, 0, 0, register_address]
          data = self._perform_request(message, response_size=4 + REGISTER_SIZE)
          return self._interpret_result(data, number_of_decimals, signed)

      def read_registers(self, register_address, number_of_registers=1, number_of_decimals=0, signed=False):
          """Read consecutive registers, splitting the read into several requests.

          At most ``MAX_REGISTERS`` registers are requested per frame.

          Args:
              register_address: Number of the first register to read.
              number_of_registers: How many consecutive registers to read.
              number_of_decimals: Raw integers are divided by
                  ``10 ** number_of_decimals`` when this is non-zero.
              signed: Decode registers as signed instead of unsigned 32-bit
                  big-endian integers.

          Returns:
              A list with one value per register, in register order.

          Raises:
              ValueError: If the last requested register lies above 255, or a
                  response is malformed or fails its CRC check.
              TimeoutError: If no response arrives in time.
          """
          # The last register touched is register_address + n - 1; register 255
          # itself is valid, matching ``read_register``.
          if register_address + number_of_registers - 1 > 255:
              raise ValueError("Cannot read registers above number 255.")
          results = []
          next_register = register_address
          remaining_registers = number_of_registers
          while remaining_registers > 0:
              chunk_size = min(remaining_registers, MAX_REGISTERS)
              message = [
                  FS, AT_REQUEST, self.address, SC_READ_REGISTER,
                  chunk_size - 1, 0, next_register,
              ]
              response_size = 4 + REGISTER_SIZE * chunk_size
              data = self._perform_request(message, response_size=response_size)
              results += self._interpret_result(data, number_of_decimals, signed, always_list=True)
              remaining_registers -= chunk_size
              # Advance the start address so the next chunk continues where this
              # one ended instead of re-reading the same registers.
              next_register += chunk_size
          return results

      def read_timer(self, timer):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Timers is not yet implemented.")

      def read_data_block(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Reading Data Blocks is not yet implemented.")

      def write_counter(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Counters is not yet implemented.")

      def write_flags(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Flags is not yet implemented.")

      def write_rtc(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing RTC is not yet implemented.")

      def write_output(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Outputs is not yet implemented.")

      def write_register(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Registers is not yet implemented.")

      def write_timer(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Timers is not yet implemented.")

      def write_data_block(self):
          """Not implemented; always raises ``NotImplementedError``."""
          raise NotImplementedError("Writing Data Blocks is not yet implemented.")

      def _perform_request(self, message, response_size):
          """Send one request and return the raw response bytes.

          Args:
              message: Sequence of byte values forming the request without CRC.
              response_size: Number of bytes to read from the serial port.

          Returns:
              The bytes read from the serial port.

          Raises:
              TimeoutError: If nothing was received.
              ValueError: If the trailing CRC of the response does not match.
          """
          self._wait_for_next_request()
          # Work on a copy so the caller's list is never extended in place.
          frame = list(self._escape_fs_char(message))
          frame.extend(calculate_crc(frame))
          self.serial.write(bytes(frame))
          response = self.serial.read(response_size)
          if not response:
              raise TimeoutError("No response received in time")
          # The CRC result used to be computed and then ignored, letting corrupt
          # frames through to the decoder.
          if not self._check_crc(response):
              raise ValueError("CRC check of the response failed.")
          return response

      def _escape_fs_char(self, message):
          """Return ``message`` with ``ESCAPE_CHAR`` inserted after embedded FS bytes.

          The first element is never escaped.  When no FS value occurs after
          the first position, the input object itself is returned.
          """
          # NOTE(review): confirm against the S-Bus specification that "FS followed
          # by ESCAPE_CHAR" is the expected escape sequence and that ESCAPE_CHAR
          # itself needs no escaping; the CRC bytes appended later are not escaped.
          if FS not in message[1:]:
              return message
          escaped = [message[0]]
          for byte in message[1:]:
              escaped.append(byte)
              if byte == FS:
                  escaped.append(ESCAPE_CHAR)
          return escaped

      def _wait_for_next_request(self):
          """Block until ``WAIT_BETWEEN_REQUESTS`` has passed since the last request."""
          elapsed = time.time() - self.last_request_time
          # Clamp to [0, WAIT_BETWEEN_REQUESTS] so a wall clock that was set
          # backwards cannot cause an arbitrarily long sleep.
          pause = min(WAIT_BETWEEN_REQUESTS, max(0, WAIT_BETWEEN_REQUESTS - elapsed))
          if pause:
              time.sleep(pause)
          # Take the timestamp after sleeping: storing the pre-sleep time would
          # let the following request go out too early.
          self.last_request_time = time.time()

      def _check_crc(self, message):
          """Return True when the last two bytes of ``message`` match its CRC.

          The CRC is computed with ``calculate_crc`` over everything except the
          last two bytes.  Both sides are converted to ``bytes`` so the result
          does not depend on the sequence type ``calculate_crc`` returns.
          """
          return bytes(calculate_crc(message[:-2])) == bytes(message[-2:])

      def _interpret_error(self, message):
          """Raise ``ValueError`` for a frame that is not a data response.

          Args:
              message: Raw response whose attribute byte is not ``AT_RESPONSE``.

          Raises:
              ValueError: Always; the text contains the attribute byte and the
                  payload between the attribute byte and the CRC.
          """
          attribute = message[1]
          payload = bytes(message[2:-2]).hex()
          if attribute == AT_ACK:
              raise ValueError(
                  f"Device answered with an acknowledge frame instead of data (payload: {payload!r})."
              )
          raise ValueError(
              f"Unexpected attribute byte 0x{attribute:02X} in response (payload: {payload!r})."
          )

      def _interpret_result(self, message, number_of_decimals=0, signed=False, always_list=False):
          """Decode the register values contained in a raw response.

          Args:
              message: Raw response bytes: FS, attribute byte, register data and
                  two trailing CRC bytes.
              number_of_decimals: Raw integers are divided by
                  ``10 ** number_of_decimals`` when this is non-zero.
              signed: Decode as signed instead of unsigned 32-bit big-endian
                  integers.
              always_list: Return a list even when exactly one value was decoded.

          Returns:
              A single value when exactly one register was decoded and
              ``always_list`` is false, otherwise a list of values.

          Raises:
              ValueError: If the frame is shorter than 4 bytes, does not start
                  with FS, or is not a data response.
              struct.error: If the data length is not a multiple of
                  ``REGISTER_SIZE``.
          """
          # FS + attribute + two CRC bytes is the shortest frame that can be sliced.
          if len(message) < 4:
              raise ValueError("Response is too short to be a valid frame.")
          if message[0] != FS:
              raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
          if message[1] != AT_RESPONSE:
              return self._interpret_error(message)
          data = message[2:-2]
          number_of_registers = len(data) // REGISTER_SIZE
          format_code = 'l' if signed else 'L'
          # '>' selects big-endian with standard sizes, so 'l'/'L' are 4 bytes.
          raw_values = struct.unpack(f'>{number_of_registers}{format_code}', data)
          if number_of_decimals == 0:
              register_values = list(raw_values)
          else:
              register_values = [value / 10 ** number_of_decimals for value in raw_values]
          if not always_list and len(register_values) == 1:
              return register_values[0]
          return register_values

      def _chunk_data(self, data):
          """Yield consecutive ``REGISTER_SIZE``-byte slices of ``data``.

          Trailing bytes that do not fill a whole register are not yielded.
          """
          for index in range(len(data) // REGISTER_SIZE):
              yield data[index * REGISTER_SIZE:(index + 1) * REGISTER_SIZE]