an1310.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. #!/usr/local/bin/python3
  2. """
  3. AN1310: A Python bootloader interface for the Microchip AN1310 Bootloader.
  4. Usage:
  5. from an1310 import AN1310Device
  6. dev = AN1310Device("/dev/ttyUSB0")
  7. dev.enter_bootloader()
  8. dev.read_flash_memory(0, 128)
  9. dev.run_application()
  10. """
  11. import argparse
  12. from ctypes import c_ushort
  13. import math
  14. import struct
  15. import serial
  16. import sqlite3
  17. import sys
  18. import time
  19. from PyCRC.CRCCCITT import CRCCCITT
  20. from intelhex import IntelHex
  21. __author__ = 'Stan Janssen'
  22. __url__ = 'https://gitlab.finetuned.nl/bootloader/an1310'
  23. __license__ = 'Public Domain'
  24. __version__ = '1.0'
  25. __status__ = 'Public'
  26. """ Default Values """
  27. BAUDRATE = 115200
  28. PARITY = serial.PARITY_NONE
  29. BYTESIZE = 8
  30. STOPBITS = 1
  31. TIMEOUT = 0.05
  32. STX = 0x0F # Start of TeXt
  33. ETX = 0x04 # End of TeXt
  34. DLE = 0x05 # Data Link Escape
  35. class AN1310Device():
  36. def __init__(self, port):
  37. self.serial = serial.Serial(port=port, baudrate=BAUDRATE,
  38. parity=PARITY, bytesize=BYTESIZE,
  39. stopbits=STOPBITS, timeout=TIMEOUT)
  40. """ Below are the general AN1310 bootloader commands """
  41. def enter_bootloader(self):
  42. """ Generate a serial BREAK state to enter bootloader mode """
  43. print("Sending Break")
  44. self.serial.read(self.serial.in_waiting) # flush existing serial buffer
  45. self.serial.send_break(duration=0.1) # send break so the PIC goes into bootloader mode
  46. entered = False
  47. n = 1
  48. while entered is False:
  49. print("Performing autobaud attempt %s" % n)
  50. n = n + 1
  51. self.serial.write(bytes([STX]))
  52. if self.serial.read(1) == b'\x0f':
  53. entered = True
  54. print("Successfully entered the bootloader")
  55. self.read_bootloader_info()
  56. self._get_device_info()
  57. def read_bootloader_info(self):
  58. message = [0x00]
  59. bootloader_info = self._perform_operation(message)
  60. print(bootloader_info)
  61. self.bootloader_size = struct.unpack("<H", bytes(bootloader_info[0:2]))[0]
  62. self.bootloader_version = struct.unpack("<H", bytes(bootloader_info[2:4]))[0]
  63. command_mask_l = bootloader_info[5] & 0x0F # Get least significant nibble
  64. if command_mask_l == 2:
  65. self.pic_type = "PIC16"
  66. elif command_mask_l == 4:
  67. self.pic_type = "PIC18"
  68. if self.pic_type == "PIC16":
  69. command_mask_h = bootloader_info[4]
  70. if command_mask_h == 0x01:
  71. self.erase_before_write = False
  72. else:
  73. self.erase_before_write = True
  74. self.bootloader_start_address = struct.unpack("<H", bytes(bootloader_info[6:8]))[0]
  75. if self.pic_type == "PIC16":
  76. self.deviceid = bootloader_info[10:12]
  77. return bootloader_info
  78. def read_flash_memory(self, address, bytes):
  79. message = [0x01] + self._little_endian(address, "I") + self._little_endian(bytes)
  80. return self._perform_operation(message)
  81. def read_flash_memory_crc(self, address, pages):
  82. message = [0x02] + self._little_endian(address, "I") + self._little_endian(pages)
  83. return self._perform_operation(message, crc=False)
  84. def erase_flash_memory(self, page, pages):
  85. """ Erases a number of pages of flash memory. """
  86. address = (page + pages) * self.pagesize - 1
  87. message = [0x03] + self._little_endian(address, "I") + [pages]
  88. return self._perform_operation(message)
  89. def write_flash_memory(self, page, pages, data):
  90. address = page * self.pagesize
  91. message = [0x04] + self._little_endian(address, "I") + [pages] + data
  92. return self._perform_operation(message)
  93. def read_eeprom(self, address, bytes):
  94. message = [0x05] + self._little_endian(address) + [0x00] + [0x00] + self._little_endian(bytes)
  95. return self._perform_operation(message)
  96. def write_eeprom(self, address, bytes, data):
  97. message = [0x06] + self._little_endian(address) + [0x00] + [0x00] + self._little_endian(bytes) + data
  98. return self._perform_operation(message)
  99. def write_config_fuses(self, address, bytes, data):
  100. message = [0x07] + self._little_endian(address) + [0x00] + [0x00] + [bytes] + data
  101. return self._perform_operation(message)
  102. def run_application(self):
  103. """ Instruct the bootloader to jump to the application firmware """
  104. message = [0x08]
  105. self._perform_operation(message, response=False)
  106. """ Below are some convenience methods for the SmartEVSE """
  107. def print_cli_menu(self):
  108. self.serial.write(b'\n')
  109. time.sleep(0.05)
  110. print(self.serial.read(self.serial.in_waiting).decode())
  111. """ Below are methods to verify and program a provided HEX file """
  112. # def verify_actual_file(self, file):
  113. # data = IntelHex(file)
  114. #
  115. # for segment in data.segments():
  116. # chip_data = self.read_flash_memory(segment[0], segment[1] - segment[0])
  117. #
  118. # file_data = []
  119. # for i in range(*segment):
  120. # file_data.append(data[i])
  121. #
  122. # if file_data != chip_data:
  123. # return False
  124. # else:
  125. # print("Segment %s - %s OK" % segment)
  126. # return True
  127. #
  128. # def verify_raw_file(self, file):
  129. # data = IntelHex(file)
  130. #
  131. # """ Read data from the chip """
  132. # for segment in data.segments():
  133. # if segment[1] <= self.flashbytes:
  134. # range_start = math.floor(segment[0] / self.pagesize)
  135. # range_end = math.ceil(segment[1] / self.pagesize)
  136. # range_size = range_end - range_start
  137. # address_start = range_start * self.pagesize
  138. # address_end = range_end * self.pagesize
  139. #
  140. # chip_crcs = self.read_flash_memory_crc(address_start, range_size)
  141. # file_crcs = self._file_crc(data, (address_start, address_end))
  142. # if chip_crcs != file_crcs:
  143. # print("Chunk %s - %s Not OK..." % (address_start, address_end))
  144. # print("Chip CRCS were:")
  145. # print(chip_crcs)
  146. # print("File CRCS were:")
  147. # print(file_crcs)
  148. # return False
  149. # else:
  150. # print("Chunk %s - %s OK..." % (address_start, address_end))
  151. # return True
  152. #
  153. # def verify_file(self, file):
  154. # """ Verifies the file that was programmed using program_file() """
  155. # data = IntelHex(file)
  156. #
  157. # segment = data.segments()[0]
  158. # first_page = self._page(segment[0])
  159. # last_page = self._page(min(segment[0], self.bootloader_start_address))
  160. def program_raw_file(self, file):
  161. """ Programs the contents of the provided hex file straight to the PIC. Does not relocate the reset vectors. May overwrite the bootloader and leave the device in an unusable state. """
  162. data = IntelHex(file)
  163. segment = data.segments()[0]
  164. first_page = self._page(segment[0])
  165. last_page = self._page(segment[1])
  166. for page in range(first_page, last_page + 1):
  167. write_data = []
  168. for j in range(0, self.pagesize):
  169. write_data.append(data[page * self.pagesize + j])
  170. new_crc = self._crc(write_data)
  171. prev_crc = self.read_flash_memory_crc(page * self.pagesize, 1)
  172. if prev_crc == new_crc:
  173. print("Nothing to write in page %s, already OK" % page)
  174. else:
  175. print("Erasing page %s" % page)
  176. self.erase_flash_memory(page, 1)
  177. print("Writing page %s" % page)
  178. self.write_flash_memory(page, 1, write_data)
  179. print("Verifying page %s" % page)
  180. written_crc = self.read_flash_memory_crc(page * self.pagesize, 1)
  181. if written_crc == new_crc:
  182. print("Verified OK")
  183. else:
  184. print("Verified Not OK")
  185. def program_file(self, file):
  186. """ Program the provided program, relocating the reset vector in the process. """
  187. write_size = 16
  188. data = IntelHex(file)
  189. # Get the first two bytes and verify that it is a GOTO statement
  190. if data[1] != 0xEF:
  191. raise HEXFileError("No goto-statement in first byte")
  192. else:
  193. application_vector = [data[0], data[1], data[2], data[3]]
  194. segment = data.segments()[0]
  195. first_page = self._page(segment[0])
  196. last_page = self._page(min(segment[1], self.bootloader_start_address))
  197. print("Programming...")
  198. st = time.time()
  199. for page in range(first_page, last_page, write_size): # Does not include last_page
  200. write_data = []
  201. for j in range(0, self.pagesize * write_size):
  202. write_data.append(data[page * self.pagesize + j])
  203. if page == 0:
  204. write_data[0:4] = self._existing_bootloader_pointer()
  205. existing_crc = self.read_flash_memory_crc(page * self.pagesize, write_size)
  206. intended_crc = self._chunked_crc(write_data)
  207. if existing_crc != intended_crc:
  208. print("Writing pages %s through %s" % (page, page + write_size - 1))
  209. if self.erase_before_write:
  210. self.erase_flash_memory(page, write_size)
  211. self.write_flash_memory(page, write_size, write_data)
  212. print("Programming complete (%s seconds)" % (time.time() - st))
  213. # Update the application return vector
  214. app_vector_page = self._page(self.bootloader_start_address) - 1
  215. print("Relocating the application vector...")
  216. # Update that page:
  217. app_vector_page_content = self.read_flash_memory(app_vector_page * self.pagesize, self.pagesize)
  218. if app_vector_page_content[-4:] != application_vector:
  219. app_vector_page_content[-4:] = application_vector
  220. print("Erasing page %s", app_vector_page)
  221. self.erase_flash_memory(app_vector_page, 1)
  222. print("Writing the following to page %s: %s" %(app_vector_page, app_vector_page_content))
  223. self.write_flash_memory(app_vector_page, 1, app_vector_page_content)
  224. else:
  225. print("Application vector already in place.")
  226. def _existing_bootloader_pointer(self):
  227. return self.read_flash_memory(0, 4)
  228. def _file_data(self, hexfile, segment):
  229. data = []
  230. for i in range(*segment):
  231. data.append(hexfile[i])
  232. return data
  233. def _file_crc(self, hexfile, segment):
  234. data = self._file_data(hexfile, segment)
  235. crcs = []
  236. segment_size = segment[1] - segment[0]
  237. for i in range(0, segment_size, self.pagesize):
  238. crcs.extend(self._crc(data[i:i+self.pagesize], start=crcs[-2:]))
  239. return crcs
  240. """ Below are some internal functions that deal with lower level things """
  241. def _perform_operation(self, data, response=True, crc=True):
  242. """ Packages the request in the standard format and sends it to the device"""
  243. request = bytes([STX] + self._escape_control_chars(data) + self._escaped_crc(data) + [ETX])
  244. self.serial.write(request)
  245. if response:
  246. return self._read_response(crc=crc)
  247. def _read_response(self, crc=True):
  248. """ Read the response, un-escaping all control characters, and return the bytestring """
  249. escape = False
  250. message = []
  251. while True:
  252. if self.serial.in_waiting == 0:
  253. time.sleep(10/self.serial.baudrate) # Wait for the next character to be buffered
  254. continue
  255. char = ord(self.serial.read(1)) # Read the next char and convert to integer
  256. if char == DLE and escape is False:
  257. escape = True
  258. elif char == ETX and escape is False: # We've reached the end of the transmission
  259. message.append(char)
  260. break
  261. else:
  262. escape = False
  263. message.append(char)
  264. if self.serial.in_waiting > 0:
  265. print("Warning: additional bytes left in buffer that were not read:")
  266. print(self.serial.read(self.serial.in_waiting))
  267. if crc:
  268. if self._verify_crc(message) is False:
  269. print("Warning: CRC check failed")
  270. return message[1:-3]
  271. else:
  272. return message[1:-1] # The read_flash_crc operation does not include a CRC in the response
  273. def _chunked_crc(self, data):
  274. crc = []
  275. for d in range(0, len(data), self.pagesize):
  276. if d == 0:
  277. crc = self._crc(data[d:d+self.pagesize])
  278. else:
  279. crc = crc + self._crc(data[d:d+self.pagesize], start=crc[-2:])
  280. return crc
  281. def _crc(self, data, start=[0, 0]):
  282. """ Calculate the CRC and return as two little-endian numbers """
  283. """ Optionally provide a previous CRC to build on """
  284. data_as_bytes = bytes(data)
  285. crc_ccitt_tab = [0x0, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, 0x1231, 0x210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, 0x2462, 0x3443, 0x420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, 0x3653, 0x2672, 0x1611, 0x630, 0x76d7, 0x66f6, 0x5695, 0x46b4, 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x840, 0x1861, 0x2802, 0x3823, 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0xa50, 0x3a33, 0x2a12, 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0xc60, 0x1c41, 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0xe70, 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, 0x1080, 0xa1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, 0x2b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, 0x34e2, 0x24c3, 0x14a0, 0x481, 0x7466, 0x6447, 0x5424, 0x4405, 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, 0x26d3, 0x36f2, 0x691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x8e1, 0x3882, 0x28a3, 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0xaf1, 0x1ad0, 0x2ab3, 0x3a92, 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0xcc1, 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0xed1, 0x1ef0]
  286. if len(start) == 2:
  287. crc = struct.unpack("<H", bytes(start))[0]
  288. else:
  289. crc = 0
  290. for d in data:
  291. tmp = (c_ushort(crc >> 8).value) ^ d
  292. crc = (c_ushort(crc << 8).value) ^ crc_ccitt_tab[tmp]
  293. return self._little_endian(crc)
  294. def _escaped_crc(self, data):
  295. """ Return escaped CRC """
  296. return self._escape_control_chars(self._crc(data))
  297. def _verify_crc(self, message):
  298. """ Verify the received CRC for this message """
  299. crc = message[-3:-1]
  300. data = message[1:-3]
  301. calc_crc = CRCCCITT().calculate(bytes(data))
  302. if self._little_endian(calc_crc) == crc:
  303. return True
  304. else:
  305. return False
  306. def _escape_control_chars(self, data):
  307. """ Escape all control characters in the data, prepending DLE """
  308. escaped_string = []
  309. for byte in data:
  310. if byte in [STX, ETX, DLE]:
  311. escaped_string = escaped_string + [DLE]
  312. escaped_string = escaped_string + [byte]
  313. return escaped_string
  314. def _little_endian(self, value, format="H"):
  315. """ Return a little-endian list representation of the provided value """
  316. return list(struct.pack("<" + format, value))
  317. def _load_eeprom_data(self, hexfile):
  318. pass
  319. def _page(self, address):
  320. return int(address / self.pagesize)
  321. def _get_device_info(self):
  322. if self.pic_type == "PIC18":
  323. deviceid = struct.unpack("<H", bytes(self.read_flash_memory(0x3FFFFE, 2)))[0] >> 5
  324. print("The device ID is %s" % deviceid)
  325. devicedb = sqlite3.connect("devices.db")
  326. cur = devicedb.cursor()
  327. cur.execute("SELECT `ENDFLASH` - `STARTFLASH`, `WRITEFLASHBLOCKSIZE`, `ERASEFLASHBLOCKSIZE`, `STARTEE`, `ENDEE` - `STARTEE`, `ENDGPR` - `STARTGPR` FROM `devices` WHERE `DEVICEID` = ?", (deviceid,))
  328. result = cur.fetchall()
  329. if len(result) > 0:
  330. self.flashbytes, self.pagesize, self.erase_pagesize, self.eeprom_start_address, self.eeprom_size, self.ram_size = result[0]
  331. else:
  332. raise Exception("Device ID %s was not found in devices.db" % deviceid)
  333. cur.close()
  334. class HEXFileError(Exception):
  335. def __init__(self, message):
  336. self.message = message
  337. def __str__(self):
  338. return repr(self.message)
  339. if __name__ == "__main__":
  340. parser = argparse.ArgumentParser(description="AN1310 Bootloader application for PIC Microchip controllers.")
  341. group = parser.add_mutually_exclusive_group(required=True)
  342. group.add_argument("--program", "-p", help="Program a file", action="store_true")
  343. group.add_argument("--verify", "-v", help="Verify a file", action="store_true")
  344. group.add_argument("--program-raw", "-r", help="Program a raw, compiled file", action="store_true")
  345. parser.add_argument("--device", "-d", nargs=1, help="Device (serial port)", required=True)
  346. parser.add_argument("--baudrate", "-b", nargs=1, type=int, help="baudrate")
  347. parser.add_argument("FILE", nargs=1, help="File input")
  348. options = parser.parse_args()
  349. if options.program:
  350. dev = AN1310Device(options.device[0])
  351. dev.enter_bootloader()
  352. dev.program_file(options.FILE[0])
  353. dev.run_application()
  354. elif options.verify:
  355. dev = AN1310Device(options.device[0])
  356. dev.enter_bootloader()
  357. dev.verify_file(options.FILE[0])
  358. dev.run_application()
  359. elif options.program_raw:
  360. dev = AN1310evice(options.device[0])
  361. dev.enter_bootloader()
  362. dev.program_raw_file(options.FILE[0])
  363. dev.run_application()