client

This module defines the SerialClient class, which is responsible for handling serial communication between the microcontroller and the python code running on the connected small board computer.

SerialClient(serial=usb_cdc.data, buf_out_size=1024,
read_timeout=0.01, write_timeout=0.015)
Parameters:
  • serial (serial.Serial) – The serial device instance to use, defaults to usb_cdc.data

  • buf_out_size (int) – Size of output buffer in bytes, defaults to 1024

  • read_timeout (float) – Read timeout period, specifying how long to read the serial line, before yielding to a write operation, defaults to 0.01s

  • write_timeout – Write timeout period, specifying how long to spend writing to the serial line, before yielding to a read operation, defaults to 0.015s

Variables:
  • serial – The serial device the object is using

  • buf_out_bytes_free – Number of free bytes available to write to in buf_out

  • buf_in – Input buffer for storing messages received over serial line

  • buf_out – Output buffer for storing messages to be sent over serial line

Methods:

SerialClient.write(data)

Perform an immediate write of data to the serial line, bypassing data in SerialClient.buf_out

Parameters:

data (bytes) – Byte-encoded message to be written to the serial line

Returns:

Status code indicating the number of bytes written

Return type:

int

SerialClient.update()

Perform a read/write operation on the serial line, reading any data sent from the computer into SerialClient.buf_in and writing data in SerialClient.buf_out out to the computer.

Code listing

"""
This module defines the :class:`SerialClient` class, which handles the serial
communication between the microcontroller and the computer.
"""
import usb_cdc
#import asyncio
from utils import ResponsiveDict

class SerialClient:
    """The :class:`SerialClient` class provides a simplified interface for 
    achieving state and data synchronisation between the microcontroller and
    connected computer, through its :meth:`update` method.
    
    :param serial: The serial device instance to use, defaults to usb_cdc.data
    :type serial: _type_, optional
    :param buf_out_size: Size of output buffer, in bytes, defaults to 1024
    :type buf_out_size: int, optional
    :param read_timeout: Read timeout period, specifying how long to read 
        the serial line, before yielding to a write operation, defaults to
        0.01s
    :type timeout: float, optional
    :param write_timeout: Write timeout period, specifying the maximum
        time to spend writing a message on the serial line, before yielding
        to a read operation, defaults to 0.015s
    :type write_timeout: float, optional
    """

    # Attributes

    serial = usb_cdc.data
    """The serial device the object is using
    """

    buf_out_bytes_free = 1024
    """Number of free bytes available to write to in 
    :attr:`SerialClient.buf_out`
    """

    buf_out = None
    """Output buffer for storing messages to be sent over the serial line
    """

    buf_in = None
    """Input buffer for storing messages received over serial line
    """


    def __init__(self, serial=serial, buf_out_size=buf_out_bytes_free,
                 read_timeout=0.01, write_timeout=0.015):
        """Create a new :class:`SerialClient` instance.
        """
        self.serial = serial
        self.serial.timeout = read_timeout
        self.serial.write_timeout = write_timeout
        
        self.buf_in = []
        self.buf_out = bytearray(buf_out_size)
        
        self.buf_out_bytes_free = buf_out_size

    def write(self, data):
        """Perform an immediate write of ``data`` to the serial line,
        bypassing data in :attr:`SerialClient.buf_out`.

        :param data: Byte-encoded message to be written to the serial line
        :type data: bytes
        :return: Status code indicating the number of bytes written
        :rtype: int
        """
        return self.serial.write(data)
        
    def update(self):
        """
        Perform a read/write operation on the serial line, reading any
        data sent from the computer into :attr:`SerialClient.buf_in` and
        writing data in :attr:`SerialClient.buf_out` out to the computer.
        """
        if self.serial.in_waiting > 0:
            msg = self.serial.readline()
            # Did we timeout whilst reading part of a message?
            if (len(msg) > 0 and msg.count(b'\n') < 1): 
                msg += self.serial.readline() # Finish reading
                self.buf_in.append(msg)
        mv = memoryview(self.buf_out)
        self.buf_out_bytes_free += self.write(mv[self.buf_out_bytes_free:])