python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python JSON数据处理

Python数据处理之JSON数据转换和处理详解

作者:零日失眠者

这篇文章主要为大家详细介绍了如何使用Python实现一个专业的JSON数据处理和转换工具,专门用于处理验证和转换JSON格式的数据,有需要的可以了解下

功能介绍

这是一个专业的JSON数据处理和转换工具,专门用于处理、验证和转换JSON格式的数据。该工具具备以下核心功能:

数据解析功能

数据验证功能

数据转换功能

数据清洗功能

批量处理功能

场景应用

1. API数据处理

2. 配置文件管理

3. 日志数据分析

4. 数据集成项目

报错处理

1. JSON解析异常

try:
    data = json.loads(json_string)
except json.JSONDecodeError as e:
    logger.error(f"JSON解析错误: {str(e)}")
    raise JSONProcessingError(f"JSON格式错误: {str(e)}")
except UnicodeDecodeError as e:
    logger.error(f"编码错误: {str(e)}")
    raise JSONProcessingError(f"文件编码错误: {str(e)}")

2. 数据验证异常

try:
    jsonschema.validate(instance=data, schema=schema)
except jsonschema.ValidationError as e:
    logger.error(f"数据验证失败: {str(e)}")
    raise JSONProcessingError(f"数据不符合Schema要求: {str(e)}")
except jsonschema.SchemaError as e:
    logger.error(f"Schema定义错误: {str(e)}")
    raise JSONProcessingError(f"Schema格式错误: {str(e)}")

3. 文件操作异常

try:
    with open(input_file, 'r', encoding=encoding) as f:
        data = json.load(f)
except FileNotFoundError:
    logger.error(f"输入文件不存在: {input_file}")
    raise JSONProcessingError(f"文件未找到: {input_file}")
except PermissionError:
    logger.error(f"无权限访问文件: {input_file}")
    raise JSONProcessingError(f"文件访问权限不足: {input_file}")

4. 内存溢出异常

try:
    # 处理大文件时的内存管理
    if file_size > MAX_FILE_SIZE:
        process_large_json_file(input_file)
except MemoryError:
    logger.error("内存不足,无法处理大文件")
    raise JSONProcessingError("内存不足,请减小文件大小或增加系统内存")

