Software design

Application flow chart

To understand a program, it usually helps to start from its entry point. We take the same approach here and first look for the starting point of the program. In this routine, the general startup sequence is shown in the following figure:

The overall flow can be summarized in the following steps:

  1. Initialize the DTU object and read the system configuration
  2. Initialize the serial port peripheral
  3. Check the network status
  4. Initialize the cloud configuration and the message queue
  5. Start the uplink and downlink data service threads

💡 Basic principle: This DTU uses multithreading plus a message queue to forward data between the serial port and the cloud in both directions. The uplink data thread reads data from the serial port and sends it to the cloud. The downlink data thread takes cloud data from the message queue and forwards it through the serial port. The message queue buffers downlink data received from the cloud.
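
The forwarding principle above can be sketched on a desktop Python interpreter as follows. This is a minimal illustration, not the DTU implementation: FakeSerial, FakeCloud, on_message and run_demo are hypothetical names, and the standard threading and queue modules stand in for whatever thread and queue facilities the device firmware provides.

```python
import queue
import threading
import time


class FakeSerial:
    """Hypothetical in-memory stand-in for the serial port."""

    def __init__(self, incoming):
        self._incoming = queue.Queue()
        for chunk in incoming:
            self._incoming.put(chunk)
        self.written = []

    def read(self, timeout=0.05):
        # Return one pending chunk, or b"" if nothing arrives in time.
        try:
            return self._incoming.get(timeout=timeout)
        except queue.Empty:
            return b""

    def write(self, data):
        self.written.append(data)


class FakeCloud:
    """Hypothetical cloud client that buffers downlink data in a queue."""

    def __init__(self, downlink_queue):
        self._queue = downlink_queue
        self.sent = []

    def on_message(self, topic, data):
        # Network side: cache the downlink message until the DTU takes it.
        self._queue.put((topic, data))

    def recv(self, timeout=0.05):
        # Raises queue.Empty if no downlink message is pending.
        return self._queue.get(timeout=timeout)

    def send(self, data):
        self.sent.append(data)


def uplink(serial, cloud, stop):
    # Serial port -> cloud
    while not stop.is_set():
        data = serial.read()
        if data:
            cloud.send(data)


def downlink(serial, cloud, stop):
    # Message queue -> serial port
    while not stop.is_set():
        try:
            _topic, data = cloud.recv()
        except queue.Empty:
            continue
        serial.write(data)


def run_demo():
    serial = FakeSerial([b"sensor:1", b"sensor:2"])
    cloud = FakeCloud(queue.Queue())
    stop = threading.Event()
    threads = [
        threading.Thread(target=uplink, args=(serial, cloud, stop)),
        threading.Thread(target=downlink, args=(serial, cloud, stop)),
    ]
    for t in threads:
        t.start()
    cloud.on_message("cmd", b"reboot")  # simulate one downlink message
    # Wait (at most 2 s) until both directions have been forwarded.
    deadline = time.monotonic() + 2.0
    while time.monotonic() < deadline:
        if len(cloud.sent) == 2 and len(serial.written) == 1:
            break
        time.sleep(0.01)
    stop.set()
    for t in threads:
        t.join()
    return cloud.sent, serial.written


sent, written = run_demo()
print(sent, written)
```

The stop event exists only so that the demo terminates. The handlers described in this document loop forever.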

Directory structure

- code
    - common.py         Common utilities
    - dtu.py            DTU application class implementation
    - dtu_config.json   Template configuration file
    - error.py          Error codes and descriptions
    - logging.py        Logging
    - main2.py          Application main script (demo)
    - mqttIot.py        MQTT client implementation
    - serial.py         Serial port read and write implementation
    - settings.py       Configuration file read and write implementation
    - socketIot.py      TCP client implementation

API Description

Object model

This solution defines several object models. The main ones are the DTU object model, the cloud object model and the serial port object model (Serial). Their basic definitions are as follows:

DTU object model

The DTU class is defined in the dtu.py module. It is mainly used to initialize the serial port, the cloud connection, and the uplink and downlink data services.

Configuration is stored in a JSON file and accessed through a config object that works much like the built-in dictionary type. Configuration items can be read as follows (taking the system_config configuration item as an example):

  • Read a whole configuration section and unpack it into constructor arguments: Serial(**config.get('uart_config'))
  • Use the get method with a dotted key to read a nested configuration item: config.get('system_config.cloud')

from usr.dtu import DTU

    self.queue = Queue()  # Data exchange queue through which the MQTT or TCP client passes downlink data to the DTU application.
    self.serial = Serial(**config.get('uart_config'))

    cloud_type = config.get('system_config.cloud')
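
The dictionary-like access with dotted keys described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the settings.py implementation: the Config class, its loads and set methods, and the sample values (including "mqtt" as a cloud type) are hypothetical.

```python
import json


class Config:
    """Hypothetical dict-like configuration store with dotted-key access."""

    def __init__(self):
        self._data = {}

    def loads(self, text):
        # Parse JSON text; a real implementation would read it from a file.
        self._data = json.loads(text)

    def get(self, key, default=None):
        # Walk nested dictionaries one key segment at a time.
        node = self._data
        for part in key.split("."):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

    def set(self, key, value):
        # Create intermediate dictionaries as needed, then assign the leaf.
        parts = key.split(".")
        node = self._data
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value


# Illustrative values only; the real dtu_config.json may differ.
SAMPLE = '{"system_config": {"cloud": "mqtt"}, "uart_config": {"port": 2, "baudrate": 115200}}'

config = Config()
config.loads(SAMPLE)
cloud_type = config.get("system_config.cloud")   # nested item via dotted key
uart = config.get("uart_config")                 # whole section as a dict
config.set("system_config.cloud", "tcp")
print(cloud_type, uart, config.get("system_config.cloud"))
```

Returning a whole section as a dictionary is what makes the `Serial(**config.get('uart_config'))` pattern possible: the section's keys become keyword arguments.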

The MqttIot class implements cloud communication over the MQTT protocol, providing connect, subscribe, publish and listen functions.

self.cloud = MqttIot(
    mqtt_config['client_id'],
    mqtt_config['server'],
    port=mqtt_config['port'],
    user=mqtt_config['user'],
    password=mqtt_config['password'],
    keepalive=mqtt_config['keepalive'],
    clean_session=mqtt_config['clean_session'],
    qos=mqtt_config['qos'],
    subscribe_topic=mqtt_config['subscribe'],
    publish_topic=mqtt_config['publish'],
    queue=self.queue,
    error_trans=True
)

The SocketIot class implements cloud communication over the TCP/UDP protocol, providing connect, send, receive and listen functions.

self.cloud = SocketIot(
    ip_type=socket_config['ip_type'],
    keep_alive=socket_config['keep_alive'],
    domain=socket_config['domain'],
    port=socket_config['port'],
    queue=self.queue,
    error_trans=True
)
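
How the cloud type read from the configuration might select one of the two clients can be sketched as follows. This is only an assumption-laden illustration: MqttStub and SocketStub are hypothetical placeholders for MqttIot and SocketIot, create_cloud is a hypothetical helper, and the cloud type strings "mqtt" and "tcp" as well as the section names "mqtt_config" and "socket_config" are guesses, not values confirmed by the configuration file.

```python
import queue


class MqttStub:
    """Hypothetical placeholder for the MQTT client."""

    def __init__(self, client_id, server, port, downlink_queue):
        self.kind = "mqtt"
        self.endpoint = (server, port)
        self.queue = downlink_queue


class SocketStub:
    """Hypothetical placeholder for the TCP client."""

    def __init__(self, domain, port, downlink_queue):
        self.kind = "tcp"
        self.endpoint = (domain, port)
        self.queue = downlink_queue


def create_cloud(cloud_type, settings, downlink_queue):
    # Both clients receive the same queue, so the downlink thread can
    # consume messages without knowing which protocol delivered them.
    if cloud_type == "mqtt":
        cfg = settings["mqtt_config"]
        return MqttStub(cfg["client_id"], cfg["server"], cfg["port"], downlink_queue)
    if cloud_type == "tcp":
        cfg = settings["socket_config"]
        return SocketStub(cfg["domain"], cfg["port"], downlink_queue)
    raise ValueError("unsupported cloud type: {}".format(cloud_type))


# Illustrative settings; host names and ports are made up.
settings = {
    "mqtt_config": {"client_id": "dtu-1", "server": "broker.example", "port": 1883},
    "socket_config": {"domain": "tcp.example", "port": 9000},
}
shared_queue = queue.Queue()
mqtt_cloud = create_cloud("mqtt", settings, shared_queue)
tcp_cloud = create_cloud("tcp", settings, shared_queue)
print(mqtt_cloud.kind, mqtt_cloud.endpoint, tcp_cloud.kind, tcp_cloud.endpoint)
```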

up_transaction_handler method: The entry function of the uplink data transfer thread.

down_transaction_handler method: The entry function of the downlink data transfer thread.

    def down_transaction_handler(self):
        while True:
            topic, data = self.cloud.recv()
            logger.info('down transfer msg: {}'.format(data))
            self.serial.write(data)

    def up_transaction_handler(self):
        while True:
            data = self.serial.read(1024, timeout=100)
            if data:
                logger.info('up transfer msg: {}'.format(data))
                self.cloud.send(data)

run method: Starts the service. It creates the serial port and cloud objects according to the configuration file and creates the uplink and downlink data processing threads.

Serial object model

  • The serial.py module defines the serial port model class Serial, which implements the read and write operations of the serial port. Its main interfaces include:
    • __init__: Initialize the serial port.
    • open: Open the serial port.
    • close: Close the serial port.
    • write: Write to the serial port.
    • read: Read from the serial port.

Serial port usage example:

from usr.serial import Serial

# init Serial object
s = Serial(port=2, baudrate=115200, bytesize=8, parity=0, stopbits=1, flowctl=0, rs485_config=None)
# open serial
s.open()

# serial write method
s.write(b"hello world!")

# serial read method
recv_data = s.read()
print(recv_data)

Business code explanation

The data transfer service is implemented mainly in the DTU class (dtu.py), which manages the serial port, the cloud service, and the uplink and downlink data flows.

In the main script, the DTU object starts the entire DTU service by calling the run method. This method creates and runs two threads: the uplink data processing thread (thread function up_transaction_handler) and the downlink data processing thread (thread function down_transaction_handler). Inside the thread functions, the serial port object and the cloud object are obtained through two properties. The serial property creates and opens the configured serial port object when a thread accesses it, providing the read and write interfaces. The cloud property creates and connects the cloud object when a thread accesses it, providing the receive and send interfaces.
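
The create-on-access behavior of the two properties can be sketched as follows. This is a hedged illustration rather than the code in dtu.py: LazyDTU, the factory arguments and the lock are assumptions. The lock is added here because both threads may touch the same property at nearly the same time.

```python
import threading


class LazyDTU:
    """Hypothetical sketch of properties that build their object on first access."""

    def __init__(self, serial_factory, cloud_factory):
        self._serial_factory = serial_factory
        self._cloud_factory = cloud_factory
        self._serial = None
        self._cloud = None
        self._lock = threading.Lock()

    @property
    def serial(self):
        # Create (and, on a device, open) the serial object only once.
        with self._lock:
            if self._serial is None:
                self._serial = self._serial_factory()
            return self._serial

    @property
    def cloud(self):
        # Create (and, on a device, connect) the cloud object only once.
        with self._lock:
            if self._cloud is None:
                self._cloud = self._cloud_factory()
            return self._cloud


created = []


def make_serial():
    created.append("serial")
    return object()


def make_cloud():
    created.append("cloud")
    return object()


dtu = LazyDTU(make_serial, make_cloud)
created_before_access = list(created)  # nothing has been built yet
first = dtu.serial
second = dtu.serial                    # reuses the object built above
cloud = dtu.cloud
print(created_before_access, created, first is second)
```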

Function call timing diagram

sequenceDiagram
    Title: Device data transmit-processing timing
    participant cloud as cloud
    participant dtu as DTU
    participant serial as Serial
    loop down_transaction_handler thread
        cloud ->> dtu: cloud.recv
        dtu ->> serial: Serial.write
    end
    loop up_transaction_handler thread
        serial ->> dtu: Serial.read
        dtu ->> cloud: cloud.send
    end

The uplink data processing thread function DTU.up_transaction_handler is implemented as follows:

class DTU(object):

    # ...

    def up_transaction_handler(self):
        while True:
            data = self.serial.read(1024, timeout=100)
            if data:
                logger.info('up transfer msg: {}'.format(data))
                self.cloud.send(data)

    # ...

The up_transaction_handler function reads serial port data in chunks of up to 1 KB (1024 bytes per read; the buffer size can be adjusted by the user), logs each chunk and sends it to the cloud through the cloud.send interface.
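
One consequence of the fixed read size is that a payload larger than the buffer reaches the cloud as several separate sends. The sketch below illustrates this on a desktop Python interpreter. It is an assumption-based example: forward_in_chunks is a hypothetical helper, and an in-memory io.BytesIO stream stands in for the serial port.

```python
import io


def forward_in_chunks(stream, send, chunk_size=1024):
    """Read at most chunk_size bytes at a time and pass each chunk to send."""
    while True:
        data = stream.read(chunk_size)
        if not data:
            break  # the handler in this document keeps polling instead of stopping
        send(data)


sent = []
forward_in_chunks(io.BytesIO(b"x" * 2500), sent.append)
sizes = [len(chunk) for chunk in sent]
print(sizes)  # [1024, 1024, 452]
```

If the receiving side needs whole messages, it has to reassemble them itself or the buffer size has to be raised above the largest expected payload.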

The downlink data processing thread function DTU.down_transaction_handler is implemented as follows:

class DTU(object):

    # ...

    def down_transaction_handler(self):
        while True:
            topic, data = self.cloud.recv()
            logger.info('down transfer msg: {}'.format(data))
            self.serial.write(data)

    # ...

The down_transaction_handler function gets the downlink message by calling cloud.recv and forwards it to the serial port through Serial.write.