Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go语言 分布式事务

Go语言的分布式事务处理

作者:码龙大大

本文介绍了分布式系统中事务处理面临的挑战,包括网络延迟、节点故障等问题,并详细探索了常见的分布式事务解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. 分布式事务简介

在分布式系统中,事务处理变得更加复杂。传统的单机事务可以通过数据库的ACID特性来保证一致性,但在分布式环境中,由于网络延迟、节点故障等因素,确保多个服务之间的数据一致性成为一个挑战。

分布式事务的特点

分布式事务的挑战

2. 常见的分布式事务解决方案

2PC(两阶段提交)

2PC是一种经典的分布式事务协议,它将事务分为准备阶段和提交阶段。

原理

  1. 准备阶段:协调者向所有参与者发送准备请求,参与者执行操作但不提交
  2. 提交阶段:如果所有参与者都准备成功,协调者发送提交请求;否则发送回滚请求

特点

3PC(三阶段提交)

3PC是对2PC的改进,增加了一个准备提交阶段,减少了阻塞时间。

原理

  1. CanCommit:协调者询问参与者是否可以执行事务
  2. PreCommit:协调者发送预提交请求,参与者执行操作但不提交
  3. DoCommit:协调者发送提交请求,参与者提交事务

特点

TCC(Try-Confirm-Cancel)

TCC是一种基于业务逻辑的分布式事务解决方案,它将事务分为三个阶段。

原理

  1. Try:尝试执行业务操作,预留资源
  2. Confirm:确认执行业务操作,使用预留的资源
  3. Cancel:取消执行业务操作,释放预留的资源

特点

Saga模式

Saga模式是一种基于事件的分布式事务解决方案,它将长事务分解为多个短事务。

原理

  1. 定义一系列本地事务
  2. 每个本地事务都有对应的补偿操作
  3. 按顺序执行本地事务,如果某个事务失败,执行补偿操作

特点

本地消息表

本地消息表是一种基于消息队列的最终一致性解决方案。

原理

  1. 业务操作和消息写入同一个本地事务
  2. 消息队列消费消息,执行远程操作
  3. 如果远程操作失败,通过重试机制确保最终执行

特点

基于消息队列的最终一致性

直接使用消息队列来实现最终一致性,适用于对一致性要求不高的场景。

原理

  1. 生产者发送消息到消息队列
  2. 消费者消费消息并执行操作
  3. 通过重试机制确保消息被处理

特点

3. 2PC协议的Go语言实现

基本架构

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Participant 事务参与者
type Participant interface {
	Prepare() error
	Commit() error
	Rollback() error
}

// Coordinator 事务协调者
type Coordinator struct {
	participants []Participant
}

// NewCoordinator 创建新的协调者
func NewCoordinator(participants []Participant) *Coordinator {
	return &Coordinator{
		participants: participants,
	}
}

// Execute 执行两阶段提交
func (c *Coordinator) Execute() error {
	// 第一阶段:准备
	if err := c.prepare(); err != nil {
		// 准备失败,执行回滚
		c.rollback()
		return err
	}

	// 第二阶段:提交
	if err := c.commit(); err != nil {
		// 提交失败,执行回滚
		c.rollback()
		return err
	}

	return nil
}

// prepare 准备阶段
func (c *Coordinator) prepare() error {
	for _, p := range c.participants {
		if err := p.Prepare(); err != nil {
			return err
		}
	}
	return nil
}

// commit 提交阶段
func (c *Coordinator) commit() error {
	for _, p := range c.participants {
		if err := p.Commit(); err != nil {
			return err
		}
	}
	return nil
}

// rollback 回滚阶段
func (c *Coordinator) rollback() error {
	var wg sync.WaitGroup
	var errMutex sync.Mutex
	var rollbackErr error

	for _, p := range c.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			if err := participant.Rollback(); err != nil {
				errMutex.Lock()
				if rollbackErr == nil {
					rollbackErr = err
				}
				errMutex.Unlock()
			}
		}(p)
	}

	wg.Wait()
	return rollbackErr
}

// 示例参与者实现
type ExampleParticipant struct {
	name string
	prepared bool
}

func NewExampleParticipant(name string) *ExampleParticipant {
	return &ExampleParticipant{
		name: name,
	}
}

