C#教程

关注公众号 jb51net

关闭
首页 > 软件编程 > C#教程 > C# DocumentDB数据同步

C#实现两个DocumentDB实例之间同步数据的解决方案

作者:weixin_30777913

这篇文章主要为大家详细介绍了C#实现两个DocumentDB实例之间同步数据的解决方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

这个程序提供了完整的解决方案,可以安全可靠地在两个DocumentDB实例之间同步数据,并具备企业级应用所需的容错和监控功能。支持断点续传和JSON日志记录:

准备工作

安装驱动

Install-Package MongoDB.Driver -Version 2.19.0

使用 MongoDB .NET Driver 兼容 DocumentDB

下载证书

wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem

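DocumentDB 要求 SSL 连接,需 Amazon RDS 根证书(注:上面的 rds-combined-ca-bundle.pem 是旧版证书包,AWS 现行文档推荐下载 https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem)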

using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace DocumentDBSyncTool
{
    /// <summary>
    /// One structured log record. Serialized with System.Text.Json using the
    /// snake_case names declared on each property.
    /// </summary>
    public class SyncLogEntry
    {
        /// <summary>When the entry was created.</summary>
        [JsonPropertyName("timestamp")]
        public DateTime Timestamp { get; set; }

        /// <summary>Severity label; defaults to "INFO".</summary>
        [JsonPropertyName("level")]
        public string Level { get; set; } = "INFO";

        /// <summary>Short tag naming the operation the entry belongs to; defaults to an empty string.</summary>
        [JsonPropertyName("operation")]
        public string Operation { get; set; } = string.Empty;

        /// <summary>Optional identifier of the source document the entry refers to.</summary>
        [JsonPropertyName("source_id")]
        public string? SourceId { get; set; }

        /// <summary>Optional identifier of the first document in the batch.</summary>
        [JsonPropertyName("batch_start_id")]
        public string? BatchStartId { get; set; }

        /// <summary>Optional identifier of the last document in the batch.</summary>
        [JsonPropertyName("batch_end_id")]
        public string? BatchEndId { get; set; }

        /// <summary>Number of records the logged operation handled.</summary>
        [JsonPropertyName("records_processed")]
        public int RecordsProcessed { get; set; }

        /// <summary>Elapsed time of the logged operation, in milliseconds.</summary>
        [JsonPropertyName("duration_ms")]
        public long DurationMs { get; set; }

        /// <summary>Optional error text attached to the entry.</summary>
        [JsonPropertyName("error")]
        public string? Error { get; set; }

        /// <summary>Human-readable message; defaults to an empty string.</summary>
        [JsonPropertyName("message")]
        public string Message { get; set; } = string.Empty;

        /// <summary>Optional checkpoint value recorded alongside the entry.</summary>
        [JsonPropertyName("checkpoint")]
        public string? Checkpoint { get; set; }
    }

    /// <summary>
    /// Resume state persisted between runs. Serialized with System.Text.Json
    /// using the snake_case names declared on each property.
    /// </summary>
    public class CheckpointState
    {
        /// <summary>String form of the last identifier recorded as successful; empty by default.</summary>
        [JsonPropertyName("last_successful_id")]
        public string LastSuccessfulId { get; set; } = string.Empty;

        /// <summary>Timestamp stored with the checkpoint.</summary>
        [JsonPropertyName("last_sync_timestamp")]
        public DateTime LastSyncTimestamp { get; set; }

        /// <summary>Running count of processed records.</summary>
        [JsonPropertyName("total_processed")]
        public long TotalProcessed { get; set; }

        /// <summary>Text of the most recent error, or null when there is none.</summary>
        [JsonPropertyName("last_error")]
        public string? LastError { get; set; }

        /// <summary>Counter of recorded errors/retries.</summary>
        [JsonPropertyName("retry_count")]
        public int RetryCount { get; set; }
    }

    // DocumentDB同步器主类
    public class DocumentDBSynchronizer
    {
        private readonly IMongoCollection<BsonDocument> _sourceCollection;
        private readonly IMongoCollection<BsonDocument> _targetCollection;
        private readonly string _checkpointFilePath;
        private readonly string _logFilePath;
        private readonly int _batchSize;
        private readonly int _maxRetries;
        private readonly int _retryDelayMs;

        // Initialized eagerly: the logging helpers read this field, and they can run
        // before (or while) the checkpoint file is being loaded in the constructor.
        private CheckpointState _checkpointState = new CheckpointState();
        private readonly object _logLock = new object();

        /// <summary>
        /// Creates a synchronizer that copies one collection from the source client to the
        /// same database/collection name on the target client, then loads any saved checkpoint.
        /// </summary>
        /// <param name="sourceClient">Client connected to the instance to read from.</param>
        /// <param name="targetClient">Client connected to the instance to write to.</param>
        /// <param name="databaseName">Database name used on both sides.</param>
        /// <param name="collectionName">Collection name used on both sides.</param>
        /// <param name="checkpointFilePath">Path of the JSON file holding the resume state.</param>
        /// <param name="logFilePath">Path of the line-delimited JSON log file.</param>
        /// <param name="batchSize">Maximum number of documents fetched per batch; must be positive.</param>
        /// <param name="maxRetries">Number of retries after a failed fetch or write; must not be negative.</param>
        /// <param name="retryDelayMs">Base delay between retries in milliseconds; must not be negative.</param>
        /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">A numeric argument is out of range.</exception>
        public DocumentDBSynchronizer(
            MongoClient sourceClient, 
            MongoClient targetClient,
            string databaseName,
            string collectionName,
            string checkpointFilePath = "checkpoint.json",
            string logFilePath = "sync_log.json",
            int batchSize = 1000,
            int maxRetries = 5,
            int retryDelayMs = 5000)
        {
            ArgumentNullException.ThrowIfNull(sourceClient);
            ArgumentNullException.ThrowIfNull(targetClient);
            ArgumentNullException.ThrowIfNull(databaseName);
            ArgumentNullException.ThrowIfNull(collectionName);
            ArgumentNullException.ThrowIfNull(checkpointFilePath);
            ArgumentNullException.ThrowIfNull(logFilePath);

            // The batch size is passed to Limit(); MongoDB treats a limit of 0 as "no limit",
            // which would pull the whole collection into memory in a single batch.
            if (batchSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
            }

            if (maxRetries < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(maxRetries), maxRetries, "Max retries must not be negative.");
            }

            // The delay is multiplied by the attempt number and handed to Task.Delay,
            // which rejects negative values; fail here instead of in the middle of a retry.
            if (retryDelayMs < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(retryDelayMs), retryDelayMs, "Retry delay must not be negative.");
            }

            var sourceDatabase = sourceClient.GetDatabase(databaseName);
            var targetDatabase = targetClient.GetDatabase(databaseName);

            _sourceCollection = sourceDatabase.GetCollection<BsonDocument>(collectionName);
            _targetCollection = targetDatabase.GetCollection<BsonDocument>(collectionName);

            _checkpointFilePath = checkpointFilePath;
            _logFilePath = logFilePath;
            _batchSize = batchSize;
            _maxRetries = maxRetries;
            _retryDelayMs = retryDelayMs;

            LoadCheckpoint();
        }

        /// <summary>
        /// Loads the resume state from the checkpoint file, or starts with an empty state
        /// when the file is missing or cannot be read or parsed.
        /// </summary>
        private void LoadCheckpoint()
        {
            // Install a usable state before anything is logged: LogSyncInfo and LogSyncError
            // both read _checkpointState, so logging a load failure while the field was still
            // unassigned threw NullReferenceException out of the constructor.
            _checkpointState = new CheckpointState();

            try
            {
                if (!File.Exists(_checkpointFilePath))
                {
                    LogSyncInfo("No checkpoint found. Starting from beginning.");
                    return;
                }

                var json = File.ReadAllText(_checkpointFilePath);
                var loaded = JsonSerializer.Deserialize<CheckpointState>(json) ?? new CheckpointState();

                // A file containing "last_successful_id": null would otherwise leave a null
                // in a property that is declared non-nullable.
                loaded.LastSuccessfulId ??= "";
                _checkpointState = loaded;

                LogSyncInfo($"Loaded checkpoint from {_checkpointFilePath}. Last ID: {_checkpointState.LastSuccessfulId}");
            }
            catch (Exception ex)
            {
                LogSyncError("Failed to load checkpoint", ex);

                // LogSyncError records the failure on the current state; begin the run
                // from a pristine state, as the original fallback did.
                _checkpointState = new CheckpointState();
            }
        }

        // Reused for every checkpoint write; building JsonSerializerOptions per call is wasteful.
        private static readonly JsonSerializerOptions s_checkpointJsonOptions = new JsonSerializerOptions
        {
            WriteIndented = true
        };

        /// <summary>
        /// Records <paramref name="lastId"/> as the last successfully synced identifier,
        /// clears the error bookkeeping and persists the state to the checkpoint file.
        /// Failures are logged, not thrown.
        /// </summary>
        /// <param name="lastId">String form of the last document identifier that was written.</param>
        private void SaveCheckpoint(string lastId)
        {
            try
            {
                _checkpointState.LastSuccessfulId = lastId;
                _checkpointState.LastSyncTimestamp = DateTime.UtcNow;
                _checkpointState.LastError = null;
                _checkpointState.RetryCount = 0;

                var json = JsonSerializer.Serialize(_checkpointState, s_checkpointJsonOptions);

                // Write to a sibling temp file and then replace the real file, so a crash in
                // the middle of the write cannot leave a truncated checkpoint behind.
                var tempPath = _checkpointFilePath + ".tmp";
                File.WriteAllText(tempPath, json);
                File.Move(tempPath, _checkpointFilePath, overwrite: true);

                LogSyncInfo($"Checkpoint saved: {lastId}");
            }
            catch (Exception ex)
            {
                LogSyncError("Failed to save checkpoint", ex);
            }
        }

        /// <summary>
        /// Appends an INFO entry to the log, stamped with the current UTC time and the
        /// current checkpoint identifier.
        /// </summary>
        /// <param name="message">Text of the entry.</param>
        /// <param name="operation">Operation tag; "SYNC" is used when null.</param>
        /// <param name="sourceId">Optional source document identifier.</param>
        /// <param name="recordsProcessed">Record count to report.</param>
        /// <param name="durationMs">Elapsed milliseconds to report.</param>
        /// <param name="batchStartId">Optional identifier of the first document in the batch.</param>
        /// <param name="batchEndId">Optional identifier of the last document in the batch.</param>
        private void LogSyncInfo(string message, string? operation = null, 
            string? sourceId = null, int recordsProcessed = 0, 
            long durationMs = 0, string? batchStartId = null, 
            string? batchEndId = null)
        {
            AppendLog(new SyncLogEntry
            {
                Timestamp = DateTime.UtcNow,
                Level = "INFO",
                Operation = operation is null ? "SYNC" : operation,
                Message = message,
                SourceId = sourceId,
                RecordsProcessed = recordsProcessed,
                DurationMs = durationMs,
                BatchStartId = batchStartId,
                BatchEndId = batchEndId,
                Checkpoint = _checkpointState.LastSuccessfulId
            });
        }

        /// <summary>
        /// Appends an ERROR entry to the log, then records the failure on the in-memory
        /// checkpoint state (last error text and retry counter).
        /// </summary>
        /// <param name="message">Text of the entry.</param>
        /// <param name="ex">Exception to attach, if any; its full string form is logged.</param>
        /// <param name="operation">Operation tag; "SYNC" is used when null.</param>
        /// <param name="sourceId">Optional source document identifier.</param>
        private void LogSyncError(string message, Exception? ex = null, 
            string? operation = null, string? sourceId = null)
        {
            string? errorText = ex is null ? null : ex.ToString();

            AppendLog(new SyncLogEntry
            {
                Timestamp = DateTime.UtcNow,
                Level = "ERROR",
                Operation = operation is null ? "SYNC" : operation,
                SourceId = sourceId,
                Message = message,
                Error = errorText,
                Checkpoint = _checkpointState.LastSuccessfulId
            });

            // Mirror the failure into the checkpoint state.
            _checkpointState.LastError = $"{message}: {ex?.Message}";
            _checkpointState.RetryCount += 1;
        }

        /// <summary>
        /// Serializes the entry to a single JSON line and appends it to the log file.
        /// Writes are serialized through <c>_logLock</c>; a failed write is reported on the
        /// console instead of being thrown.
        /// </summary>
        /// <param name="logEntry">Entry to write.</param>
        private void AppendLog(SyncLogEntry logEntry)
        {
            lock (_logLock)
            {
                try
                {
                    string serialized = JsonSerializer.Serialize(logEntry);

                    // Open in append mode so earlier entries are kept.
                    using (var logWriter = new StreamWriter(_logFilePath, append: true))
                    {
                        logWriter.WriteLine(serialized);
                    }
                }
                catch (Exception writeFailure)
                {
                    Console.WriteLine($"Failed to write log: {writeFailure.Message}");
                }
            }
        }

        /// <summary>
        /// Fetches the next batch of source documents whose <c>_id</c> is greater than
        /// <paramref name="lastId"/>, ordered by <c>_id</c> ascending and capped at the batch size.
        /// Failed attempts are retried with a linearly growing delay.
        /// </summary>
        /// <param name="lastId">
        /// String form of the last processed <c>_id</c>; null or empty starts from the beginning.
        /// </param>
        /// <param name="retryCount">Number of attempts already made (0 for a fresh call).</param>
        /// <returns>The fetched documents; empty when nothing is left.</returns>
        /// <exception cref="InvalidOperationException">The fetch still fails once retries are exhausted.</exception>
        private async Task<List<BsonDocument>> GetNextBatchAsync(string lastId, int retryCount = 0)
        {
            FilterDefinition<BsonDocument> filter;
            if (string.IsNullOrEmpty(lastId))
            {
                filter = Builders<BsonDocument>.Filter.Empty;
            }
            else
            {
                // The checkpoint only keeps the string form of _id. Comparing an ObjectId _id
                // against a string never matches (BSON comparisons do not cross types), which
                // ended the sync after the first batch. Rebuild the ObjectId when the text
                // parses as one; otherwise compare as a string, as before.
                // NOTE(review): a string _id that happens to be 24 hex characters is treated as
                // an ObjectId, and other _id types (numbers, binary GUIDs) still cannot be
                // resumed from a string checkpoint - confirm the _id type of the collection.
                BsonValue resumeAfter = ObjectId.TryParse(lastId, out var objectId)
                    ? (BsonValue)new BsonObjectId(objectId)
                    : new BsonString(lastId);

                filter = Builders<BsonDocument>.Filter.Gt("_id", resumeAfter);
            }

            var sort = Builders<BsonDocument>.Sort.Ascending("_id");

            for (var attempt = retryCount; ; attempt++)
            {
                var stopwatch = System.Diagnostics.Stopwatch.StartNew();

                try
                {
                    var batch = await _sourceCollection
                        .Find(filter)
                        .Sort(sort)
                        .Limit(_batchSize)
                        .ToListAsync();

                    stopwatch.Stop();

                    LogSyncInfo($"Fetched {batch.Count} records from source",
                        operation: "FETCH",
                        batchStartId: batch.FirstOrDefault()?["_id"].ToString(),
                        batchEndId: batch.LastOrDefault()?["_id"].ToString(),
                        recordsProcessed: batch.Count,
                        durationMs: stopwatch.ElapsedMilliseconds);

                    return batch;
                }
                catch (Exception ex) when (attempt < _maxRetries)
                {
                    LogSyncError($"Failed to fetch batch (attempt {attempt + 1}/{_maxRetries})",
                        ex, "FETCH");

                    // Linear back-off: wait longer after each failed attempt.
                    await Task.Delay(_retryDelayMs * (attempt + 1));
                }
                catch (Exception ex)
                {
                    throw new InvalidOperationException($"Failed to fetch batch after {_maxRetries} attempts", ex);
                }
            }
        }

        /// <summary>
        /// Upserts every document of the batch into the target collection with one unordered
        /// bulk write, matching on <c>_id</c>. Failed attempts are retried with a linearly
        /// growing delay. An empty batch is a no-op.
        /// </summary>
        /// <param name="batch">Documents to write.</param>
        /// <param name="retryCount">Number of attempts already made (0 for a fresh call).</param>
        /// <exception cref="InvalidOperationException">The write still fails once retries are exhausted.</exception>
        private async Task WriteBatchAsync(List<BsonDocument> batch, int retryCount = 0)
        {
            if (batch.Count == 0)
            {
                return;
            }

            var attempt = retryCount;
            while (true)
            {
                var timer = System.Diagnostics.Stopwatch.StartNew();

                try
                {
                    // One replace-with-upsert per document, keyed on its _id.
                    var upserts = batch
                        .Select(doc => (WriteModel<BsonDocument>)new ReplaceOneModel<BsonDocument>(
                            Builders<BsonDocument>.Filter.Eq("_id", doc["_id"]), doc)
                        {
                            IsUpsert = true
                        })
                        .ToList();

                    var unordered = new BulkWriteOptions { IsOrdered = false };
                    var outcome = await _targetCollection.BulkWriteAsync(upserts, unordered);

                    timer.Stop();

                    LogSyncInfo($"Written {outcome.Upserts.Count + outcome.ModifiedCount} records to target",
                        operation: "WRITE",
                        batchStartId: batch.First()["_id"].ToString(),
                        batchEndId: batch.Last()["_id"].ToString(),
                        recordsProcessed: batch.Count,
                        durationMs: timer.ElapsedMilliseconds);

                    return;
                }
                catch (Exception ex) when (attempt < _maxRetries)
                {
                    timer.Stop();
                    LogSyncError($"Failed to write batch (attempt {attempt + 1}/{_maxRetries})",
                        ex, "WRITE");

                    // Linear back-off before the next attempt.
                    await Task.Delay(_retryDelayMs * (attempt + 1));
                    attempt++;
                }
                catch (Exception ex)
                {
                    timer.Stop();
                    throw new InvalidOperationException($"Failed to write batch after {_maxRetries} attempts", ex);
                }
            }
        }

        /// <summary>
        /// Checks that both collections are reachable by requesting an estimated document
        /// count from each, source first.
        /// </summary>
        /// <returns>True when both calls succeed; false (after logging the error) otherwise.</returns>
        private async Task<bool> ValidateConnectionsAsync()
        {
            try
            {
                LogSyncInfo("Validating source connection...", operation: "VALIDATE");
                long sourceEstimate = await _sourceCollection.EstimatedDocumentCountAsync();

                LogSyncInfo("Validating target connection...", operation: "VALIDATE");
                long targetEstimate = await _targetCollection.EstimatedDocumentCountAsync();

                var summary = $"Connections validated. Source: ~{sourceEstimate} records, Target: ~{targetEstimate} records";
                LogSyncInfo(summary, operation: "VALIDATE");
            }
            catch (Exception failure)
            {
                LogSyncError("Connection validation failed", failure, "VALIDATE");
                return false;
            }

            return true;
        }

        /// <summary>
        /// Runs the synchronization: validates both connections, then repeatedly fetches a
        /// batch after the last checkpoint, writes it to the target and saves the checkpoint,
        /// until the source is exhausted or cancellation is requested.
        /// </summary>
        /// <param name="cancellationToken">Token used to stop the loop between batches.</param>
        /// <returns>
        /// True when all data was synced; false when validation failed, the run was cancelled,
        /// or an error occurred (details are written to the log).
        /// </returns>
        public async Task<bool> StartSyncAsync(CancellationToken cancellationToken = default)
        {
            LogSyncInfo("Starting synchronization process", "START");

            if (!await ValidateConnectionsAsync())
            {
                LogSyncError("Connection validation failed. Aborting sync.", null, "START");
                return false;
            }

            // Records handled during this run; declared outside the try so the
            // cancellation handler can report it.
            long totalProcessed = 0;

            // The log entry stores the count as an int; clamp instead of overflowing.
            static int ToLogCount(long count) => (int)Math.Min(count, int.MaxValue);

            try
            {
                string lastProcessedId = _checkpointState.LastSuccessfulId;

                while (!cancellationToken.IsCancellationRequested)
                {
                    var batch = await GetNextBatchAsync(lastProcessedId);

                    if (batch.Count == 0)
                    {
                        LogSyncInfo("No more data to sync", "COMPLETE",
                            recordsProcessed: ToLogCount(totalProcessed));
                        break;
                    }

                    await WriteBatchAsync(batch);

                    // Update the counters BEFORE saving, so the persisted checkpoint includes
                    // the batch just written. The checkpoint total accumulates across resumed
                    // runs instead of being reset to the count of the current run.
                    totalProcessed += batch.Count;
                    _checkpointState.TotalProcessed += batch.Count;

                    lastProcessedId = batch[^1]["_id"].ToString();
                    SaveCheckpoint(lastProcessedId);

                    // Brief pause after a full batch to avoid hogging resources.
                    if (batch.Count == _batchSize)
                    {
                        await Task.Delay(100, cancellationToken);
                    }
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    LogSyncInfo("Sync cancelled by user", "CANCELLED",
                        recordsProcessed: ToLogCount(totalProcessed));
                    return false;
                }

                LogSyncInfo($"Sync completed successfully. Total processed: {totalProcessed}",
                    "COMPLETE", recordsProcessed: ToLogCount(totalProcessed));
                return true;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Task.Delay throws when the token fires during the pause; that is a
                // cancellation, not a sync failure, so do not record it as an error.
                LogSyncInfo("Sync cancelled by user", "CANCELLED",
                    recordsProcessed: ToLogCount(totalProcessed));
                return false;
            }
            catch (Exception ex)
            {
                LogSyncError("Sync failed", ex, "ERROR");
                return false;
            }
        }

        /// <summary>
        /// Builds a statistics snapshot from the estimated document counts of both
        /// collections and the in-memory checkpoint state.
        /// </summary>
        /// <returns>
        /// The populated statistics, or a default <see cref="SyncStatistics"/> when either
        /// count query fails (the error is logged).
        /// </returns>
        public async Task<SyncStatistics> GetStatisticsAsync()
        {
            try
            {
                var snapshot = new SyncStatistics
                {
                    SourceCount = await _sourceCollection.EstimatedDocumentCountAsync(),
                    TargetCount = await _targetCollection.EstimatedDocumentCountAsync(),
                    LastCheckpoint = _checkpointState.LastSuccessfulId,
                    TotalProcessed = _checkpointState.TotalProcessed,
                    LastSyncTime = _checkpointState.LastSyncTimestamp
                };

                return snapshot;
            }
            catch (Exception failure)
            {
                LogSyncError("Failed to get statistics", failure, "STATS");
                return new SyncStatistics();
            }
        }
    }

    /// <summary>
    /// Snapshot of sync progress. The two computed properties are derived from the
    /// source and target counts.
    /// </summary>
    public class SyncStatistics
    {
        /// <summary>Document count reported for the source collection.</summary>
        [JsonPropertyName("source_count")]
        public long SourceCount { get; set; }

        /// <summary>Document count reported for the target collection.</summary>
        [JsonPropertyName("target_count")]
        public long TargetCount { get; set; }

        /// <summary>Last checkpoint identifier; empty by default.</summary>
        [JsonPropertyName("last_checkpoint")]
        public string LastCheckpoint { get; set; } = string.Empty;

        /// <summary>Number of processed records.</summary>
        [JsonPropertyName("total_processed")]
        public long TotalProcessed { get; set; }

        /// <summary>Timestamp of the last sync.</summary>
        [JsonPropertyName("last_sync_time")]
        public DateTime LastSyncTime { get; set; }

        /// <summary>"In Sync" when both counts are equal, otherwise "Out of Sync".</summary>
        [JsonPropertyName("sync_status")]
        public string SyncStatus
        {
            get
            {
                if (SourceCount == TargetCount)
                {
                    return "In Sync";
                }

                return "Out of Sync";
            }
        }

        /// <summary>Absolute difference between the source and target counts.</summary>
        [JsonPropertyName("records_difference")]
        public long RecordsDifference
        {
            get
            {
                long delta = SourceCount - TargetCount;
                return Math.Abs(delta);
            }
        }
    }

    /// <summary>
    /// Builds <see cref="MongoClient"/> instances configured for an Amazon DocumentDB cluster.
    /// </summary>
    public static class DocumentDBConnectionFactory
    {
        /// <summary>
        /// Creates a TLS-enabled client for the given host, authenticating with the supplied
        /// credentials against <paramref name="databaseName"/>.
        /// </summary>
        /// <param name="username">User name to authenticate with.</param>
        /// <param name="password">Password to authenticate with.</param>
        /// <param name="host">Server address, in the form accepted by <c>MongoServerAddress.Parse</c>.</param>
        /// <param name="databaseName">Database the credential is created for.</param>
        /// <param name="allowInsecureTls">
        /// When true, sets <c>AllowInsecureTls</c> on the client settings. Intended for
        /// development only.
        /// </param>
        /// <returns>A new client; no connection is validated here.</returns>
        /// <exception cref="ArgumentNullException">A string argument is null.</exception>
        public static MongoClient CreateClient(string username, string password, 
            string host, string databaseName, bool allowInsecureTls = false)
        {
            ArgumentNullException.ThrowIfNull(username);
            ArgumentNullException.ThrowIfNull(password);
            ArgumentNullException.ThrowIfNull(host);
            ArgumentNullException.ThrowIfNull(databaseName);

            var settings = new MongoClientSettings
            {
                Servers = new[] { MongoServerAddress.Parse(host) },
                Credential = MongoCredential.CreateCredential(
                    databaseName,
                    username,
                    password
                ),
                // NOTE(review): ConnectionMode is marked obsolete in recent 2.x drivers in
                // favour of DirectConnection; kept as-is to avoid changing topology behavior.
                ConnectionMode = ConnectionMode.ReplicaSet,
                ReadPreference = ReadPreference.SecondaryPreferred,
                // Amazon DocumentDB does not support retryable writes, but the driver enables
                // them by default; leaving them on makes writes against DocumentDB fail.
                RetryWrites = false,
                SslSettings = new SslSettings
                {
                    EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12
                },
                UseTls = true,
                ServerSelectionTimeout = TimeSpan.FromSeconds(30),
                MaxConnectionPoolSize = 100,
                MinConnectionPoolSize = 5,
                SocketTimeout = TimeSpan.FromSeconds(60),
                ConnectTimeout = TimeSpan.FromSeconds(30)
            };

            if (allowInsecureTls)
            {
                settings.AllowInsecureTls = true;
            }

            // With allowInsecureTls == false nothing extra is configured here, so the server
            // certificate is validated with the platform defaults.
            // NOTE(review): production use presumably needs the Amazon CA bundle to be trusted
            // (OS trust store or a validation callback) - confirm for the deployment.

            return new MongoClient(settings);
        }
    }

    /// <summary>
    /// Console entry point: builds both clients, runs the synchronizer and prints a summary.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs one synchronization pass. Connection settings come from environment
        /// variables when set, falling back to the built-in sample values. The process exit
        /// code is set to 1 when the sync does not complete successfully.
        /// </summary>
        /// <param name="args">Command-line arguments (currently not interpreted).</param>
        static async Task Main(string[] args)
        {
            // Reads a setting from the environment, falling back to the sample default so
            // existing invocations keep working unchanged.
            static string Setting(string variable, string fallback)
            {
                var value = Environment.GetEnvironmentVariable(variable);
                return string.IsNullOrEmpty(value) ? fallback : value;
            }

            // Connection parameters. Real credentials should be supplied through the
            // environment, not committed to source; the fallbacks are placeholders.
            var sourceConfig = new
            {
                Username = Setting("DOCDB_SOURCE_USERNAME", "admin"),
                Password = Setting("DOCDB_SOURCE_PASSWORD", "source_password"),
                Host = Setting("DOCDB_SOURCE_HOST", "docdb-source.cluster-xxxxxx.us-east-1.docdb.amazonaws.com"),
                Database = Setting("DOCDB_SOURCE_DATABASE", "testdb")
            };

            var targetConfig = new
            {
                Username = Setting("DOCDB_TARGET_USERNAME", "admin"),
                Password = Setting("DOCDB_TARGET_PASSWORD", "target_password"),
                Host = Setting("DOCDB_TARGET_HOST", "docdb-target.cluster-xxxxxx.us-east-1.docdb.amazonaws.com"),
                Database = Setting("DOCDB_TARGET_DATABASE", "testdb")
            };

            var collectionName = Setting("DOCDB_COLLECTION", "users");
            var checkpointFile = "sync_checkpoint.json";
            var logFile = $"sync_log_{DateTime.Now:yyyyMMdd_HHmmss}.json";
            var batchSize = 500; // Tune to network conditions.

            Console.WriteLine("Starting DocumentDB Sync Tool...");
            Console.WriteLine($"Log file: {logFile}");
            Console.WriteLine($"Checkpoint file: {checkpointFile}");

            try
            {
                // Create the connections.
                // WARNING: allowInsecureTls is meant for development; production should
                // validate the server certificate.
                var sourceClient = DocumentDBConnectionFactory.CreateClient(
                    sourceConfig.Username,
                    sourceConfig.Password,
                    sourceConfig.Host,
                    sourceConfig.Database,
                    allowInsecureTls: true
                );

                var targetClient = DocumentDBConnectionFactory.CreateClient(
                    targetConfig.Username,
                    targetConfig.Password,
                    targetConfig.Host,
                    targetConfig.Database,
                    allowInsecureTls: true
                );

                // Create the synchronizer. The source database name is used on both sides.
                var synchronizer = new DocumentDBSynchronizer(
                    sourceClient,
                    targetClient,
                    sourceConfig.Database,
                    collectionName,
                    checkpointFilePath: checkpointFile,
                    logFilePath: logFile,
                    batchSize: batchSize,
                    maxRetries: 3,
                    retryDelayMs: 3000
                );

                // Cancellation token wired to CTRL+C; e.Cancel keeps the process alive so the
                // sync loop can stop cleanly.
                var cts = new CancellationTokenSource();
                Console.CancelKeyPress += (sender, e) =>
                {
                    Console.WriteLine("\nCancelling sync...");
                    cts.Cancel();
                    e.Cancel = true;
                };

                // Run the sync.
                var success = await synchronizer.StartSyncAsync(cts.Token);

                // Print statistics.
                var stats = await synchronizer.GetStatisticsAsync();
                Console.WriteLine("\n=== Sync Statistics ===");
                Console.WriteLine($"Source records: {stats.SourceCount}");
                Console.WriteLine($"Target records: {stats.TargetCount}");
                Console.WriteLine($"Last checkpoint: {stats.LastCheckpoint}");
                Console.WriteLine($"Total processed: {stats.TotalProcessed}");
                Console.WriteLine($"Sync status: {stats.SyncStatus}");
                Console.WriteLine($"Difference: {stats.RecordsDifference}");

                if (success)
                {
                    Console.WriteLine("\n✅ Sync completed successfully!");
                }
                else
                {
                    Console.WriteLine("\n❌ Sync completed with errors.");
                    Console.WriteLine("Check the log file for details.");

                    // Let calling scripts and schedulers detect the failure.
                    Environment.ExitCode = 1;
                }

                Console.WriteLine($"\nLog saved to: {logFile}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"\n❌ Fatal error: {ex.Message}");
                Console.WriteLine($"Stack trace: {ex.StackTrace}");
                Environment.ExitCode = 1;
            }
        }
    }
}

配置文件示例 (appsettings.json)

{
  "SyncConfig": {
    "SourceDb": {
      "Username": "admin",
      "Password": "source_password",
      "Host": "docdb-source.cluster-xxxxxx.us-east-1.docdb.amazonaws.com",
      "Database": "testdb",
      "Collection": "users"
    },
    "TargetDb": {
      "Username": "admin",
      "Password": "target_password",
      "Host": "docdb-target.cluster-xxxxxx.us-east-1.docdb.amazonaws.com",
      "Database": "testdb",
      "Collection": "users"
    },
    "SyncOptions": {
      "BatchSize": 1000,
      "MaxRetries": 5,
      "RetryDelayMs": 5000,
      "CheckpointFile": "checkpoint.json",
      "LogDirectory": "logs",
      "EnableValidation": true
    }
  }
}

项目文件 (DocumentDBSyncTool.csproj)

<Project Sdk="Microsoft.NET.Sdk">

  <!-- Build settings: console executable targeting .NET 8, with implicit usings and nullable reference types enabled. -->
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <!-- NuGet dependencies. NOTE(review): the C# listing in this article only has using directives for MongoDB.* and System.Text.Json; the other three packages look unused there, confirm before keeping them. -->
  <ItemGroup>
    <PackageReference Include="MongoDB.Driver" Version="2.19.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
    <PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
  </ItemGroup>

  <!-- Copy appsettings.json to the output directory when it is newer than the copy already there. -->
  <ItemGroup>
    <None Update="appsettings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>

</Project>

使用说明

1.编译和运行

dotnet build
dotnet run

2.带参数运行(注意:上文示例中的 Program.Main 并未解析命令行参数,需先自行实现参数解析,例如基于项目文件中已引用的 System.CommandLine,下面的命令才会生效)

dotnet run -- --source-host docdb-source.amazonaws.com --target-host docdb-target.amazonaws.com

3.特性说明

断点续传功能

容错机制

JSON日志记录

性能优化

监控和统计

4.生产环境建议

安全性:

监控:

性能:

部署:

到此这篇关于C#实现两个DocumentDB实例之间同步数据的解决方案的文章就介绍到这了,更多相关C# DocumentDB数据同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文