python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python解析cyber record

Python解析cyber record文件的示例详解

作者:阳洞洞

cyber record 是 Cyber RT 中用于录制和回放数据的工具,本文将为大家展示一下如何使用Python进行解析cyber record文件,需要的可以参考下

Cyber RT 是百度开源的一个高性能、灵活的机器人操作系统,cyber record 是 Cyber RT 中用于录制和回放数据的工具。下面是一个使用 Python 解析 cyber record 文件的示例,该示例使用 cyber_py 库(Cyber RT 的 Python 绑定)来读取记录文件并打印消息信息。

一、环境准备

确保已经安装了 Cyber RT 开发环境,并且 cyber_py 库可以正常使用。

二、示例代码

以下是一个基于 cyber_py3 库的 Cyber Record 文件解析示例代码,支持读取目录或单个文件,并自动过滤非 Record 文件:

import argparse
import os
from cyber_py3 import record
from cyber_py3.record import RecordReader, RecordWriter, RecordMessage
 
def parse_record_file(file_path):
    """
    解析单个 Cyber Record 文件
    """
    try:
        reader = RecordReader(file_path)
        print(f"\n===== 正在解析文件: {os.path.basename(file_path)} =====")
        print(f"总消息数: {reader.get_messagenumber()}")
        print(f"开始时间: {reader.get_starttime()}")
        print(f"结束时间: {reader.get_endtime()}")
        print(f"通道列表: {reader.get_channellist()}\n")
 
        # 遍历所有消息
        for channel_name, msg, datatype, timestamp in reader.read_messages():
            print(f"[通道] {channel_name}")
            print(f"  时间戳: {timestamp}")
            print(f"  数据类型: {datatype}")
            print(f"  消息长度: {len(msg)} bytes")
            print("-" * 60)
 
    except Exception as e:
        print(f"解析文件 {file_path} 失败: {str(e)}")
 
def parse_record_directory(directory):
    """
    解析目录下的所有 Cyber Record 文件
    """
    if not os.path.isdir(directory):
        print(f"错误: {directory} 不是有效目录")
        return
 
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        
        # 通过文件头验证是否为合法 Record 文件
        if record.is_valid_record_file(file_path):
            parse_record_file(file_path)
        else:
            print(f"跳过非 Record 文件: {filename}")
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Cyber Record 文件解析工具')
    parser.add_argument('path', type=str, help='文件路径或目录路径')
    args = parser.parse_args()
 
    target_path = args.path
 
    if os.path.isfile(target_path):
        if record.is_valid_record_file(target_path):
            parse_record_file(target_path)
        else:
            print(f"错误: {target_path} 不是有效的 Cyber Record 文件")
    elif os.path.isdir(target_path):
        parse_record_directory(target_path)
    else:
        print(f"错误: {target_path} 不存在")

三、使用说明

运行示例:

# 解析单个文件
python parse_record.py /path/to/your.record
 
# 解析目录
python parse_record.py /path/to/record_dir/

功能特性:

自动验证文件有效性(通过文件头校验)

显示文件元信息:消息数量、时间范围、通道列表

支持解析消息头信息(通道、时间戳、数据类型)

自动跳过无效文件和非 Record 文件

四、关键实现说明

1.文件验证:

使用 record.is_valid_record_file() 方法进行二进制验证

比单纯检查文件扩展名更可靠

2.消息遍历:

reader.read_messages() 生成器逐条读取消息

返回元组:(channel_name, message, data_type, timestamp)

3.性能优化:

按需解析消息内容(当前示例仅读取元信息)

支持大文件流式读取(不加载全部内容到内存)

到此这篇关于Python解析cyber record文件的示例详解的文章就介绍到这了,更多相关Python解析cyber record内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文