func (p *ExampleParticipant) Prepare() error {
	fmt.Printf("Participant %s: preparing\n", p.name)
	// 模拟准备操作
	p.prepared = true
	return nil
}

func (p *ExampleParticipant) Commit() error {
	fmt.Printf("Participant %s: committing\n", p.name)
	// 模拟提交操作
	return nil
}

func (p *ExampleParticipant) Rollback() error {
	fmt.Printf("Participant %s: rolling back\n", p.name)
	// 模拟回滚操作
	p.prepared = false
	return nil
}

func main() {
	// 创建参与者
	p1 := NewExampleParticipant("Service A")
	p2 := NewExampleParticipant("Service B")
	p3 := NewExampleParticipant("Service C")

	// 创建协调者
	coordinator := NewCoordinator([]Participant{p1, p2, p3})

	// 执行事务
	if err := coordinator.Execute(); err != nil {
		fmt.Printf("Transaction failed: %v\n", err)
	} else {
		fmt.Println("Transaction succeeded")
	}
}

4. TCC模式的Go语言实现

基本架构

package main

import (
	"errors"
	"fmt"
)

// TCCService TCC服务接口
type TCCService interface {
	Try() error
	Confirm() error
	Cancel() error
}

// TCCManager TCC事务管理器
type TCCManager struct {
	services []TCCService
}

// NewTCCManager 创建新的TCC管理器
func NewTCCManager(services []TCCService) *TCCManager {
	return &TCCManager{
		services: services,
	}
}

// Execute 执行TCC事务
func (m *TCCManager) Execute() error {
	// 执行Try阶段
	if err := m.try(); err != nil {
		// Try失败,执行Cancel
		m.cancel()
		return err
	}

	// 执行Confirm阶段
	if err := m.confirm(); err != nil {
		// Confirm失败,执行Cancel
		m.cancel()
		return err
	}

	return nil
}

// try 执行Try阶段
func (m *TCCManager) try() error {
	for _, service := range m.services {
		if err := service.Try(); err != nil {
			return err
		}
	}
	return nil
}

// confirm 执行Confirm阶段
func (m *TCCManager) confirm() error {
	for _, service := range m.services {
		if err := service.Confirm(); err != nil {
			return err
		}
	}
	return nil
}

// cancel 执行Cancel阶段
func (m *TCCManager) cancel() error {
	for _, service := range m.services {
		if err := service.Cancel(); err != nil {
			// 记录错误但继续执行
			fmt.Printf("Cancel failed for service: %v\n", err)
		}
	}
	return nil
}

// 示例TCC服务实现
type OrderService struct {
	orderID string
	reserved bool
}

func NewOrderService(orderID string) *OrderService {
	return &OrderService{
		orderID: orderID,
	}
}

func (s *OrderService) Try() error {
	fmt.Printf("OrderService: trying to reserve order %s\n", s.orderID)
	// 模拟预留资源
	s.reserved = true
	return nil
}

func (s *OrderService) Confirm() error {
	fmt.Printf("OrderService: confirming order %s\n", s.orderID)
	// 模拟确认操作
	return nil
}

func (s *OrderService) Cancel() error {
	fmt.Printf("OrderService: cancelling order %s\n", s.orderID)
	// 模拟取消操作
	s.reserved = false
	return nil
}

type InventoryService struct {
	productID string
	quantity  int
	reserved  int
}

func NewInventoryService(productID string, quantity int) *InventoryService {
	return &InventoryService{
		productID: productID,
		quantity:  quantity,
	}
}

func (s *InventoryService) Try() error {
	reserveQuantity := 1
	if s.quantity < reserveQuantity {
		return errors.New("insufficient inventory")
	}
	fmt.Printf("InventoryService: reserving %d units of product %s\n", reserveQuantity, s.productID)
	s.reserved = reserveQuantity
	s.quantity -= reserveQuantity
	return nil
}

func (s *InventoryService) Confirm() error {
	fmt.Printf("InventoryService: confirming reservation for product %s\n", s.productID)
	// 模拟确认操作
	return nil
}