代码实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JSON数据处理和转换工具
功能:处理、验证和转换JSON格式数据
作者:Cline
版本:1.0
"""

import json
import argparse
import sys
import logging
import os
from datetime import datetime
import jsonschema
from typing import Dict, List, Any, Optional, Union
import pandas as pd
import xml.etree.ElementTree as ET
from collections import OrderedDict
import chardet

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('json_processor.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

class JSONProcessingError(Exception):
    """JSON处理异常类"""
    pass

class JSONProcessor:
    def __init__(self, config: Dict[str, Any]):
        self.input_file = config.get('input_file')
        self.output_file = config.get('output_file', 'processed_data.json')
        self.encoding = config.get('encoding', 'auto')
        self.schema_file = config.get('schema_file')
        self.output_format = config.get('output_format', 'json')
        self.flatten_depth = config.get('flatten_depth', 0)
        self.backup_original = config.get('backup_original', True)
        
        # 处理选项
        self.options = config.get('options', {})
        
        # 处理结果统计
        self.stats = {
            'total_records': 0,
            'processed_records': 0,
            'errors': [],
            'warnings': []
        }
        
    def detect_encoding(self, file_path: str) -> str:
        """自动检测文件编码"""
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read(10000)  # 读取前10KB数据
                result = chardet.detect(raw_data)
                encoding = result['encoding']
                confidence = result['confidence']
                
                logger.info(f"检测到文件编码: {encoding} (置信度: {confidence:.2f})")
                return encoding if confidence > 0.7 else 'utf-8'
                
        except Exception as e:
            logger.warning(f"编码检测失败,使用默认编码: {str(e)}")
            return 'utf-8'
            
    def load_json_data(self) -> Union[Dict, List]:
        """加载JSON数据"""
        logger.info(f"开始加载JSON数据文件: {self.input_file}")
        
        try:
            # 自动检测编码
            if self.encoding == 'auto':
                self.encoding = self.detect_encoding(self.input_file)
                
            # 检测文件大小
            file_size = os.path.getsize(self.input_file)
            if file_size > 100 * 1024 * 1024:  # 100MB
                logger.warning(f"文件较大 ({file_size / (1024*1024):.2f} MB),建议使用流式处理")
                
            # 尝试加载JSON数据
            with open(self.input_file, 'r', encoding=self.encoding) as f:
                # 检测是否为JSON Lines格式
                first_line = f.readline().strip()
                f.seek(0)  # 重置文件指针
                
                if first_line.startswith('[') or first_line.startswith('{'):
                    # 标准JSON格式
                    data = json.load(f)
                else:
                    # JSON Lines格式
                    data = []
                    for line_num, line in enumerate(f, 1):
                        if line.strip():
                            try:
                                data.append(json.loads(line))
                            except json.JSONDecodeError as e:
                                logger.warning(f"第 {line_num} 行JSON解析失败: {str(e)}")
                                
            self.stats['total_records'] = len(data) if isinstance(data, list) else 1
            logger.info(f"成功加载JSON数据,共 {self.stats['total_records']} 条记录")
            return data
            
        except FileNotFoundError:
            logger.error(f"输入文件不存在: {self.input_file}")
            raise JSONProcessingError(f"文件未找到: {self.input_file}")
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            raise JSONProcessingError(f"JSON格式错误: {str(e)}")
        except UnicodeDecodeError as e:
            logger.error(f"文件编码错误: {str(e)}")
            raise JSONProcessingError(f"编码错误,请尝试指定其他编码格式")
        except Exception as e:
            logger.error(f"加载JSON数据时发生错误: {str(e)}")
            raise JSONProcessingError(f"数据加载失败: {str(e)}")
            
    def backup_original(self):
        """备份原始文件"""
        if not self.backup_original:
            return
            
        try:
            backup_name = f"{self.input_file}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            import shutil
            shutil.copy2(self.input_file, backup_name)
            logger.info(f"原始文件已备份到: {backup_name}")
        except Exception as e:
            logger.warning(f"备份原始文件失败: {str(e)}")
            
    def validate_schema(self, data: Union[Dict, List]):
        """验证JSON Schema"""
        if not self.schema_file or not os.path.exists(self.schema_file):
            logger.info("未指定Schema文件,跳过Schema验证")
            return True
            
        try:
            with open(self.schema_file, 'r', encoding='utf-8') as f:
                schema = json.load(f)
                
            # 验证数据
            jsonschema.validate(instance=data, schema=schema)
            logger.info("JSON数据通过Schema验证")
            return True
            
        except jsonschema.ValidationError as e:
            logger.error(f"数据验证失败: {str(e)}")
            self.stats['errors'].append({
                'type': 'schema_validation',
                'message': str(e)
            })
            return False
        except jsonschema.SchemaError as e:
            logger.error(f"Schema定义错误: {str(e)}")
            raise JSONProcessingError(f"Schema格式错误: {str(e)}")
        except Exception as e:
            logger.error(f"Schema验证时发生错误: {str(e)}")
            raise JSONProcessingError(f"Schema验证失败: {str(e)}")
            
    def flatten_dict(self, d: Dict, parent_key: str = '', sep: str = '.') -> Dict:
        """扁平化嵌套字典"""
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict) and (self.flatten_depth <= 0 or parent_key.count(sep) < self.flatten_depth):
                items.extend(self.flatten_dict(v, new_key, sep=sep).items())
            elif isinstance(v, list) and v and isinstance(v[0], dict):
                # 处理字典列表
                for i, item in enumerate(v):
                    if isinstance(item, dict):
                        items.extend(self.flatten_dict(item, f"{new_key}[{i}]", sep=sep).items())
                    else:
                        items.append((f"{new_key}[{i}]", item))
            else:
                items.append((new_key, v))
        return dict(items)
        
    def process_record(self, record: Dict) -> Dict:
        """处理单条记录"""
        try:
            processed_record = record.copy()
            
            # 字段重命名
            field_mapping = self.options.get('field_mapping', {})
            for old_name, new_name in field_mapping.items():
                if old_name in processed_record:
                    processed_record[new_name] = processed_record.pop(old_name)
                    
            # 数据类型转换
            type_conversions = self.options.get('type_conversions', {})
            for field, target_type in type_conversions.items():
                if field in processed_record:
                    try:
                        if target_type == 'int':
                            processed_record[field] = int(processed_record[field])
                        elif target_type == 'float':
                            processed_record[field] = float(processed_record[field])
                        elif target_type == 'str':
                            processed_record[field] = str(processed_record[field])
                        elif target_type == 'bool':
                            processed_record[field] = bool(processed_record[field])
                    except (ValueError, TypeError) as e:
                        logger.warning(f"字段 {field} 类型转换失败: {str(e)}")
                        self.stats['warnings'].append({
                            'type': 'type_conversion',
                            'field': field,
                            'message': str(e)
                        })
                        
            # 数据清洗
            cleaning_options = self.options.get('cleaning', {})
            if cleaning_options.get('remove_empty', False):
                processed_record = {k: v for k, v in processed_record.items() if v is not None and v != ''}
                
            # 扁平化处理
            if self.flatten_depth > 0:
                processed_record = self.flatten_dict(processed_record)
                
            return processed_record
            
        except Exception as e:
            logger.error(f"处理记录时发生错误: {str(e)}")
            self.stats['errors'].append({
                'type': 'record_processing',
                'message': str(e)
            })
            return record  # 返回原始记录
            
    def process_data(self, data: Union[Dict, List]) -> Union[Dict, List]:
        """处理JSON数据"""
        logger.info("开始处理JSON数据...")
        
        try:
            if isinstance(data, list):
                # 处理记录列表
                processed_data = []
                for i, record in enumerate(data):
                    if isinstance(record, dict):
                        processed_record = self.process_record(record)
                        processed_data.append(processed_record)
                        self.stats['processed_records'] += 1
                    else:
                        processed_data.append(record)
                        self.stats['processed_records'] += 1
                        
                    # 显示进度
                    if (i + 1) % 1000 == 0:
                        logger.info(f"已处理 {i + 1} 条记录")
                        
            elif isinstance(data, dict):
                # 处理单个字典
                processed_data = self.process_record(data)
                self.stats['processed_records'] = 1
                
            else:
                # 其他类型直接返回
                processed_data = data
                self.stats['processed_records'] = 1
                
            logger.info(f"数据处理完成,共处理 {self.stats['processed_records']} 条记录")
            return processed_data
            
        except Exception as e:
            logger.error(f"处理JSON数据时发生错误: {str(e)}")
            raise JSONProcessingError(f"数据处理失败: {str(e)}")
            
    def convert_to_csv(self, data: Union[Dict, List], output_file: str):
        """转换为CSV格式"""
        try:
            if isinstance(data, list) and all(isinstance(item, dict) for item in data):
                # 转换记录列表为DataFrame
                df = pd.DataFrame(data)
                df.to_csv(output_file, index=False, encoding='utf-8-sig')
            elif isinstance(data, dict):
                # 转换单个字典为DataFrame
                df = pd.DataFrame([data])
                df.to_csv(output_file, index=False, encoding='utf-8-sig')
            else:
                raise JSONProcessingError("数据格式不支持转换为CSV")
                
            logger.info(f"数据已转换为CSV格式并保存到: {output_file}")
            
        except Exception as e:
            logger.error(f"转换为CSV时发生错误: {str(e)}")
            raise JSONProcessingError(f"CSV转换失败: {str(e)}")
            
    def convert_to_excel(self, data: Union[Dict, List], output_file: str):
        """转换为Excel格式"""
        try:
            if isinstance(data, list) and all(isinstance(item, dict) for item in data):
                # 转换记录列表为DataFrame
                df = pd.DataFrame(data)
                df.to_excel(output_file, index=False)
            elif isinstance(data, dict):
                # 转换单个字典为DataFrame
                df = pd.DataFrame([data])
                df.to_excel(output_file, index=False)
            else:
                raise JSONProcessingError("数据格式不支持转换为Excel")
                
            logger.info(f"数据已转换为Excel格式并保存到: {output_file}")
            
        except Exception as e:
            logger.error(f"转换为Excel时发生错误: {str(e)}")
            raise JSONProcessingError(f"Excel转换失败: {str(e)}")
            
    def convert_to_xml(self, data: Union[Dict, List], output_file: str):
        """转换为XML格式"""
        try:
            def dict_to_xml(d, root):
                for k, v in d.items():
                    if isinstance(v, dict):
                        sub_root = ET.SubElement(root, k)
                        dict_to_xml(v, sub_root)
                    elif isinstance(v, list):
                        for item in v:
                            if isinstance(item, dict):
                                sub_root = ET.SubElement(root, k)
                                dict_to_xml(item, sub_root)
                            else:
                                sub_element = ET.SubElement(root, k)
                                sub_element.text = str(item)
                    else:
                        sub_element = ET.SubElement(root, k)
                        sub_element.text = str(v)
                        
            # 创建根元素
            root = ET.Element("root")
            
            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict):
                        item_root = ET.SubElement(root, "item")
                        dict_to_xml(item, item_root)
            elif isinstance(data, dict):
                dict_to_xml(data, root)
                
            # 保存XML文件
            tree = ET.ElementTree(root)
            tree.write(output_file, encoding='utf-8', xml_declaration=True)
            logger.info(f"数据已转换为XML格式并保存到: {output_file}")
            
        except Exception as e:
            logger.error(f"转换为XML时发生错误: {str(e)}")
            raise JSONProcessingError(f"XML转换失败: {str(e)}")
            
    def save_results(self, data: Union[Dict, List]):
        """保存处理结果"""
        try:
            # 确保输出目录存在
            output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'
            os.makedirs(output_dir, exist_ok=True)
            
            if self.output_format == 'json':
                with open(self.output_file, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
            elif self.output_format == 'csv':
                self.convert_to_csv(data, self.output_file)
            elif self.output_format == 'excel':
                self.convert_to_excel(data, self.output_file)
            elif self.output_format == 'xml':
                self.convert_to_xml(data, self.output_file)
            else:
                logger.error(f"不支持的输出格式: {self.output_format}")
                raise JSONProcessingError(f"不支持的输出格式: {self.output_format}")
                
            logger.info(f"处理结果已保存到 {self.output_file}")
            
        except Exception as e:
            logger.error(f"保存处理结果时出错: {str(e)}")
            raise JSONProcessingError(f"保存失败: {str(e)}")
            
    def generate_report(self):
        """生成处理报告"""
        try:
            report = {
                'timestamp': datetime.now().isoformat(),
                'input_file': self.input_file,
                'output_file': self.output_file,
                'processing_stats': self.stats,
                'options': self.options
            }
            
            report_file = f"{self.output_file}.report.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)
                
            logger.info(f"处理报告已保存到 {report_file}")
            
            # 打印简要报告
            print("\n" + "="*50)
            print("JSON数据处理报告")
            print("="*50)
            print(f"处理时间: {report['timestamp']}")
            print(f"输入文件: {self.input_file}")
            print(f"输出文件: {self.output_file}")
            print("-"*50)
            print(f"总记录数: {self.stats['total_records']}")
            print(f"处理记录数: {self.stats['processed_records']}")
            print(f"错误数: {len(self.stats['errors'])}")
            print(f"警告数: {len(self.stats['warnings'])}")
            print("="*50)
            
        except Exception as e:
            logger.error(f"生成处理报告时出错: {str(e)}")
            
    def run_processing(self):
        """运行JSON数据处理"""
        logger.info("开始JSON数据处理...")
        
        try:
            # 1. 备份原始文件
            self.backup_original()
            
            # 2. 加载JSON数据
            data = self.load_json_data()
            
            # 3. 验证Schema
            self.validate_schema(data)
            
            # 4. 处理数据
            processed_data = self.process_data(data)
            
            # 5. 保存结果
            self.save_results(processed_data)
            
            # 6. 生成报告
            self.generate_report()
            
            logger.info("JSON数据处理完成")
            return processed_data
            
        except Exception as e:
            logger.error(f"JSON数据处理过程中发生错误: {str(e)}")
            raise JSONProcessingError(f"处理失败: {str(e)}")
            
    def process_large_file_streaming(self):
        """流式处理大文件"""
        logger.info("开始流式处理大JSON文件...")
        
        try:
            # 自动检测编码
            if self.encoding == 'auto':
                self.encoding = self.detect_encoding(self.input_file)
                
            processed_records = []
            record_count = 0
            
            with open(self.input_file, 'r', encoding=self.encoding) as f:
                # 检测文件格式
                first_char = f.read(1)
                f.seek(0)
                
                if first_char == '[':
                    # JSON数组格式
                    data = json.load(f)
                    if isinstance(data, list):
                        for record in data:
                            if isinstance(record, dict):
                                processed_record = self.process_record(record)
                                processed_records.append(processed_record)
                                record_count += 1
                                
                                # 分批保存避免内存溢出
                                if record_count % 10000 == 0:
                                    logger.info(f"已处理 {record_count} 条记录")
                else:
                    # JSON Lines格式
                    for line_num, line in enumerate(f, 1):
                        if line.strip():
                            try:
                                record = json.loads(line)
                                processed_record = self.process_record(record)
                                processed_records.append(processed_record)
                                record_count += 1
                                
                                # 分批保存避免内存溢出
                                if record_count % 10000 == 0:
                                    logger.info(f"已处理 {record_count} 条记录")
                            except json.JSONDecodeError as e:
                                logger.warning(f"第 {line_num} 行JSON解析失败: {str(e)}")
                                
            self.stats['processed_records'] = record_count
            logger.info(f"流式处理完成,共处理 {record_count} 条记录")
            
            # 保存结果
            self.save_results(processed_records)
            self.generate_report()
            
            return processed_records
            
        except Exception as e:
            logger.error(f"流式处理过程中发生错误: {str(e)}")
            raise JSONProcessingError(f"流式处理失败: {str(e)}")

def create_sample_schema():
    """创建示例Schema文件"""
    sample_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "id": {
                "type": "integer"
            },
            "name": {
                "type": "string"
            },
            "email": {
                "type": "string",
                "format": "email"
            },
            "age": {
                "type": "integer",
                "minimum": 0,
                "maximum": 120
            },
            "active": {
                "type": "boolean"
            }
        },
        "required": ["id", "name", "email"]
    }
    
    with open('sample_schema.json', 'w', encoding='utf-8') as f:
        json.dump(sample_schema, f, indent=2, ensure_ascii=False)
    logger.info("示例Schema文件已创建: sample_schema.json")

def create_sample_config():
    """创建示例配置文件"""
    sample_config = {
        "field_mapping": {
            "user_id": "id",
            "full_name": "name",
            "is_active": "active"
        },
        "type_conversions": {
            "age": "int",
            "salary": "float",
            "active": "bool"
        },
        "cleaning": {
            "remove_empty": True
        }
    }
    
    with open('sample_config.json', 'w', encoding='utf-8') as f:
        json.dump(sample_config, f, indent=2, ensure_ascii=False)
    logger.info("示例配置文件已创建: sample_config.json")

def main():
    parser = argparse.ArgumentParser(description='JSON数据处理和转换工具')
    parser.add_argument('input_file', help='输入JSON文件路径')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('-e', '--encoding', default='auto', help='文件编码 (auto/utf-8/gbk等)')
    parser.add_argument('-s', '--schema', help='JSON Schema验证文件路径')
    parser.add_argument('-f', '--format', choices=['json', 'csv', 'excel', 'xml'], default='json', help='输出格式')
    parser.add_argument('-c', '--config', help='处理配置文件路径')
    parser.add_argument('--flatten-depth', type=int, default=0, help='扁平化深度 (0表示不限制)')
    parser.add_argument('--streaming', action='store_true', help='流式处理大文件')
    parser.add_argument('--no-backup', action='store_true', help='不备份原始文件')
    parser.add_argument('--sample-schema', action='store_true', help='创建示例Schema文件')
    parser.add_argument('--sample-config', action='store_true', help='创建示例配置文件')
    
    args = parser.parse_args()
    
    if args.sample_schema:
        create_sample_schema()
        return
        
    if args.sample_config:
        create_sample_config()
        return
        
    # 加载配置文件
    options = {}
    if args.config and os.path.exists(args.config):
        try:
            with open(args.config, 'r', encoding='utf-8') as f:
                options = json.load(f)
        except Exception as e:
            logger.error(f"加载配置文件失败: {str(e)}")
            
    # 配置处理参数
    config = {
        'input_file': args.input_file,
        'output_file': args.output or f"processed_{os.path.basename(args.input_file)}",
        'encoding': args.encoding,
        'schema_file': args.schema,
        'output_format': args.format,
        'flatten_depth': args.flatten_depth,
        'backup_original': not args.no_backup,
        'options': options
    }
    
    # 创建处理器实例
    processor = JSONProcessor(config)
    
    try:
        # 执行处理
        if args.streaming:
            processor.process_large_file_streaming()
        else:
            processor.run_processing()
            
    except KeyboardInterrupt:
        logger.info("JSON数据处理被用户中断")
        sys.exit(1)
    except JSONProcessingError as e:
        logger.error(f"JSON处理错误: {str(e)}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"JSON数据处理过程中发生未知错误: {str(e)}")
        sys.exit(1)

if __name__ == '__main__':
    main()

使用说明

1. 基本使用

# 基本JSON处理
python json_processor.py data.json

# 指定输出文件
python json_processor.py data.json -o processed_data.json

# 指定文件编码
python json_processor.py data.json -e utf-8

# 转换为CSV格式
python json_processor.py data.json -f csv -o result.csv

2. 数据验证

# 使用Schema验证
python json_processor.py data.json -s schema.json

# 创建示例Schema文件
python json_processor.py --sample-schema

3. 数据转换

# 转换为Excel格式
python json_processor.py data.json -f excel -o result.xlsx

# 转换为XML格式
python json_processor.py data.json -f xml -o result.xml

# 扁平化嵌套结构
python json_processor.py data.json --flatten-depth 2

4. 配置文件使用

# 使用配置文件
python json_processor.py data.json -c config.json

# 创建示例配置文件
python json_processor.py --sample-config

5. 大文件处理

# 流式处理大文件
python json_processor.py large_data.json --streaming

# 不备份原始文件
python json_processor.py data.json --no-backup

配置文件示例

创建一个名为 config.json 的配置文件:

{
  "field_mapping": {
    "user_id": "id",
    "full_name": "name",
    "email_address": "email",
    "is_active": "active"
  },
  "type_conversions": {
    "age": "int",
    "salary": "float",
    "active": "bool"
  },
  "cleaning": {
    "remove_empty": true
  }
}

Schema文件示例

创建一个名为 schema.json 的Schema验证文件:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "id": {
        "type": "integer",
        "minimum": 1
      },
      "name": {
        "type": "string",
        "minLength": 1
      },
      "email": {
        "type": "string",
        "format": "email"
      },
      "age": {
        "type": "integer",
        "minimum": 0,
        "maximum": 120
      },
      "active": {
        "type": "boolean"
      }
    },
    "required": ["id", "name", "email"],
    "additionalProperties": false
  }
}

高级特性

1. 批量处理

可以通过脚本实现批量处理多个文件:

import glob
import os

# 处理目录下所有JSON文件
json_files = glob.glob("data/*.json")

for json_file in json_files:
    config = {
        'input_file': json_file,
        'output_file': f"processed_{os.path.basename(json_file)}",
        'schema_file': 'schema.json',
        'options': {
            'field_mapping': {'old_field': 'new_field'},
            'type_conversions': {'age': 'int'}
        }
    }
    
    processor = JSONProcessor(config)
    processor.run_processing()

2. 自动化调度

可以结合cron实现定期自动处理:

# 每天凌晨3点处理JSON数据
0 3 * * * /usr/bin/python3 /path/to/json_processor.py /path/to/daily_data.json -s /path/to/schema.json

3. API数据处理

可以处理API返回的JSON数据:

import requests

# 获取API数据
response = requests.get('https://api.example.com/data')
data = response.json()

# 保存为临时文件
with open('temp_data.json', 'w') as f:
    json.dump(data, f)

# 处理数据
config = {'input_file': 'temp_data.json'}
processor = JSONProcessor(config)
processor.run_processing()

性能优化

1. 内存管理

2. 处理速度优化

3. 错误处理优化

安全考虑

1. 数据安全

2. 文件安全

3. 系统安全

这个JSON数据处理和转换工具是一个功能强大、安全可靠的数据处理工具,能够帮助用户高效地处理和转换JSON格式的数据,为后续的数据分析和应用提供高质量的数据基础。

到此这篇关于Python数据处理之JSON数据转换和处理详解的文章就介绍到这了,更多相关Python JSON数据处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文