Source code for nextion.nextion_hardware

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""
Common

This class performs the basic and common interactions with the Nextion display

Known issues
    - sendCommand uses the default timeout, but should use timeout = 0
      https://github.com/micropython/micropython/issues/3434
"""

# system packages
from machine import UART
from time import ticks_diff, ticks_ms

# custom packages
from .typing import Optional
from . import const as Const
from . import ulogging as logging


[docs]class NexHardwareError(Exception): """Base class for exceptions in this module.""" pass
[docs]class NexHardware(object): """docstring for NexHardware""" def __init__(self, rx_pin: int, tx_pin: int, uart_id: int = 1, baudrate: Optional[int] = 9600, timeout: Optional[int] = 100, invert: Optional[int] = 0, logger: Optional[logging.Logger] = None) -> None: """ Init hardware interface :param rx_pin: The UART RX :type rx_pin: int :param tx_pin: The UART TX :type tx_pin: int :param baudrate: The communication baudrate :type baudrate: int :param timeout: The communication timeout :type timeout: int :param invert: Specify inverted communication lines :type invert: see MicroPython UART docs :param logger: The logger :type logger: logging.Logger """ if logger is None: logger = NexHardware.create_logger( logger_name=self.__class__.__name__) logger.setLevel(level=logging.WARNING) self._logger = logger self._logger.disabled = False self._tx_pin = tx_pin self._rx_pin = rx_pin self._baudrate = baudrate self._timeout = timeout self._invert = invert self._uart = UART( uart_id, tx=self._tx_pin, rx=self._rx_pin, baudrate=self._baudrate, timeout=self._timeout, invert=self._invert) self._initialized = False
[docs] @staticmethod def create_logger(logger_name: Optional[str] = None) -> logging.Logger: """ Create a logger. :param logger_name: The logger name :type logger_name: str, optional :returns: Configured logger :rtype: logging.Logger """ logging.basicConfig(level=logging.INFO) if logger_name and (isinstance(logger_name, str)): logger = logging.getLogger(logger_name) else: logger = logging.getLogger(__name__) # set the logger level to DEBUG if specified differently logger.setLevel(logging.DEBUG) return logger
[docs] def _uart_init(self) -> None: """Init UART instance""" if self._uart: self._uart.init( tx=self._tx_pin, rx=self._rx_pin, baudrate=self._baudrate, timeout=self._timeout, invert=self._invert)
[docs] def nexInit(self) -> bool: """ Init Nextion interface to display :returns: Result of init, True on success, False otherwise :rtype: bool """ ret1 = False ret2 = False self._uart_init() self.sendCommand("") self.sendCommand("bkcmd=1") ret1 = self.recvRetCommandFinished() self.sendCommand("page 0") ret2 = self.recvRetCommandFinished() self._initialized = True return ret1 & ret2
[docs] def sleep(self, state: bool) -> None: """ Control display sleep state :param state: Flag, True is sleep mode, False is wakeup :type state: bool """ cmd = "sleep={}".format(int(state)) self.sendCommand(cmd)
[docs] def brightness(self, value: int) -> None: """ Set brightness of display :param value: The value in percent [0, 100] :type value: int """ cmd = "dim={}".format(int(value)) self.sendCommand(cmd)
[docs] def reset(self) -> None: """Reset display like on a power cycle""" cmd = "rest" self.sendCommand(cmd)
def nexLoop(self): pass
[docs] def recvRetNumber(self, timeout: int = 100) -> int: """ Receive a uint32_t number :param timeout: The communication timeout :type timeout: int :returns: Received number :rtype: int """ if timeout != self._timeout: _old_timeout = self._timeout self._timeout = timeout self._uart_init() self._timeout = _old_timeout # ret = False temp = bytearray(8) number = 0 # C implementation requires a pointer to a number as argument # Python can directly return the string :) # if not number: # self._logger.debug("recvRetNumber err") # return ret # function not available in MicroPython, see # https://github.com/micropython/micropython/issues/3434 # self._uart.timeout = timeout if self._uart.any() != len(temp): self._logger.debug("recvRetNumber err") # return ret return number # at least len(buf) bytes are read if not stopped earlier by a timeout self._uart.readinto(temp) self._logger.debug(''.join('0x{:02x} '.format(i) for i in temp)) if ((temp[0] == Const.NEX_RET_NUMBER_HEAD) and (temp[5] == 0xFF) and (temp[6] == 0xFF) and (temp[7] == 0xFF)): number = (temp[4] << 24) | (temp[3] << 16) | (temp[2] << 8) | (temp[1]) # noqa # ret = True # if ret: # self._logger.debug("recvRetNumber: {}".format(number)) self._logger.debug("recvRetNumber: {}".format(number)) # return ret return number
[docs] def recvRetString(self, timeout: int = 100) -> str: """ Receive a string :param timeout: The communication timeout :type timeout: int :returns: Received string :rtype: int """ if timeout != self._timeout: _old_timeout = self._timeout self._timeout = timeout self._uart_init() self._timeout = _old_timeout # ret = 0 str_start_flag = False cnt_0xff = 0 temp = "" c = 0 # C implementation requires a pointer to a buffer as argument # Python can directly return the string :) # if ((buf is None) or (length == 0)): # self._logger.debug("recvRetString[{}, {}]".format( # len(temp), temp)) # return ret start_ms = ticks_ms() # stay inside this while loop at least for the timeout time while (ticks_diff(ticks_ms(), start_ms) <= timeout): # check amount of available characters while self._uart.any(): # can not decode received data here, as it might not be UTF-8 # use "self._uart.readinto(buf, 1)" to avoid the call of # ".to_bytes(1, 'little')" later c = self._uart.read(1) if str_start_flag: if c == (0xFF).to_bytes(1, 'little'): cnt_0xff += 1 if cnt_0xff >= 3: # 3x 0xFF marks the end of the message, exit break else: temp += c.decode('utf-8') elif c == (Const.NEX_RET_STRING_HEAD).to_bytes(1, 'little'): str_start_flag = True if cnt_0xff >= 3: break # ret = len(temp) # # ret = ret > len ? len : ret; # not required at all, this is Python, not C # if ret > length: # ret = length # else: # ret = ret # # buf = temp self._logger.debug("recvRetString[{}, {}]".format(len(temp), temp)) # return ret return temp
[docs] def sendCommand(self, cmd: str) -> None: """ Send a command to the Nextion serial display. :param cmd: The command :type cmd: str """ while self._uart.any(): self._uart.read() # function not available in MicroPython, see # https://github.com/micropython/micropython/issues/3434 # self._uart.timeout = 0 self._uart.write(cmd) self._uart.write(Const.NEX_END_CMD)
[docs] def recvRetCommandFinished(self, timeout: int = 100) -> bool: """ Command is executed successfully :param timeout: The communication timeout :type timeout: int :returns: True on success, False otherwise :rtype: bool """ if timeout != self._timeout: _old_timeout = self._timeout self._timeout = timeout self._uart_init() self._timeout = _old_timeout ret = False temp = bytearray(4) # function not available in MicroPython, see # https://github.com/micropython/micropython/issues/3434 # self._uart.timeout = timeout # this check has no effect, as ret is False by default if self._uart.any() != len(temp): self._logger.debug("recvRetCommandFinished err") ret = False # at least len(buf) bytes are read if not stopped earlier by a timeout self._uart.readinto(temp) self._logger.debug(''.join('0x{:02x} '.format(i) for i in temp)) if ((temp[0] == Const.NEX_RET_CMD_FINISHED) and (temp[1] == 0xFF) and (temp[2] == 0xFF) and (temp[3] == 0xFF)): self._logger.debug("recvRetCommandFinished ok") ret = True return ret