func (s *InventoryService) Cancel() error {
	fmt.Printf("InventoryService: cancelling reservation for product %s\n", s.productID)
	// 模拟取消操作
	s.quantity += s.reserved
	s.reserved = 0
	return nil
}

type PaymentService struct {
	userID   string
	amount   float64
	reserved bool
}

func NewPaymentService(userID string, amount float64) *PaymentService {
	return &PaymentService{
		userID: userID,
		amount: amount,
	}
}

func (s *PaymentService) Try() error {
	fmt.Printf("PaymentService: reserving $%.2f for user %s\n", s.amount, s.userID)
	// 模拟预留资金
	s.reserved = true
	return nil
}

func (s *PaymentService) Confirm() error {
	fmt.Printf("PaymentService: confirming payment of $%.2f for user %s\n", s.amount, s.userID)
	// 模拟确认支付
	return nil
}

func (s *PaymentService) Cancel() error {
	fmt.Printf("PaymentService: cancelling payment for user %s\n", s.userID)
	// 模拟取消支付
	s.reserved = false
	return nil
}

func main() {
	// 创建TCC服务
	orderService := NewOrderService("order-123")
	inventoryService := NewInventoryService("product-456", 10)
	paymentService := NewPaymentService("user-789", 100.0)

	// 创建TCC管理器
	manager := NewTCCManager([]TCCService{orderService, inventoryService, paymentService})

	// 执行TCC事务
	if err := manager.Execute(); err != nil {
		fmt.Printf("TCC transaction failed: %v\n", err)
	} else {
		fmt.Println("TCC transaction succeeded")
	}
}

5. Saga模式的Go语言实现

基本架构

package main

import (
	"errors"
	"fmt"
)

// SagaStep Saga步骤
type SagaStep struct {
	Execute   func() error
	Compensate func() error
}

// Saga Saga事务
type Saga struct {
	steps []SagaStep
}

// NewSaga 创建新的Saga
func NewSaga() *Saga {
	return &Saga{
		steps: make([]SagaStep, 0),
	}
}

// AddStep 添加步骤
func (s *Saga) AddStep(execute, compensate func() error) {
	s.steps = append(s.steps, SagaStep{
		Execute:   execute,
		Compensate: compensate,
	})
}

// Execute 执行Saga
func (s *Saga) Execute() error {
	// 执行步骤
	for i, step := range s.steps {
		if err := step.Execute(); err != nil {
			// 执行失败,补偿已执行的步骤
			s.compensate(i)
			return err
		}
	}
	return nil
}

// compensate 补偿已执行的步骤
func (s *Saga) compensate(upToIndex int) {
	// 从后往前补偿
	for i := upToIndex; i >= 0; i-- {
		if err := s.steps[i].Compensate(); err != nil {
			// 记录补偿失败
			fmt.Printf("Compensation failed for step %d: %v\n", i, err)
		}
	}
}

func main() {
	// 创建Saga
	saga := NewSaga()

	// 添加步骤1:创建订单
	saga.AddStep(
		func() error {
			fmt.Println("Step 1: Creating order")
			// 模拟创建订单
			return nil
		},
		func() error {
			fmt.Println("Compensating Step 1: Cancelling order")
			// 模拟取消订单
			return nil
		},
	)

	// 添加步骤2:扣减库存
	saga.AddStep(
		func() error {
			fmt.Println("Step 2: Deducting inventory")
			// 模拟扣减库存
			return nil
		},
		func() error {
			fmt.Println("Compensating Step 2: Restoring inventory")
			// 模拟恢复库存
			return nil
		},
	)

	// 添加步骤3:处理支付
	saga.AddStep(
		func() error {
			fmt.Println("Step 3: Processing payment")
			// 模拟支付失败
			return errors.New("payment failed")
		},
		func() error {
			fmt.Println("Compensating Step 3: Refunding payment")
			// 模拟退款
			return nil
		},
	)

	// 执行Saga
	if err := saga.Execute(); err != nil {
		fmt.Printf("Saga failed: %v\n", err)
	} else {
		fmt.Println("Saga succeeded")
	}
}

6. 本地消息表模式的Go语言实现

