python использует MQTT для передачи изображений на оборудование

Python

Краткое описание миссии

Недавно мне нужно было написать микросервис на питоне для передачи картинок на железо по MQTT.На питоне используется фреймворк flask.Общий процесс такой:

image

Соглашение:
  • Данные изображения необходимо инкапсулировать в несколько сообщений для передачи, а количество байтов данных, передаваемых каждым сообщением, составляет 1400 байт.
  • Сообщение (полезная нагрузка MQTT) Формат: Веб-сервер -------> БАЗА:
    image
  • Обратная связь: БАЗА---------> Веб-сервер:
    image
  • Если веб-сервер не получает «сообщение обратной связи» MQTT в течение 5 секунд после отправки «сообщения о передаче данных» или полученный ответ показывает «неполный пакет данных», «сообщение о передаче данных» будет отправлено повторно.

Блок-схема программы

В соответствии с приведенным выше протоколом можно получить следующую блок-схему:

image

код показывает, как показано ниже:

# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *

# 日志配置信息
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s  (runing by %(funcName)s',
)


class Mqtt(object):
    def __init__(self, img_data, size):
        self.MQTTHOST = '*******'
        self.MQTTPORT = "******"

        # 订阅和发送的主题
        self.topic_from_base = 'mqttTestSub'
        self.topic_to_base = 'mqttTestPub'

        self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        self.client = mqtt.Client(self.client_id)

        # 完成链接后的回掉函数
        self.client.on_connect = self.on_connect
        # 图片大小
        self.size = size

        # 用于跳出死循环,结束任务
        self.finished = None

        # 包的编号
        self.index = 0

        # 将收到的图片数据按大小分成列表
        self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]

        # 记录发布后的数据,用于监控时延
        self.pub_time = 0


        self.header_to_base = 0xffffeeee
        self.header_from_base = 0xeeeeffff

        # 功能标识
        self.function_begin = 0x01
        self.function_doing = 0x02
        self.function_finished = 0x03

        # 包的完整和非完整状态
        self.whole_package = 0x01
        self.bad_package = 0x00

        # 头信息的格式,小端模式
        self.format_to_base = "<Lbhh"
        self.format_from_base = "<Lbhb"

        # 如果重发包时,用于检查是否重发第一个包
        self.first = True

        # 如果重发包时,用于检查是否重发最后一个包
        self.last = False

        self.begin_data = 'image.jpg;' + str(self.size)

    # 链接mqtt服务器函数
    def on_mqtt_connect(self):
        self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
        self.client.loop_start()

    # 链接完成后的回调函数
    def on_connect(self, client, userdata, flags, rc):
        logging.info("+++ Connected with result code {} +++".format(str(rc)))
        self.client.subscribe(self.topic_from_base)

    # 订阅函数
    def subscribe(self):
        self.client.subscribe(self.topic_from_base, 1)
        # 消息到来处理函数
        self.client.on_message = self.on_message

    # 接收到信息后的回调函数

    def on_message(self, client, userdata, msg):
        # 如果接受第一个包则不需要重发第一个
        self.first = False

        # 将接受到的包进行解压,得到一个元组
        base_tuple = struct.unpack(self.format_from_base, msg.payload)
        logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
        logging.info("+++ package_number is {}, package_status_from_base is {}  +++"
                     .format(base_tuple[2], base_tuple[3]))

        # 检查接受到信息的头部是否正确
        if base_tuple[0] == self.header_from_base:
            logging.info("+++ function_from_base is {}  +++".format(base_tuple[1]))

            # 是否完成传输,如果完成则退出
            if base_tuple[1] == self.function_finished:
                logging.info("+++  finish work +++")
                self.finished = 1
                self.client.disconnect()
            else:
                # 是否是最后一个包
                if self.index == len(self.image_data_list) - 1:
                    self.publish('finished', self.function_finished)
                    self.last = True
                    logging.info("+++ finished_data_to_base is finished+++")
                else:

                    # 如果接收到的包不是 0x03则进行传送数据
                    if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
                        logging.info("+++ package_number is {}, package_status_from_base is {}  +++"
                                     .format(base_tuple[2],base_tuple[3]))

                        # 如果数据的反馈中,包的状态是1则继续发下一个包
                        if base_tuple[3] == self.whole_package:
                            self.publish(self.index, self.function_doing)
                            logging.info("+++ data_to_base is finished+++")
                            self.index += 1

                        # 如果数据的反馈中,包的状态是0则重发数据包
                        elif base_tuple[3] == self.bad_package:
                            re_package_number = base_tuple[2]
                            self.publish(re_package_number-1, self.function_doing)
                            logging.info("+++ re_data_to_base is finished+++")
                        else:
                            logging.info("+++ package_status_from_base is not 0 or 1 +++")
                            self.client.disconnect()
                    else:
                        logging.info("+++  function_identifier is illegal +++")
                        self.client.disconnect()
        else:
            logging.info("+++ header_from_base is illegal +++")
            self.client.disconnect()

    # 数据发送函数
    def publish(self, index, fuc):
        # 看是否是最后一个包
        if index == 'finished':
            length = 0
            package_number = 0
            data = b''
        else:
            length = len(self.image_data_list[index])
            package_number = index
            data = self.image_data_list[index]

        # 打包数据头信息
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            fuc,
            package_number,
            length
        )
        to_base_data = buffer + data

        # mqtt发送
        self.client.publish(
            self.topic_to_base,
            to_base_data
        )
        self.pub_time = time.time()

    # 发送第一个包函数
    def publish_begin(self):
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            self.function_begin,
            0,
            len(self.begin_data.encode('utf-8')),
        )
        begin_data = buffer + self.begin_data.encode('utf-8')
        self.client.publish(self.topic_to_base, begin_data)

    # 控制函数
    def control(self):
        self.on_mqtt_connect()
        self.publish_begin()
        begin_time = time.time()
        self.pub_time = time.time()
        self.subscribe()
        while True:
            time.sleep(1)
            # 超过5秒重传
            date = time.time() - self.pub_time
            if date > 5:
                # 是否重传第一个包
                if self.first == True:
                    self.publish_begin()
                    logging.info('+++ this is timeout first_data +++')

                # 是否重传最后一个包
                elif self.last == True:
                    self.publish('finished', self.function_finished)
                    logging.info('+++ this is timeout last_data +++')
                else:
                    self.publish(self.index-1, self.function_doing)
                    logging.info('+++ this is timeout middle_data +++')
            if self.finished == 1:
                logging.info('+++ all works is finished+++')
                break

        print(str(time.time()-begin_time) + 'begin_time - end_time')

app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)

# 接受参数
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)


class GetImage(Resource):
    # 得到参数并从图床下载到本地
    def get(self):
        args = parser.parse_args()
        url = args.get('url')
        response = requests.get(url)
        # 获取图片
        image = Image.open(BytesIO(response.content))
        # 存取图片
        add = os.path.join(os.path.abspath(''), 'image.jpg')
        image.save(add)
        # 得到图片大小
        size = os.path.getsize(add)
        f = open(add, 'rb')
        imageData = f.read()
        f.close()

        # 进行mqtt传输
        mqtt = Mqtt(imageData, size)
        mqtt.control()

        # 删除文件
        os.remove(add)
        logging.info('*** the result of control is {} ***'.format(1))
        return jsonify({
            "imageData": 1
        })


api.add_resource(GetImage, '/image')

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')