Source code for nextion.nextion_upload

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""
NexUpload

Functions to download TFT file to Nextion display
"""

# system packages
from os import listdir, stat
from time import sleep, ticks_diff, ticks_ms

# custom packages
from .common import Common


[docs]class NexUploadError(Exception): """Base class for exceptions in this module.""" pass
[docs]class NexUpload(Common): """docstring for NexUpload""" def __init__(self, nh, file_name: str, download_baudrate: int) -> None: """ Init upload :param nh: The Nextion hardware interface object :type nh: NexHardware :param file_name: The update filename :type file_name: str :param download_baudrate: The download baudrate :type download_baudrate: int """ super().__init__(nh, pid=-1, cid=-1, name="upload") self._file_name = file_name self._file_size = 0 self._download_baudrate = download_baudrate if not self._checkFile(): raise NexUploadError("No such update file found: '{}'". format(file_name)) @property def file_name(self) -> str: """ Get the update filename :returns: Filename of update file :rtype: str """ return self._file_name @property def file_size(self) -> int: """ Get the update filesize :returns: Filesize of update file in bytes :rtype: int """ return self._file_size @file_size.setter def file_size(self, value: int) -> None: """ Filesize of update file :param value: The filesize :type value: int """ self._file_size = value @property def download_baudrate(self) -> int: """ Get the download baudrate :returns: Download baudrate for communication with Nextion display :rtype: int """ return self._download_baudrate @download_baudrate.setter def download_baudrate(self, value: int) -> None: """ Set download baudrate :param value: The baudrate value :type value: int """ self._download_baudrate = value
[docs] def upload(self) -> bool: """ Perform update of Nextion display content :returns: True on success, raise NexUploadError otherwise :rtype: bool """ if not self._checkFile(): raise NexUploadError("File not found") if self._getBaudrate() == 0: raise NexUploadError("Get baudrate error") if not self._setDownloadBaudrate(self.download_baudrate): raise NexUploadError("Modify baudrate error") if not self._downloadTftFile(): raise NexUploadError("Download file error") self._nh._logger.debug("Download ok") return True
[docs] def _checkFile(self) -> bool: """ Check existance of specified TFT file :returns: True on success, False otherwise :rtype: bool """ result = False if self._exists(self.file_name): # https://docs.python.org/3/library/os.html#os.stat info = stat(self.file_name) self.file_size = info[6] self._nh._logger.debug("TFT file size is '{}' bytes". format(self.file_size)) self._nh._logger.debug("File check ok") result = True else: self._nh._logger.debug("File '{}' does not exist". format(self.file_name)) return result
[docs] def _getBaudrate(self) -> int: """ Get communication baudrate with Nextion display :returns: The baudrate :rtype: int """ baudrate_array = [115200, 19200, 9600, 57600, 38400, 4800, 2400] _baudrate = 0 for baudrate in baudrate_array: self._nh._logger.debug("Checking connection with '{}' baud". format(baudrate)) if self._searchBaudrate(baudrate): _baudrate = baudrate self._nh._logger.debug("Success, baudrate set to '{}' baud". format(_baudrate)) return _baudrate return _baudrate
[docs] def _searchBaudrate(self, baudrate: int) -> bool: """ Find suitable download baudrate :param baudrate: The baudrate :type baudrate: int :returns: True on success, False otherwise :rtype: bool """ self._nh._baudrate = baudrate self._nh._uart_init() self._nh.sendCommand("") self._nh.sendCommand("connect") sleep(0.1) # necessary, data might not be available otherwise response = self._recvRetString() self._nh._logger.debug("_searchBaudrate response for '{}' baud: {}". format(baudrate, response)) if "comok" in response: return True return False
[docs] def _setDownloadBaudrate(self, baudrate: int) -> bool: """ Set the download baudrate :param baudrate: The baudrate :type baudrate: int :returns: True on success, False otherwise :rtype: bool """ cmd = "whmi-wri {},{},0".format(self.file_size, baudrate) self._nh._logger.debug("Set download baudrate cmd: '{}'".format(cmd)) self._nh.sendCommand("") self._nh.sendCommand(cmd) sleep(0.05) self._nh._baudrate = baudrate self._nh._uart_init() response = self._recvRetString(500) self._nh._logger.debug( "Set download baudrate response: '{}'".format(response)) if (0x05).to_bytes(1, 'little') in response: return True return False
[docs] def _downloadTftFile(self) -> bool: """ Download TFT file to Nextion display :returns: True on success, False otherwise :rtype: bool """ # send chunks of 4096 file_content = bytearray(4096) with open(self.file_name, 'rb') as update_file: while True: data_size = update_file.readinto(file_content) if not data_size: self._nh._logger.debug("Reached EOF, update finished") break self._nh._uart.write(file_content) response = self._recvRetString(500) # self._nh._logger.debug("File download response: '{}'". # format(response)) if (0x05).to_bytes(1, 'little') in response: # send next chunk pass else: return False return True
[docs] def _recvRetString(self, timeout: int = 100) -> bytearray: """ Receive a string :param timeout: The communication timeout :type timeout: int :returns: Received response :rtype: bytearray """ buf = bytearray(70) start_ms = ticks_ms() # stay inside this while loop at least for the timeout time while (ticks_diff(ticks_ms(), start_ms) <= timeout): # check amount of available characters while self._nh._uart.any(): # can not decode received data here, as it might not be UTF-8 self._nh._uart.readinto(buf) return buf
[docs] def _exists(self, path: str) -> bool: """ Check existance of file at given path. :param path: The path to the file :type path: str :returns: Existance of file :rtype: bool """ result = False path_to_file_list = path.split('/') # ['path', 'to', 'some', 'file.txt'] root_path = '' this_path = root_path for ele in path_to_file_list[:-1]: files_in_dir = listdir(this_path) # print('Files in {}: {}'.format(this_path, files_in_dir)) if ele in files_in_dir: # print('"{}" found in "{}"'.format(ele, files_in_dir)) if this_path == '': this_path += '{}'.format(ele) else: this_path += '/{}'.format(ele) # print('Next folder to be checked is: {}'.format(this_path)) else: return result files_in_dir = listdir(this_path) if path_to_file_list[-1] in files_in_dir: # print('File "{}" found in "{}"'. # format(path_to_file_list[-1], this_path)) return True else: return False