C#实现两个DocumentDB实例之间同步数据的解决方案
作者:weixin_30777913
这篇文章主要为大家详细介绍了C#实现两个DocumentDB实例之间同步数据的解决方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
这个程序提供了完整的解决方案,可以安全可靠地在两个DocumentDB实例之间同步数据,并具备企业级应用所需的容错和监控功能。支持断点续传和JSON日志记录:
准备工作
安装驱动
Install-Package MongoDB.Driver -Version 2.19.0
使用 MongoDB .NET Driver 兼容 DocumentDB
下载证书
wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
DocumentDB 要求 SSL 连接,需 Amazon RDS 根证书
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace DocumentDBSyncTool
{
/// <summary>
/// One structured log record per sync operation; each instance is serialized
/// as a single JSON line appended to the log file.
/// </summary>
public class SyncLogEntry
{
    /// <summary>UTC time the event was recorded.</summary>
    [JsonPropertyName("timestamp")]
    public DateTime Timestamp { get; set; }

    /// <summary>Severity of the entry: "INFO" or "ERROR".</summary>
    [JsonPropertyName("level")]
    public string Level { get; set; } = "INFO";

    /// <summary>Operation tag, e.g. FETCH, WRITE, VALIDATE, START, COMPLETE.</summary>
    [JsonPropertyName("operation")]
    public string Operation { get; set; } = "";

    /// <summary>Optional _id of the single document involved, if any.</summary>
    [JsonPropertyName("source_id")]
    public string? SourceId { get; set; }

    /// <summary>_id of the first document in the batch, if the event concerns a batch.</summary>
    [JsonPropertyName("batch_start_id")]
    public string? BatchStartId { get; set; }

    /// <summary>_id of the last document in the batch, if the event concerns a batch.</summary>
    [JsonPropertyName("batch_end_id")]
    public string? BatchEndId { get; set; }

    /// <summary>Number of documents handled by this operation.</summary>
    [JsonPropertyName("records_processed")]
    public int RecordsProcessed { get; set; }

    /// <summary>Wall-clock duration of the operation in milliseconds.</summary>
    [JsonPropertyName("duration_ms")]
    public long DurationMs { get; set; }

    /// <summary>Exception details for ERROR entries; null otherwise.</summary>
    [JsonPropertyName("error")]
    public string? Error { get; set; }

    /// <summary>Human-readable description of the event.</summary>
    [JsonPropertyName("message")]
    public string Message { get; set; } = "";

    /// <summary>Checkpoint _id current at the time the entry was written.</summary>
    [JsonPropertyName("checkpoint")]
    public string? Checkpoint { get; set; }
}
/// <summary>
/// Resume state persisted to disk between runs so an interrupted sync can
/// continue from the last document that reached the target.
/// </summary>
public class CheckpointState
{
    /// <summary>String form of the _id of the last successfully written document ("" = start from the beginning).</summary>
    [JsonPropertyName("last_successful_id")]
    public string LastSuccessfulId { get; set; } = "";

    /// <summary>UTC time the checkpoint was last saved.</summary>
    [JsonPropertyName("last_sync_timestamp")]
    public DateTime LastSyncTimestamp { get; set; }

    /// <summary>Cumulative number of documents copied across all runs.</summary>
    [JsonPropertyName("total_processed")]
    public long TotalProcessed { get; set; }

    /// <summary>Description of the most recent failure, or null after a clean save.</summary>
    [JsonPropertyName("last_error")]
    public string? LastError { get; set; }

    /// <summary>Error count since the last successful checkpoint save.</summary>
    [JsonPropertyName("retry_count")]
    public int RetryCount { get; set; }
}
/// <summary>
/// Synchronizes one collection from a source DocumentDB instance to a target
/// instance by paging through documents in ascending _id order. Supports
/// checkpoint-based resume, bounded retries with linearly growing back-off,
/// and structured JSON-lines logging.
/// </summary>
public class DocumentDBSynchronizer
{
    private readonly IMongoCollection<BsonDocument> _sourceCollection;
    private readonly IMongoCollection<BsonDocument> _targetCollection;
    private readonly string _checkpointFilePath;
    private readonly string _logFilePath;
    private readonly int _batchSize;
    private readonly int _maxRetries;
    private readonly int _retryDelayMs;
    // Eagerly initialized so the field is never null, even if LoadCheckpoint throws
    // before assigning it (also satisfies nullable-reference analysis).
    private CheckpointState _checkpointState = new CheckpointState();
    // Serializes appends to the log file.
    private readonly object _logLock = new object();

    /// <summary>
    /// Creates a synchronizer for the collection of the same name in both clusters
    /// and loads any existing checkpoint from disk.
    /// </summary>
    /// <param name="sourceClient">Client for the cluster data is read from.</param>
    /// <param name="targetClient">Client for the cluster data is written to.</param>
    /// <param name="databaseName">Database name, shared by both sides.</param>
    /// <param name="collectionName">Collection to synchronize.</param>
    /// <param name="checkpointFilePath">File where resume state is persisted.</param>
    /// <param name="logFilePath">JSON-lines log file (appended to).</param>
    /// <param name="batchSize">Documents fetched/written per round trip.</param>
    /// <param name="maxRetries">Retry attempts per batch before giving up.</param>
    /// <param name="retryDelayMs">Base retry delay; multiplied by the attempt number.</param>
    public DocumentDBSynchronizer(
        MongoClient sourceClient,
        MongoClient targetClient,
        string databaseName,
        string collectionName,
        string checkpointFilePath = "checkpoint.json",
        string logFilePath = "sync_log.json",
        int batchSize = 1000,
        int maxRetries = 5,
        int retryDelayMs = 5000)
    {
        var sourceDatabase = sourceClient.GetDatabase(databaseName);
        var targetDatabase = targetClient.GetDatabase(databaseName);
        _sourceCollection = sourceDatabase.GetCollection<BsonDocument>(collectionName);
        _targetCollection = targetDatabase.GetCollection<BsonDocument>(collectionName);
        _checkpointFilePath = checkpointFilePath;
        _logFilePath = logFilePath;
        _batchSize = batchSize;
        _maxRetries = maxRetries;
        _retryDelayMs = retryDelayMs;
        LoadCheckpoint();
    }

    // Loads persisted resume state, or starts fresh when no checkpoint file exists.
    private void LoadCheckpoint()
    {
        try
        {
            if (File.Exists(_checkpointFilePath))
            {
                var json = File.ReadAllText(_checkpointFilePath);
                _checkpointState = JsonSerializer.Deserialize<CheckpointState>(json)
                    ?? new CheckpointState();
                LogSyncInfo($"Loaded checkpoint from {_checkpointFilePath}. Last ID: {_checkpointState.LastSuccessfulId}");
            }
            else
            {
                _checkpointState = new CheckpointState
                {
                    LastSuccessfulId = "",
                    LastSyncTimestamp = DateTime.MinValue,
                    TotalProcessed = 0,
                    RetryCount = 0
                };
                LogSyncInfo("No checkpoint found. Starting from beginning.");
            }
        }
        catch (Exception ex)
        {
            // A corrupt checkpoint degrades to a full re-sync; upsert writes make
            // re-copying already-synced documents safe.
            LogSyncError("Failed to load checkpoint", ex);
            _checkpointState = new CheckpointState();
        }
    }

    // Persists resume state after a batch has landed successfully in the target.
    private void SaveCheckpoint(string lastId)
    {
        try
        {
            _checkpointState.LastSuccessfulId = lastId;
            _checkpointState.LastSyncTimestamp = DateTime.UtcNow;
            _checkpointState.LastError = null;
            _checkpointState.RetryCount = 0;
            var json = JsonSerializer.Serialize(_checkpointState, new JsonSerializerOptions
            {
                WriteIndented = true
            });
            File.WriteAllText(_checkpointFilePath, json);
            LogSyncInfo($"Checkpoint saved: {lastId}");
        }
        catch (Exception ex)
        {
            // Non-fatal: a stale checkpoint only means some documents get re-upserted
            // on the next run.
            LogSyncError("Failed to save checkpoint", ex);
        }
    }

    // Writes an INFO-level entry to the JSON log.
    private void LogSyncInfo(string message, string? operation = null,
        string? sourceId = null, int recordsProcessed = 0,
        long durationMs = 0, string? batchStartId = null,
        string? batchEndId = null)
    {
        var logEntry = new SyncLogEntry
        {
            Timestamp = DateTime.UtcNow,
            Level = "INFO",
            Operation = operation ?? "SYNC",
            SourceId = sourceId,
            BatchStartId = batchStartId,
            BatchEndId = batchEndId,
            RecordsProcessed = recordsProcessed,
            DurationMs = durationMs,
            Message = message,
            Checkpoint = _checkpointState.LastSuccessfulId
        };
        AppendLog(logEntry);
    }

    // Writes an ERROR-level entry to the JSON log and records the failure
    // in the in-memory checkpoint state.
    private void LogSyncError(string message, Exception? ex = null,
        string? operation = null, string? sourceId = null)
    {
        var logEntry = new SyncLogEntry
        {
            Timestamp = DateTime.UtcNow,
            Level = "ERROR",
            Operation = operation ?? "SYNC",
            SourceId = sourceId,
            Message = message,
            Error = ex?.ToString(),
            Checkpoint = _checkpointState.LastSuccessfulId
        };
        AppendLog(logEntry);
        // Track the failure so it is visible in the next persisted checkpoint.
        _checkpointState.LastError = $"{message}: {ex?.Message}";
        _checkpointState.RetryCount++;
    }

    // Appends one JSON line to the log file; the lock keeps concurrent writers
    // from interleaving lines. Logging failures never crash the sync.
    private void AppendLog(SyncLogEntry logEntry)
    {
        lock (_logLock)
        {
            try
            {
                var logLine = JsonSerializer.Serialize(logEntry);
                // Open in append mode so the file accumulates across operations.
                using var writer = new StreamWriter(_logFilePath, true);
                writer.WriteLine(logLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Failed to write log: {ex.Message}");
            }
        }
    }

    // The checkpoint stores _id via ToString(), which loses the BSON type. Restore
    // an ObjectId when the string parses as one so the _id range comparison is
    // type-correct; comparing a raw string against ObjectId _ids follows BSON
    // cross-type ordering and breaks the incremental scan. Falls back to the
    // string itself for collections whose _ids really are strings.
    private static BsonValue ToIdBsonValue(string id) =>
        ObjectId.TryParse(id, out var oid) ? (BsonValue)oid : (BsonValue)id;

    /// <summary>
    /// Fetches the next batch of up to <c>_batchSize</c> documents with _id greater
    /// than <paramref name="lastId"/>, retrying transient failures up to _maxRetries
    /// times with a linearly growing delay.
    /// </summary>
    /// <exception cref="InvalidOperationException">All retry attempts failed.</exception>
    private async Task<List<BsonDocument>> GetNextBatchAsync(string lastId, int retryCount = 0)
    {
        // Iterative retry loop (the original recursed, growing the stack per retry).
        for (int attempt = retryCount; ; attempt++)
        {
            var stopwatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                var filter = string.IsNullOrEmpty(lastId)
                    ? Builders<BsonDocument>.Filter.Empty
                    : Builders<BsonDocument>.Filter.Gt("_id", ToIdBsonValue(lastId));
                var sort = Builders<BsonDocument>.Sort.Ascending("_id");
                var batch = await _sourceCollection
                    .Find(filter)
                    .Sort(sort)
                    .Limit(_batchSize)
                    .ToListAsync();
                stopwatch.Stop();
                LogSyncInfo($"Fetched {batch.Count} records from source",
                    operation: "FETCH",
                    batchStartId: batch.FirstOrDefault()?["_id"].ToString(),
                    batchEndId: batch.LastOrDefault()?["_id"].ToString(),
                    recordsProcessed: batch.Count,
                    durationMs: stopwatch.ElapsedMilliseconds);
                return batch;
            }
            catch (Exception ex) when (attempt < _maxRetries)
            {
                stopwatch.Stop();
                LogSyncError($"Failed to fetch batch (attempt {attempt + 1}/{_maxRetries})",
                    ex, "FETCH");
                // Linear back-off: delay grows with each attempt.
                await Task.Delay(_retryDelayMs * (attempt + 1));
            }
            catch (Exception ex)
            {
                stopwatch.Stop();
                throw new InvalidOperationException($"Failed to fetch batch after {_maxRetries} attempts", ex);
            }
        }
    }

    /// <summary>
    /// Upserts the batch into the target collection (replace-by-_id, unordered bulk
    /// write), retrying transient failures up to _maxRetries times.
    /// </summary>
    /// <exception cref="InvalidOperationException">All retry attempts failed.</exception>
    private async Task WriteBatchAsync(List<BsonDocument> batch, int retryCount = 0)
    {
        // Nothing to do for an empty batch; checked before any timing starts.
        if (batch.Count == 0)
            return;
        for (int attempt = retryCount; ; attempt++)
        {
            var stopwatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                var operations = new List<WriteModel<BsonDocument>>(batch.Count);
                foreach (var document in batch)
                {
                    // Upsert keyed on _id makes the write idempotent, so retries and
                    // checkpoint replays never duplicate documents.
                    var filter = Builders<BsonDocument>.Filter.Eq("_id", document["_id"]);
                    operations.Add(new ReplaceOneModel<BsonDocument>(filter, document)
                    {
                        IsUpsert = true
                    });
                }
                var result = await _targetCollection.BulkWriteAsync(operations,
                    new BulkWriteOptions { IsOrdered = false });
                stopwatch.Stop();
                LogSyncInfo($"Written {result.Upserts.Count + result.ModifiedCount} records to target",
                    operation: "WRITE",
                    batchStartId: batch.First()["_id"].ToString(),
                    batchEndId: batch.Last()["_id"].ToString(),
                    recordsProcessed: batch.Count,
                    durationMs: stopwatch.ElapsedMilliseconds);
                return;
            }
            catch (Exception ex) when (attempt < _maxRetries)
            {
                stopwatch.Stop();
                LogSyncError($"Failed to write batch (attempt {attempt + 1}/{_maxRetries})",
                    ex, "WRITE");
                await Task.Delay(_retryDelayMs * (attempt + 1));
            }
            catch (Exception ex)
            {
                stopwatch.Stop();
                throw new InvalidOperationException($"Failed to write batch after {_maxRetries} attempts", ex);
            }
        }
    }

    // Confirms both clusters are reachable by asking each for an estimated count.
    private async Task<bool> ValidateConnectionsAsync()
    {
        try
        {
            LogSyncInfo("Validating source connection...", "VALIDATE");
            var sourceCount = await _sourceCollection.EstimatedDocumentCountAsync();
            LogSyncInfo("Validating target connection...", "VALIDATE");
            var targetCount = await _targetCollection.EstimatedDocumentCountAsync();
            LogSyncInfo($"Connections validated. Source: ~{sourceCount} records, Target: ~{targetCount} records",
                "VALIDATE");
            return true;
        }
        catch (Exception ex)
        {
            LogSyncError("Connection validation failed", ex, "VALIDATE");
            return false;
        }
    }

    /// <summary>
    /// Runs the sync loop until the source is exhausted, an unrecoverable error
    /// occurs, or <paramref name="cancellationToken"/> is signalled.
    /// </summary>
    /// <returns>True only when the source was fully drained.</returns>
    public async Task<bool> StartSyncAsync(CancellationToken cancellationToken = default)
    {
        LogSyncInfo("Starting synchronization process", "START");
        if (!await ValidateConnectionsAsync())
        {
            LogSyncError("Connection validation failed. Aborting sync.", null, "START");
            return false;
        }
        try
        {
            bool hasMoreData = true;
            // Resume the running total from the checkpoint instead of resetting to
            // zero, so statistics stay accurate across restarts.
            long totalProcessed = _checkpointState.TotalProcessed;
            string lastProcessedId = _checkpointState.LastSuccessfulId;
            while (hasMoreData && !cancellationToken.IsCancellationRequested)
            {
                var batch = await GetNextBatchAsync(lastProcessedId);
                if (batch.Count == 0)
                {
                    hasMoreData = false;
                    LogSyncInfo("No more data to sync", "COMPLETE",
                        recordsProcessed: (int)totalProcessed);
                    break;
                }
                await WriteBatchAsync(batch);
                // Update the running total BEFORE saving so the persisted checkpoint
                // includes this batch (it was previously written one batch stale).
                totalProcessed += batch.Count;
                _checkpointState.TotalProcessed = totalProcessed;
                lastProcessedId = batch.Last()["_id"].ToString()!;
                SaveCheckpoint(lastProcessedId);
                // Brief pause after full batches so the loop does not monopolize
                // cluster resources; a short batch means we are at the tail anyway.
                if (batch.Count == _batchSize)
                {
                    await Task.Delay(100, cancellationToken);
                }
            }
            if (cancellationToken.IsCancellationRequested)
            {
                LogSyncInfo("Sync cancelled by user", "CANCELLED",
                    recordsProcessed: (int)totalProcessed);
                return false;
            }
            LogSyncInfo($"Sync completed successfully. Total processed: {totalProcessed}",
                "COMPLETE", recordsProcessed: (int)totalProcessed);
            return true;
        }
        catch (Exception ex)
        {
            LogSyncError("Sync failed", ex, "ERROR");
            return false;
        }
    }

    /// <summary>
    /// Returns a snapshot comparing source/target estimated counts with the
    /// current checkpoint state. Never throws; failures yield an empty report.
    /// </summary>
    public async Task<SyncStatistics> GetStatisticsAsync()
    {
        try
        {
            var sourceCount = await _sourceCollection.EstimatedDocumentCountAsync();
            var targetCount = await _targetCollection.EstimatedDocumentCountAsync();
            return new SyncStatistics
            {
                SourceCount = sourceCount,
                TargetCount = targetCount,
                LastCheckpoint = _checkpointState.LastSuccessfulId,
                TotalProcessed = _checkpointState.TotalProcessed,
                LastSyncTime = _checkpointState.LastSyncTimestamp
            };
        }
        catch (Exception ex)
        {
            LogSyncError("Failed to get statistics", ex, "STATS");
            return new SyncStatistics();
        }
    }
}
/// <summary>
/// Point-in-time summary comparing source and target collection sizes against
/// the checkpoint. Counts come from estimated-count queries, so they are
/// approximate.
/// </summary>
public class SyncStatistics
{
    /// <summary>Approximate document count in the source collection.</summary>
    [JsonPropertyName("source_count")]
    public long SourceCount { get; set; }

