from typing import List, Tuple, Union, Optional class ProtocolFrame: # 协议常量 FRAME_HEADER = 0xAA FRAME_TAIL1 = 0x55 FRAME_TAIL2 = 0x55 RESERVED_SIZE = 6 MIN_FRAME_SIZE = 13 # 帧头1 + 功能1 + 长度2 + 预留6 + CRC1 + 包尾2 MAX_DATA_LENGTH = 0xFFFF # 最大数据长度 (2字节能表示的最大值) @staticmethod def calculate_crc8(data: bytes) -> bytes: """ 计算CRC8校验值 Args: data: 需要计算CRC的数据 Returns: 一个字节的CRC值(bytes类型) """ crc = 0 for byte in data: crc ^= byte for _ in range(8): crc = ((crc << 1) ^ 0x07 if crc & 0x80 else crc << 1) & 0xFF return bytes([crc]) @classmethod def pack(cls, function, data: Union[bytes, bytearray, List[int]], reserved: Optional[Union[bytes, bytearray, List[int]]] = None) -> bytes: """ 协议打包函数 Args: function: 功能码 (1字节) data: 数据块 reserved: 预留字节(6字节,可选) Returns: 打包后的字节数据 """ # 检查功能码 if function != None: if not 0 <= function <= 0xFF: raise ValueError("功能码必须是1字节") # 转换数据为bytearray if isinstance(data, list): data = bytearray(data) elif isinstance(data, bytes): data = bytearray(data) # 检查数据长度 data_length = len(data) if data_length > cls.MAX_DATA_LENGTH: raise ValueError(f"数据长度超过最大值 {cls.MAX_DATA_LENGTH}") # 处理预留字节 if reserved is None: reserved = bytearray([0] * cls.RESERVED_SIZE) else: if isinstance(reserved, list): reserved = bytearray(reserved) elif isinstance(reserved, bytes): reserved = bytearray(reserved) if len(reserved) != cls.RESERVED_SIZE: raise ValueError(f"预留字节必须是{cls.RESERVED_SIZE}字节") # 构建帧 frame = bytearray([cls.FRAME_HEADER]) # 帧头 (1字节) if function != None: frame.append(function) # 功能码 (1字节) data_length+=6 # 数据长度 (2字节,大端序) frame.append((data_length >> 8) & 0xFF) # 高字节 frame.append(data_length & 0xFF) # 低字节 if function != None: frame.extend(reserved) # 预留字节 (6字节) frame.extend(data) # 数据块 (变长) # 计算CRC (从功能码开始到数据块结束) crc = cls.calculate_crc8(frame[1:]) # 不包含帧头 frame.extend(crc) # CRC校验 (1字节) # 添加帧尾 frame.extend([cls.FRAME_TAIL1, cls.FRAME_TAIL2]) # 帧尾 (2字节) return bytes(frame) @classmethod def unpack(cls, data: Union[bytes, bytearray]) -> Tuple[int, bytearray, bytearray]: """ 协议解包函数 Args: data: 待解析的字节数据 Returns: (功能码, 数据块, 预留字节) Raises: ValueError: 当数据格式不正确时 """ # 检查数据长度 if len(data) < cls.MIN_FRAME_SIZE: raise ValueError("数据长度不足") # 检查帧头 if data[0] != cls.FRAME_HEADER: raise ValueError("帧头错误") # 检查帧尾 if data[-2:] != bytes([cls.FRAME_TAIL1, cls.FRAME_TAIL2]): raise ValueError("帧尾错误") # 解析基本信息 function = data[1] # 功能码 (1字节) # 数据长度 (2字节,大端序) data_length = (data[2] << 8) | data[3] reserved = data[4:10] # 预留字节 (6字节) # 检查数据长度 expected_length = cls.MIN_FRAME_SIZE + data_length if len(data) != expected_length: raise ValueError(f"数据长度不匹配: 期望{expected_length}字节,实际{len(data)}字节") # 提取数据块 payload = data[10:10 + data_length] # 验证CRC (从功能码开始到数据块结束) received_crc = data[-3] calculated_crc = cls.calculate_crc8(data[1:-3])[0] # 获取字节值 if received_crc != calculated_crc: raise ValueError(f"CRC校验失败: 期望{calculated_crc:02X},实际{received_crc:02X}") return function, bytearray(payload), bytearray(reserved) def print_hex(data: bytes, label: str = ""): """打印十六进制数据,并按字节添加空格""" hex_str = ' '.join([f"{b:02X}" for b in data]) if label: print(f"{label}: {hex_str}") else: print(hex_str) def print_frame_details(data: bytes): """打印帧的详细信息""" print("帧详细信息:") print(f"帧头: {data[0]:02X}") print(f"功能码: {data[1]:02X}") print(f"数据长度: {data[2]:02X} {data[3]:02X} ({(data[2] << 8) | data[3]}字节)") print(f"预留字节: {' '.join([f'{b:02X}' for b in data[4:10]])}") data_length = (data[2] << 8) | data[3] print(f"数据块: {' '.join([f'{b:02X}' for b in data[10:10 + data_length]])}") print(f"CRC校验: {data[-3]:02X}") print(f"帧尾: {data[-2]:02X} {data[-1]:02X}") # 使用示例 def example_usage(): try: # 示例1:简单数据打包 function_code = 0x01 data = [0x1] packed_data = ProtocolFrame.pack(function_code, data) print_hex(packed_data, "示例1 - 完整帧") print_frame_details(packed_data) print() # 示例3:解包验证 function, payload, reserved = ProtocolFrame.unpack(packed_data) print("解包结果:") print(f"功能码: 0x{function:02X}") print_hex(payload, "数据块") print_hex(reserved, "预留字节") except ValueError as e: print(f"错误: {e}") if __name__ == "__main__": example_usage()