Browse Source

Updated version

Stan Janssen 3 years ago
parent
commit
379787d381
3 changed files with 183 additions and 226 deletions
  1. 2 226
      sbus/__init__.py
  2. 172 0
      sbus/instrument.py
  3. 9 0
      sbustest.py

+ 2 - 226
sbus/__init__.py

@@ -1,228 +1,4 @@
-import struct
-import time
-import serial
-from threading import Thread
-
-from .crc import calculate_crc
-
 __version__ = '0.1.0'
 
-# Frame Synchronization
-FS = 0xB5
-
-# Attributes
-AT_REQUEST = 0x00
-AT_RESPONSE = 0x01
-AT_ACK = 0x02
-
-# S-Bus codes
-SC_READ_COUNTER = 0x00
-SC_READ_FLAGS = 0x02
-SC_READ_INPUT = 0x03
-SC_READ_RTC = 0x04
-SC_READ_OUTPUT = 0x05
-SC_READ_REGISTER = 0x06
-SC_READ_TIMER = 0x07
-SC_READ_DATA_BLOCK = 0x96
-SC_WRITE_COUNTER = 0x0A
-SC_WRITE_FLAGS = 0x0B
-SC_WRITE_RTC = 0x0C
-SC_WRITE_OUTPUT = 0x0D
-SC_WRITE_REGISTER = 0x0E
-SC_WRITE_TIMER = 0x0F
-SC_WRITE_DATA_BLOCK = 0x97
-
-REGISTER_SIZE = 4
-MAX_REGISTERS = 10
-REQUEST_SIZE = 9
-
-TIMEOUT = 1.0
-WAIT_BETWEEN_REQUESTS = 0.1
-
-class Instrument:
-    def __init__(self, address, serial_port, timeout=TIMEOUT, **kwargs):
-        self.address = address
-        self.serial = serial.Serial(serial_port, timeout=timeout, **kwargs)
-        self.last_request_time = 0.0
-
-    def read_counter(self, register):
-        raise NotImplementedError("Reading Counters is not yet implemented.")
-
-    def read_flags(self):
-        raise NotImplementedError("Reading Flags is not yet implemented.")
-
-    def read_input(self):
-        raise NotImplementedError("Reading Inputs is not yet implemented.")
-
-    def read_rtc(self):
-        raise NotImplementedError("Reading RTC is not yet implemented.")
-
-    def read_output(self):
-        raise NotImplementedError("Reading Outputs is not yet implemented.")
-
-    def read_register(self, register, num_decimals=0, signed=True):
-        if register > 255:
-            raise ValueError("Maximum value for register is 255")
-        message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, 0, 0, register]
-        response_size = 4 + 4
-        data = self._perform_request(message, response_size=response_size)
-        return self._interpret_result(data, num_decimals, signed)
-
-    def read_multiple_registers(self, start_register, num_registers=1, num_decimals=0, signed=True):
-        if start_register + num_registers > 255:
-            raise ValueError("Cannot read registers above number 255.")
-        remaining_registers = num_registers
-        results = []
-        while remaining_registers > 0:
-            print(f"Requesting from register {start_register}")
-            num_registers = min(remaining_registers, MAX_REGISTERS)
-
-            message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, num_registers - 1, 0, start_register]
-            response_size = 4 + 4 * num_registers
-            data = self._perform_request(message, response_size=response_size)
-            results += self._interpret_result(data, num_decimals, signed, always_list=True)
-
-            remaining_registers -= num_registers
-            start_register += num_registers
-        return results
-
-    def read_register_list(self, registers_list, num_decimals=0, signed=True):
-        pass
-
-    def read_mixed_registers_list(self, registers_list, num_decimals_list, signed_list):
-        pass
-
-    def read_timer(self, timer):
-        raise NotImplementedError("Reading Timers is not yet implemented.")
-
-    def read_data_block(self):
-        raise NotImplementedError("Reading Data Blocks is not yet implemented.")
-
-    def write_counter(self):
-        raise NotImplementedError("Writing Counters is not yet implemented.")
-
-    def write_flags(self):
-        raise NotImplementedError("Writing Flags is not yet implemented.")
-
-    def write_rtc(self):
-        raise NotImplementedError("Writing RTC is not yet implemented.")
-
-    def write_output(self):
-        raise NotImplementedError("Writing Outputs is not yet implemented.")
-
-    def write_register(self):
-        raise NotImplementedError("Writing Registers is not yet implemented.")
-
-    def write_timer(self):
-        raise NotImplementedError("Writing Timers is not yet implemented.")
-
-    def write_data_block(self):
-        raise NotImplementedError("Writing Data Blocks is not yet implemented.")
-
-    def _perform_request(self, message, response_size):
-        self._wait_for_next_request()
-        message += calculate_crc(message)
-        self.serial.write(message)
-        response = self.serial.read(response_size)
-        if not response:
-            raise TimeoutError("No response received in time")
-        self._check_crc(response)
-        return response
-
-    def _wait_for_next_request(self):
-        now = time.time()
-        time_elapsed = now - self.last_request_time
-        time.sleep(max(0, WAIT_BETWEEN_REQUESTS - time_elapsed))
-        self.last_request_time = now
-
-    def _check_crc(self, message):
-        crc = calculate_crc(message[:-2])
-        if crc != message[-2:]:
-            return False
-        return True
-
-    def _interpret_result(self, message, num_decimals=0, signed=True, always_list=False):
-        if message[0] != FS:
-            raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
-
-        if message[1] != AT_RESPONSE:
-            return _interpret_error(message)
-
-        register_values = []
-        data = message[2:-2]
-        num_registers = int(len(data)/4)
-        if signed:
-            format_code = 'l'
-        else:
-            format_code = 'L'
-        register_values = [value / 10 ** num_decimals for value in struct.unpack(f'>{num_registers}{format_code}', data)]
-
-        if not always_list and len(register_values) == 1:
-            return register_values[0]
-        return register_values
-
-    def _chunk_data(self, data):
-        for i in range(len(data)>>2):
-            yield data[i*REGISTER_SIZE:(i+1)*REGISTER_SIZE]
-
-
-class DeviceEmulator:
-    def __init__(self, address, serial_port, **kwargs):
-        self.serial = serial.Serial(serial_port, **kwargs)
-        self.registers = {}
-        self.thread = None
-
-    def start(self):
-        self.thread = Thread(target=self._run)
-        self.thread.start()
-
-    def stop(self):
-        self.thread.stop()
-
-    def _run(self):
-        while True:
-            byte = self.serial.read(1)
-            if byte != FS:
-                continue
-            request = self._read_message()
-            status, data = self._handle_message(request)
-            response = [FS, status] + data
-            response += calculate_crc(response)
-            self.serial.write(response)
-
-    def _read_message(self):
-        message = FS + self.serial.read(REQUEST_SIZE - 1)
-        if message[1] != AT_REQUEST or message[2] != self.address:
-            self._skip_message()
-
-    def _check_crc(self, message):
-        crc = calculate_crc(message[:-2])
-        if crc != message[-2:]:
-            raise ValueError("CRC Mismatch")
-
-    def _skip_message(self, size):
-        try:
-            message = self.serial.read(size)
-        except:
-            pass
-
-    def _handle_message(self, message):
-        try:
-            self._check_crc(message)
-        except ValueError:
-            return AT_ERROR, [0,0,0,1]
-
-        if message[3] == SC_READ_REGISTER:
-            num_registers = message[4] + 1
-            start_register = int.from_bytes(message[5:7], 'big')
-            data = bytes()
-            for register in range(start_register, start_register + num_registers):
-                data += self.get_register_value(register)
-            return AT_RESPONSE, data
-
-    def get_register_value(self, register):
-        if register in self.registers:
-            return list(self.registers[register].to_bytes(4, 'big'))
-        else:
-            return list((0).to_bytes(4, 'big'))
-
+from .instrument import Instrument
+from .emulator import Emulator