    /// <summary>Approximate document count in the target collection.</summary>
    [JsonPropertyName("target_count")]
    public long TargetCount { get; set; }

    /// <summary>String form of the last checkpointed _id.</summary>
    [JsonPropertyName("last_checkpoint")]
    public string LastCheckpoint { get; set; } = "";

    /// <summary>Cumulative documents copied according to the checkpoint.</summary>
    [JsonPropertyName("total_processed")]
    public long TotalProcessed { get; set; }

    /// <summary>UTC time of the last checkpoint save.</summary>
    [JsonPropertyName("last_sync_time")]
    public DateTime LastSyncTime { get; set; }

    /// <summary>"In Sync" when the two counts match exactly; otherwise "Out of Sync".</summary>
    [JsonPropertyName("sync_status")]
    public string SyncStatus => SourceCount != TargetCount ? "Out of Sync" : "In Sync";

    /// <summary>Absolute difference between source and target counts.</summary>
    [JsonPropertyName("records_difference")]
    public long RecordsDifference => Math.Abs(SourceCount - TargetCount);
}
/// <summary>
/// Builds MongoClient instances pre-configured for Amazon DocumentDB:
/// TLS 1.2, replica-set connection mode, secondary-preferred reads, and a
/// pooled, time-limited connection profile.
/// </summary>
public static class DocumentDBConnectionFactory
{
    /// <summary>
    /// Creates a client for the given DocumentDB endpoint.
    /// </summary>
    /// <param name="username">Account name used for SCRAM authentication.</param>
    /// <param name="password">Account password (may be empty, not null).</param>
    /// <param name="host">Cluster endpoint, optionally with ":port".</param>
    /// <param name="databaseName">Database used as the authentication source.</param>
    /// <param name="allowInsecureTls">
    /// When true, server certificate validation is disabled. Development only;
    /// production should validate against the Amazon RDS CA bundle instead.
    /// </param>
    /// <exception cref="ArgumentException">A required string is null or blank.</exception>
    public static MongoClient CreateClient(string username, string password,
        string host, string databaseName, bool allowInsecureTls = false)
    {
        // Fail fast on misconfiguration instead of surfacing an opaque
        // server-selection timeout 30 seconds later.
        ArgumentException.ThrowIfNullOrWhiteSpace(username);
        ArgumentNullException.ThrowIfNull(password);
        ArgumentException.ThrowIfNullOrWhiteSpace(host);
        ArgumentException.ThrowIfNullOrWhiteSpace(databaseName);
        var settings = new MongoClientSettings
        {
            Servers = new[] { MongoServerAddress.Parse(host) },
            Credential = MongoCredential.CreateCredential(
                databaseName,
                username,
                password
            ),
            ConnectionMode = ConnectionMode.ReplicaSet,
            // Reads may be served by secondaries; writes always go to the primary.
            ReadPreference = ReadPreference.SecondaryPreferred,
            SslSettings = new SslSettings
            {
                // DocumentDB requires TLS; pin the protocol version explicitly.
                EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12
            },
            UseTls = true,
            ServerSelectionTimeout = TimeSpan.FromSeconds(30),
            MaxConnectionPoolSize = 100,
            MinConnectionPoolSize = 5,
            SocketTimeout = TimeSpan.FromSeconds(60),
            ConnectTimeout = TimeSpan.FromSeconds(30)
        };
        if (allowInsecureTls)
        {
            // Skips server certificate validation - development convenience only.
            settings.AllowInsecureTls = true;
        }
        // In production, keep validation enabled and trust the Amazon RDS root CA
        // (rds-combined-ca-bundle.pem) via the OS certificate store or
        // settings.SslSettings.ClientCertificates.
        return new MongoClient(settings);
    }
}
/// <summary>
/// Console entry point: builds the two DocumentDB clients, runs the sync with
/// CTRL+C cancellation support, and prints a final statistics report.
/// Sets a non-zero exit code on failure so schedulers can detect it.
/// </summary>
class Program
{
    // Reads a configuration value from the environment, falling back to a
    // default. Keeps real credentials out of source control; in production,
    // prefer AWS Secrets Manager or similar.
    private static string Env(string name, string fallback) =>
        Environment.GetEnvironmentVariable(name) ?? fallback;