基本架构

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// Message 消息结构
type Message struct {
	ID        int       `json:"id"`
	Type      string    `json:"type"`
	Data      string    `json:"data"`
	Status    string    `json:"status"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

// Order 订单结构
type Order struct {
	ID     int     `json:"id"`
	UserID int     `json:"user_id"`
	Amount float64 `json:"amount"`
	Status string  `json:"status"`
}

// createTables 创建表结构
func createTables(db *sql.DB) error {
	// 创建订单表
	_, err := db.Exec(`
	CREATE TABLE IF NOT EXISTS orders (
		id INT AUTO_INCREMENT PRIMARY KEY,
		user_id INT NOT NULL,
		amount DECIMAL(10,2) NOT NULL,
		status VARCHAR(20) NOT NULL,
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
	)
	`)
	if err != nil {
		return err
	}

	// 创建消息表
	_, err = db.Exec(`
	CREATE TABLE IF NOT EXISTS messages (
		id INT AUTO_INCREMENT PRIMARY KEY,
		type VARCHAR(50) NOT NULL,
		data TEXT NOT NULL,
		status VARCHAR(20) NOT NULL DEFAULT 'pending',
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
	)
	`)
	return err
}

// createOrder 创建订单并发送消息
func createOrder(db *sql.DB, userID int, amount float64) error {
	// 开始事务
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	// 创建订单
	result, err := tx.Exec(
		"INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)",
		userID, amount, "created",
	)
	if err != nil {
		tx.Rollback()
		return err
	}

	// 获取订单ID
	orderID, err := result.LastInsertId()
	if err != nil {
		tx.Rollback()
		return err
	}

	// 创建订单消息
	order := Order{
		ID:     int(orderID),
		UserID: userID,
		Amount: amount,
		Status: "created",
	}

	orderJSON, err := json.Marshal(order)
	if err != nil {
		tx.Rollback()
		return err
	}

	// 插入消息
	_, err = tx.Exec(
		"INSERT INTO messages (type, data, status) VALUES (?, ?, ?)",
		"order_created", string(orderJSON), "pending",
	)
	if err != nil {
		tx.Rollback()
		return err
	}

	// 提交事务
	return tx.Commit()
}

// processMessages 处理待处理的消息
func processMessages(db *sql.DB) error {
	// 查询待处理的消息
	rows, err := db.Query(
		"SELECT id, type, data, status FROM messages WHERE status = 'pending' LIMIT 10",
	)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var msg Message
		err := rows.Scan(&msg.ID, &msg.Type, &msg.Data, &msg.Status)
		if err != nil {
			return err
		}

		// 处理消息
		err = processMessage(&msg)
		if err != nil {
			// 标记消息为失败
			_, updateErr := db.Exec(
				"UPDATE messages SET status = 'failed' WHERE id = ?",
				msg.ID,
			)
			if updateErr != nil {
				log.Printf("Failed to update message status: %v", updateErr)
			}
			continue
		}

		// 标记消息为成功
		_, err = db.Exec(
			"UPDATE messages SET status = 'success' WHERE id = ?",
			msg.ID,
		)
		if err != nil {
			log.Printf("Failed to update message status: %v", err)
		}
	}

	return rows.Err()
}

// processMessage 处理单个消息
func processMessage(msg *Message) error {
	fmt.Printf("Processing message %d: %s\n", msg.ID, msg.Type)

	// 根据消息类型处理
	switch msg.Type {
	case "order_created":
		// 处理订单创建消息
		var order Order
		if err := json.Unmarshal([]byte(msg.Data), &order); err != nil {
			return err
		}

		// 模拟处理订单,如通知库存服务、支付服务等
		fmt.Printf("Processing order %d for user %d, amount $%.2f\n", order.ID, order.UserID, order.Amount)

		// 模拟网络延迟
		time.Sleep(1 * time.Second)

		return nil

	default:
		return fmt.Errorf("unknown message type: %s", msg.Type)
	}
}

func main() {
	// 连接数据库
	db, err := sql.Open("mysql", "root:password@tcp(localhost:3306)/test")
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()

	// 创建表结构
	if err := createTables(db); err != nil {
		log.Fatalf("Failed to create tables: %v", err)
	}

	// 创建订单
	if err := createOrder(db, 1, 100.0); err != nil {
		log.Fatalf("Failed to create order: %v", err)
	}

	// 处理消息
	if err := processMessages(db); err != nil {
		log.Fatalf("Failed to process messages: %v", err)
	}

	fmt.Println("Order created and message processed successfully")
}

7. 基于消息队列的最终一致性实现

使用RabbitMQ实现

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Order 订单结构
type Order struct {
	ID     int     `json:"id"`
	UserID int     `json:"user_id"`
	Amount float64 `json:"amount"`
	Status string  `json:"status"`
}

// connectRabbitMQ 连接到RabbitMQ
func connectRabbitMQ() (*amqp.Connection, *amqp.Channel, error) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		return nil, nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		conn.Close()
		return nil, nil, err
	}

	// 声明交换机
	err = ch.ExchangeDeclare(
		"orders",  // 交换机名称
		"fanout", // 交换机类型
		true,      // 持久化
		false,     // 自动删除
		false,     // 内部
		false,     // 不等待
		nil,       // 额外参数
	)
	if err != nil {
		ch.Close()
		conn.Close()
		return nil, nil, err
	}

	return conn, ch, nil
}

// sendOrderMessage 发送订单消息
func sendOrderMessage(ch *amqp.Channel, order Order) error {
	// 序列化订单
	orderJSON, err := json.Marshal(order)
	if err != nil {
		return err
	}

	// 发送消息
	err = ch.Publish(
		"orders", // 交换机
		"",       // 路由键
		false,    // 强制
		false,    // 立即
		amqp.Publishing{
			ContentType: "application/json",
			Body:        orderJSON,
		},
	)

	return err
}

// consumeOrderMessages 消费订单消息
func consumeOrderMessages(ch *amqp.Channel) error {
	// 声明队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称(自动生成)
		false, // 持久化
		true,  // 自动删除
		true,  // 排他性
		false, // 不等待
		nil,   // 额外参数
	)
	if err != nil {
		return err
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,  // 队列名称
		"",      // 路由键
		"orders", // 交换机名称
		false,   // 不等待
		nil,     // 额外参数
	)
	if err != nil {
		return err
	}

	// 消费消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 不本地
		false,  // 不等待
		nil,    // 额外参数
	)
	if err != nil {
		return err
	}

	// 处理消息
	go func() {
		for d := range msgs {
			var order Order
			if err := json.Unmarshal(d.Body, &order); err != nil {
				log.Printf("Failed to unmarshal order: %v", err)
				continue
			}

			fmt.Printf("Received order: %+v\n", order)

			// 处理订单,如更新库存、处理支付等
			processOrder(order)
		}
	}()

	return nil
}

// processOrder 处理订单
func processOrder(order Order) {
	fmt.Printf("Processing order %d...\n", order.ID)

	// 模拟处理时间
	time.Sleep(2 * time.Second)

	// 模拟处理结果
	fmt.Printf("Order %d processed successfully\n", order.ID)
}

func main() {
	// 连接到RabbitMQ
	conn, ch, err := connectRabbitMQ()
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()
	defer ch.Close()

	// 启动消费者
	if err := consumeOrderMessages(ch); err != nil {
		log.Fatalf("Failed to start consumer: %v", err)
	}

	// 创建订单
	order := Order{
		ID:     1,
		UserID: 123,
		Amount: 99.99,
		Status: "created",
	}

	// 发送订单消息
	if err := sendOrderMessage(ch, order); err != nil {
		log.Fatalf("Failed to send order message: %v", err)
	}

	fmt.Println("Order message sent successfully")

	// 等待消费者处理
	time.Sleep(5 * time.Second)
}

8. 实际应用案例

电商订单处理

场景:用户下单后,需要创建订单、扣减库存、处理支付等操作,这些操作分布在不同的服务中。

实现:使用Saga模式处理订单流程

package main

import (
	"errors"
	"fmt"
)

// OrderService 订单服务
type OrderService struct {
	orders map[int]string
}

func NewOrderService() *OrderService {
	return &OrderService{
		orders: make(map[int]string),
	}
}

func (s *OrderService) CreateOrder(orderID int) error {
	fmt.Printf("Creating order %d\n", orderID)
	s.orders[orderID] = "created"
	return nil
}

func (s *OrderService) CancelOrder(orderID int) error {
	fmt.Printf("Cancelling order %d\n", orderID)
	s.orders[orderID] = "cancelled"
	return nil
}

// InventoryService 库存服务
type InventoryService struct {
	inventory map[string]int
}

func NewInventoryService() *InventoryService {
	return &InventoryService{
		inventory: map[string]int{
			"product1": 10,
			"product2": 5,
		},
	}
}

func (s *InventoryService) DeductInventory(productID string, quantity int) error {
	fmt.Printf("Deducting %d units of %s\n", quantity, productID)
	if s.inventory[productID] < quantity {
		return errors.New("insufficient inventory")
	}
	s.inventory[productID] -= quantity
	return nil
}

func (s *InventoryService) RestoreInventory(productID string, quantity int) error {
	fmt.Printf("Restoring %d units of %s\n", quantity, productID)
	s.inventory[productID] += quantity
	return nil
}

// PaymentService 支付服务
type PaymentService struct {
	userBalances map[int]float64
}

func NewPaymentService() *PaymentService {
	return &PaymentService{
		userBalances: map[int]float64{
			1: 1000.0,
			2: 500.0,
		},
	}
}

func (s *PaymentService) ProcessPayment(userID int, amount float64) error {
	fmt.Printf("Processing payment of $%.2f for user %d\n", amount, userID)
	if s.userBalances[userID] < amount {
		return errors.New("insufficient balance")
	}
	s.userBalances[userID] -= amount
	return nil
}

func (s *PaymentService) RefundPayment(userID int, amount float64) error {
	fmt.Printf("Refunding $%.2f to user %d\n", amount, userID)
	s.userBalances[userID] += amount
	return nil
}

// ShippingService 物流服务
type ShippingService struct {
	shipments map[int]string
}

func NewShippingService() *ShippingService {
	return &ShippingService{
		shipments: make(map[int]string),
	}
}

func (s *ShippingService) CreateShipment(orderID int) error {
	fmt.Printf("Creating shipment for order %d\n", orderID)
	s.shipments[orderID] = "created"
	return nil
}

func (s *ShippingService) CancelShipment(orderID int) error {
	fmt.Printf("Cancelling shipment for order %d\n", orderID)
	s.shipments[orderID] = "cancelled"
	return nil
}

func main() {
	// 初始化服务
	orderService := NewOrderService()
	inventoryService := NewInventoryService()
	paymentService := NewPaymentService()
	shippingService := NewShippingService()

	// 订单信息
	orderID := 1
	userID := 1
	productID := "product1"
	quantity := 2
	amount := 199.98

	// 创建Saga
	saga := NewSaga()

	// 步骤1:创建订单
	saga.AddStep(
		func() error {
			return orderService.CreateOrder(orderID)
		},
		func() error {
			return orderService.CancelOrder(orderID)
		},
	)

	// 步骤2:扣减库存
	saga.AddStep(
		func() error {
			return inventoryService.DeductInventory(productID, quantity)
		},
		func() error {
			return inventoryService.RestoreInventory(productID, quantity)
		},
	)

	// 步骤3:处理支付
	saga.AddStep(
		func() error {
			return paymentService.ProcessPayment(userID, amount)
		},
		func() error {
			return paymentService.RefundPayment(userID, amount)
		},
	)

	// 步骤4:创建物流
	saga.AddStep(
		func() error {
			return shippingService.CreateShipment(orderID)
		},
		func() error {
			return shippingService.CancelShipment(orderID)
		},
	)

	// 执行Saga
	if err := saga.Execute(); err != nil {
		fmt.Printf("Order processing failed: %v\n", err)
	} else {
		fmt.Println("Order processed successfully")
	}

	// 打印最终状态
	fmt.Printf("Order status: %s\n", orderService.orders[orderID])
	fmt.Printf("Inventory of %s: %d\n", productID, inventoryService.inventory[productID])
	fmt.Printf("User %d balance: $%.2f\n", userID, paymentService.userBalances[userID])
	fmt.Printf("Shipment status: %s\n", shippingService.shipments[orderID])
}

9. 性能优化和最佳实践

性能优化

  1. 减少网络开销

    • 批量处理请求
    • 使用异步通信
    • 优化序列化和反序列化
  2. 提高并发性能

    • 使用goroutine处理并行任务
    • 使用channel进行通信
    • 避免不必要的锁竞争
  3. 优化数据库操作

    • 使用数据库连接池
    • 批量提交事务
    • 优化SQL查询
  4. 缓存策略

    • 使用缓存减少数据库访问
    • 合理设置缓存过期时间
    • 避免缓存穿透和雪崩

最佳实践

  1. 选择合适的分布式事务方案

    • 根据业务场景选择合适的方案
    • 考虑一致性要求和性能需求
    • 评估实现复杂度和维护成本
  2. 错误处理和重试机制

    • 实现幂等性操作
    • 使用指数退避重试策略
    • 监控和告警异常情况
  3. 监控和可观测性

    • 监控事务执行状态
    • 跟踪事务执行时间
    • 记录详细的日志
  4. 测试和演练

    • 测试各种异常场景
    • 演练故障恢复流程
    • 定期进行压力测试

10. 代码优化建议

1. 错误处理优化

原始代码

func (c *Coordinator) prepare() error {
	for _, p := range c.participants {
		if err := p.Prepare(); err != nil {
			return err
		}
	}
	return nil
}

优化建议

func (c *Coordinator) prepare() error {
	for i, p := range c.participants {
		if err := p.Prepare(); err != nil {
			return fmt.Errorf("participant %d prepare failed: %w", i, err)
		}
	}
	return nil
}

2. 并发处理优化

原始代码

func (c *Coordinator) rollback() error {
	var wg sync.WaitGroup
	var errMutex sync.Mutex
	var rollbackErr error

	for _, p := range c.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			if err := participant.Rollback(); err != nil {
				errMutex.Lock()
				if rollbackErr == nil {
					rollbackErr = err
				}
				errMutex.Unlock()
			}
		}(p)
	}

	wg.Wait()
	return rollbackErr
}

优化建议

func (c *Coordinator) rollback() error {
	var wg sync.WaitGroup
	errorsChan := make(chan error, len(c.participants))

	for _, p := range c.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			if err := participant.Rollback(); err != nil {
				errorsChan <- err
			}
		}(p)
	}

	go func() {
		wg.Wait()
		close(errorsChan)
	}()

	var rollbackErr error
	for err := range errorsChan {
		if rollbackErr == nil {
			rollbackErr = err
		}
		log.Printf("Rollback error: %v", err)
	}

	return rollbackErr
}

3. 代码结构优化

原始代码

// TCCService TCC服务接口
type TCCService interface {
	Try() error
	Confirm() error
	Cancel() error
}

优化建议

// TCCService TCC服务接口
type TCCService interface {
	// Try 尝试执行业务操作,预留资源
	Try() error
	// Confirm 确认执行业务操作,使用预留的资源
	Confirm() error
	// Cancel 取消执行业务操作,释放预留的资源
	Cancel() error
}

// BaseTCCService 基础TCC服务实现
type BaseTCCService struct {
	ID string
}

// NewBaseTCCService 创建基础TCC服务
func NewBaseTCCService(id string) *BaseTCCService {
	return &BaseTCCService{ID: id}
}

// Try 默认实现
func (s *BaseTCCService) Try() error {
	return nil
}

// Confirm 默认实现
func (s *BaseTCCService) Confirm() error {
	return nil
}

// Cancel 默认实现
func (s *BaseTCCService) Cancel() error {
	return nil
}

11. 总结

分布式事务是分布式系统中的一个复杂问题,没有一种放之四海而皆准的解决方案。选择合适的分布式事务方案需要考虑业务场景、一致性要求、性能需求等多种因素。

通过本文的学习,你应该掌握了:

  1. 分布式事务的基本概念和挑战
  2. 常见的分布式事务解决方案(2PC、TCC、Saga、本地消息表等)
  3. Go语言中实现各种分布式事务方案的方法
  4. 实际应用案例和代码示例
  5. 性能优化和最佳实践

在实际项目中,你需要根据具体的业务场景选择合适的分布式事务方案:

通过合理使用分布式事务,可以确保分布式系统的数据一致性,提高系统的可靠性和可用性。

到此这篇关于Go语言的分布式事务处理的文章就介绍到这了,更多相关Go语言 分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文