+ 172 - 0
sbus/instrument.py

@@ -0,0 +1,172 @@
+import struct
+import time
+import serial
+from threading import Thread
+
+from .crc import calculate_crc
+
+# Frame Synchronization
+FS = 0xB5
+ESCAPE_CHAR = 0xC5
+
+# Attributes
+AT_REQUEST = 0x00
+AT_RESPONSE = 0x01
+AT_ACK = 0x02
+
+# S-Bus codes
+SC_READ_COUNTER = 0x00
+SC_READ_FLAGS = 0x02
+SC_READ_INPUT = 0x03
+SC_READ_RTC = 0x04
+SC_READ_OUTPUT = 0x05
+SC_READ_REGISTER = 0x06
+SC_READ_TIMER = 0x07
+SC_READ_DATA_BLOCK = 0x96
+SC_WRITE_COUNTER = 0x0A
+SC_WRITE_FLAGS = 0x0B
+SC_WRITE_RTC = 0x0C
+SC_WRITE_OUTPUT = 0x0D
+SC_WRITE_REGISTER = 0x0E
+SC_WRITE_TIMER = 0x0F
+SC_WRITE_DATA_BLOCK = 0x97
+
+REGISTER_SIZE = 4
+MAX_REGISTERS = 10
+REQUEST_SIZE = 9
+
+TIMEOUT = 1.0
+WAIT_BETWEEN_REQUESTS = 0.1
+
+class Instrument:
+    def __init__(self, address, serial_port, timeout=TIMEOUT, async=False, **kwargs):
+        self.address = address
+        self.serial = serial.Serial(serial_port, timeout=timeout, **kwargs)
+        self.last_request_time = 0.0
+
+    def read_counter(self, register):
+        raise NotImplementedError("Reading Counters is not yet implemented.")
+
+    def read_flags(self):
+        raise NotImplementedError("Reading Flags is not yet implemented.")
+
+    def read_input(self):
+        raise NotImplementedError("Reading Inputs is not yet implemented.")
+
+    def read_rtc(self):
+        raise NotImplementedError("Reading RTC is not yet implemented.")
+
+    def read_output(self):
+        raise NotImplementedError("Reading Outputs is not yet implemented.")
+
+    def read_register(self, register_address, number_of_decimals=0, signed=False):
+        if register > 255:
+            raise ValueError("Maximum value for register is 255")
+        message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, 0, 0, register_address]
+        response_size = 4 + 4
+        data = self._perform_request(message, response_size=response_size)
+        return self._interpret_result(data, number_of_decimals, signed)
+
+    def read_registers(self, register_address, number_of_registers=1, number_of_decimals=0, signed=False):
+        if register_address + number_of_registers > 255:
+            raise ValueError("Cannot read registers above number 255.")
+        remaining_registers = number_of_registers
+        results = []
+        while remaining_registers > 0:
+            number_of_registers = min(remaining_registers, MAX_REGISTERS)
+
+            message = [FS, AT_REQUEST, self.address, SC_READ_REGISTER, number_of_registers - 1, 0, register_address]
+            response_size = 4 + 4 * number_of_registers
+            data = self._perform_request(message, response_size=response_size)
+            results += self._interpret_result(data, number_of_decimals, signed, always_list=True)
+
+            remaining_registers -= number_of_registers
+            start_register += number_of_registers
+        return results
+
+    def read_timer(self, timer):
+        raise NotImplementedError("Reading Timers is not yet implemented.")
+
+    def read_data_block(self):
+        raise NotImplementedError("Reading Data Blocks is not yet implemented.")
+
+    def write_counter(self):
+        raise NotImplementedError("Writing Counters is not yet implemented.")
+
+    def write_flags(self):
+        raise NotImplementedError("Writing Flags is not yet implemented.")
+
+    def write_rtc(self):
+        raise NotImplementedError("Writing RTC is not yet implemented.")
+
+    def write_output(self):
+        raise NotImplementedError("Writing Outputs is not yet implemented.")
+
+    def write_register(self):
+        raise NotImplementedError("Writing Registers is not yet implemented.")
+
+    def write_timer(self):
+        raise NotImplementedError("Writing Timers is not yet implemented.")
+
+    def write_data_block(self):
+        raise NotImplementedError("Writing Data Blocks is not yet implemented.")
+
+    def _perform_request(self, message, response_size):
+        self._wait_for_next_request()
+        message = self._escape_fs_char(message)
+        message += calculate_crc(message)
+        self.serial.write(message)
+        response = self.serial.read(response_size)
+        if not response:
+            raise TimeoutError("No response received in time")
+        self._check_crc(response)
+        return response
+
+    def _escape_fs_char(self, message):
+        if FS in message[1:]:
+            original_message = message
+            message = [original_message[0]]
+            for char in original_message[1:]:
+                message.append(char)
+                if char == FS:
+                    message.append(ESCAPE_CHAR)
+        return message
+
+    def _wait_for_next_request(self):
+        now = time.time()
+        time_elapsed = now - self.last_request_time
+        time.sleep(max(0, WAIT_BETWEEN_REQUESTS - time_elapsed))
+        self.last_request_time = now
+
+    def _check_crc(self, message):
+        crc = calculate_crc(message[:-2])
+        if crc != message[-2:]:
+            return False
+        return True
+
+    def _interpret_result(self, message, number_of_decimals=0, signed=False, always_list=False):
+        if message[0] != FS:
+            raise ValueError("Message does not begin with byte 0xB5. Check your baudrate and cable polarity.")
+
+        if message[1] != AT_RESPONSE:
+            return _interpret_error(message)
+
+        register_values = []
+        data = message[2:-2]
+        number_of_registers = int(len(data)/4)
+        if signed:
+            format_code = 'l'
+        else:
+            format_code = 'L'
+        if number_of_decimals == 0:
+            register_values = [value for value in struct.unpack(f'>{number_of_registers}{format_code}', data)]
+        else:
+            register_values = [value / 10 ** number_of_decimals for value in struct.unpack(f'>{number_of_registers}{format_code}', data)]
+
+        if not always_list and len(register_values) == 1:
+            return register_values[0]
+        return register_values
+
+    def _chunk_data(self, data):
+        for i in range(len(data)>>2):
+            yield data[i*REGISTER_SIZE:(i+1)*REGISTER_SIZE]

+ 9 - 0
sbustest.py

@@ -0,0 +1,9 @@
+import sbus
+import time
+
+inst = sbus.Instrument(slaveaddress=3, serial_port="/dev/ttyUSB0", baudrate=38400)
+result = inst.read_register(20)
+print(result)
+
+result = inst.read_registers(10, number_of_registers=4, signed=True)
+print(result)