    static async Task Main(string[] args)
    {
        // Connection settings - each value can be overridden via environment
        // variables; the literals below are development placeholders.
        var sourceConfig = new
        {
            Username = Env("SYNC_SOURCE_USERNAME", "admin"),
            Password = Env("SYNC_SOURCE_PASSWORD", "source_password"),
            Host = Env("SYNC_SOURCE_HOST", "docdb-source.cluster-xxxxxx.us-east-1.docdb.amazonaws.com"),
            Database = Env("SYNC_DATABASE", "testdb")
        };
        var targetConfig = new
        {
            Username = Env("SYNC_TARGET_USERNAME", "admin"),
            Password = Env("SYNC_TARGET_PASSWORD", "target_password"),
            Host = Env("SYNC_TARGET_HOST", "docdb-target.cluster-xxxxxx.us-east-1.docdb.amazonaws.com"),
            Database = Env("SYNC_DATABASE", "testdb")
        };
        var collectionName = Env("SYNC_COLLECTION", "users");
        var checkpointFile = "sync_checkpoint.json";
        // Timestamped log file name keeps one log per run.
        var logFile = $"sync_log_{DateTime.Now:yyyyMMdd_HHmmss}.json";
        var batchSize = 500; // Tune to network latency and document size.
        Console.WriteLine("Starting DocumentDB Sync Tool...");
        Console.WriteLine($"Log file: {logFile}");
        Console.WriteLine($"Checkpoint file: {checkpointFile}");
        try
        {
            // allowInsecureTls disables certificate validation - development only;
            // production must validate against the Amazon RDS CA bundle.
            var sourceClient = DocumentDBConnectionFactory.CreateClient(
                sourceConfig.Username,
                sourceConfig.Password,
                sourceConfig.Host,
                sourceConfig.Database,
                allowInsecureTls: true
            );
            var targetClient = DocumentDBConnectionFactory.CreateClient(
                targetConfig.Username,
                targetConfig.Password,
                targetConfig.Host,
                targetConfig.Database,
                allowInsecureTls: true
            );
            var synchronizer = new DocumentDBSynchronizer(
                sourceClient,
                targetClient,
                sourceConfig.Database,
                collectionName,
                checkpointFilePath: checkpointFile,
                logFilePath: logFile,
                batchSize: batchSize,
                maxRetries: 3,
                retryDelayMs: 3000
            );
            // CTRL+C requests cooperative cancellation; e.Cancel = true keeps the
            // process alive so the sync loop can stop cleanly and report stats.
            using var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (sender, e) =>
            {
                Console.WriteLine("\nCancelling sync...");
                cts.Cancel();
                e.Cancel = true;
            };
            var success = await synchronizer.StartSyncAsync(cts.Token);
            var stats = await synchronizer.GetStatisticsAsync();
            Console.WriteLine("\n=== Sync Statistics ===");
            Console.WriteLine($"Source records: {stats.SourceCount}");
            Console.WriteLine($"Target records: {stats.TargetCount}");
            Console.WriteLine($"Last checkpoint: {stats.LastCheckpoint}");
            Console.WriteLine($"Total processed: {stats.TotalProcessed}");
            Console.WriteLine($"Sync status: {stats.SyncStatus}");
            Console.WriteLine($"Difference: {stats.RecordsDifference}");
            if (success)
            {
                Console.WriteLine("\n✅ Sync completed successfully!");
            }
            else
            {
                Console.WriteLine("\n❌ Sync completed with errors.");
                Console.WriteLine("Check the log file for details.");
                // Non-zero exit code lets scripts and schedulers detect failure.
                Environment.ExitCode = 1;
            }
            Console.WriteLine($"\nLog saved to: {logFile}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"\n❌ Fatal error: {ex.Message}");
            Console.WriteLine($"Stack trace: {ex.StackTrace}");
            Environment.ExitCode = 1;
        }
    }
}
}
配置文件示例 (appsettings.json)
{
"SyncConfig": {
"SourceDb": {
"Username": "admin",
"Password": "source_password",
"Host": "docdb-source.cluster-xxxxxx.us-east-1.docdb.amazonaws.com",
"Database": "testdb",
"Collection": "users"
},
"TargetDb": {
"Username": "admin",
"Password": "target_password",
"Host": "docdb-target.cluster-xxxxxx.us-east-1.docdb.amazonaws.com",
"Database": "testdb",
"Collection": "users"
},
"SyncOptions": {
"BatchSize": 1000,
"MaxRetries": 5,
"RetryDelayMs": 5000,
"CheckpointFile": "checkpoint.json",
"LogDirectory": "logs",
"EnableValidation": true
}
}
}
项目文件 (DocumentDBSyncTool.csproj)
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.19.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
使用说明
1.编译和运行
dotnet build
dotnet run
2.带参数运行
dotnet run -- --source-host docdb-source.amazonaws.com --target-host docdb-target.amazonaws.com
3.特性说明
断点续传功能
- 自动保存检查点(checkpoint.json)
- 程序重启后从上次成功的位置继续
- 记录每次同步的最后一个ID
容错机制
- 自动重试失败的批次(可配置重试次数)
- 指数退避重试延迟
- 网络中断恢复
JSON日志记录
- 结构化日志,便于分析
- 包含时间戳、操作类型、记录数量等信息
- 错误信息详细记录
性能优化
- 批量读取和写入
- 连接池管理
- 异步操作,支持并发
监控和统计
- 实时显示同步进度
- 生成统计报告
- 验证源和目标记录数量
4.生产环境建议
安全性:
- 使用AWS Secrets Manager存储密码
- 启用TLS证书验证
- 限制网络访问权限
监控:
- 集成CloudWatch监控
- 设置报警规则
- 定期审计日志
性能:
- 根据网络延迟调整批次大小
- 监控DocumentDB实例性能指标
- 考虑使用分片集合处理大量数据
部署:
- 作为Windows服务或Linux守护进程运行
- 使用AWS ECS或Lambda进行无服务器部署
- 配置自动重启策略
到此这篇关于C#实现两个DocumentDB实例之间同步数据的解决方案的文章就介绍到这了,更多相关C# DocumentDB数据同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
