java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java cqrs架构

在Java中实现CQRS架构的全过程

作者:中华人民共和国程序员

CQRS是Command Query Responsibility Segregation的缩写,一般称作命令查询职责分离,本文给大家介绍在Java中实现CQRS架构的全过程,感兴趣的朋友跟随小编一起看看吧

使用中介者模式轻松实现命令查询职责分离,构建高内聚、低耦合的应用系统

一、知识点回顾

1. 什么是CQRS?

CQRS是Command Query Responsibility Segregation的缩写,一般称作命令查询职责分离。从字面意思理解,就是将命令(写入)和查询(读取)的责任划分到不同的模型中。

对比一下常用的 CRUD 模式(创建-读取-更新-删除),通常我们会让用户界面与负责所有四种操作的数据存储交互。而 CQRS 则将这些操作分成两种模式,一种用于查询(又称 "R"),另一种用于命令(又称 "CUD")。

2. CQRS的作用是什么?

CQRS将系统的写操作(命令)和读操作(查询)分离到不同的模型和数据存储中,从而实现读写分离,提高系统的性能、可扩展性和安全性,并使复杂业务逻辑(写端)和高效查询(读端)各自得到优化,降低系统复杂性。它允许为写操作设计严谨的领域模型,为读操作设计简单、只关注查询效率的数据模型(如专用视图或报表数据库),并可通过事件等机制保持最终一致性。

3. CQRS 的优点

二、关于PipelinR

项目地址

https://github.com/sizovs/PipelinR

项目开发者在Github的介绍不多,关键是最后一句话:It's similar to a popular MediatR .NET library. 意思就是这个项目是参考着一个叫MediatR的.net库写的。关于MediatR我之前有两篇文章专门介绍过。

PipelinR(包括MediatR)提供了一种CQRS的实现方式,基于中介者模式实现进程内消息传递,用于解耦应用中的各个组件,支持请求/响应(一对一,有返回值)和发布/订阅(一对多,无返回值)两种消息模式。它们在内部提供管道行为 (Pipeline Behaviors),用于在消息处理前后插入自定义逻辑,如日志、验证、异常处理等。

需要提醒的是,PipelinR并不是一个完整的CQRS框架,它只是一个中介者模式的具体实现方式,将调用方和处理方进行了解耦,而这种模式恰好可以用来在一个单体应用(或者是微服务的服务内部)中实现简单的CQRS。

三、依赖安装和配置

1. Maven安装

<dependency>
  <groupId>net.sizovs</groupId>
  <artifactId>pipelinr</artifactId>
  <version>0.11</version>
</dependency>

2. Gradle安装

dependencies {
    compile 'net.sizovs:pipelinr:0.11'
}

在Spring项目中配置PipelinR

@Configuration
public class PipelinrConfiguration {
    @Bean
    Pipeline pipeline(ObjectProvider<Command.Handler> commandHandlers, ObjectProvider<Notification.Handler> notificationHandlers, ObjectProvider<Command.Middleware> middlewares) {
        return new Pipelinr()
          .with(commandHandlers::stream)
          .with(notificationHandlers::stream)
          .with(middlewares::orderedStream);
    }
}

四、核心组件

五、请求/响应模式实现

请求/响应模式需要用到Command接口。

1. 定义Command

Command代表一个请求,需要实现net.sizovs.pipelinr.Command接口。泛型参数指定返回值类型。

// 定义一个创建用户的命令
public class CreateUserCommand implements Command<UserResponse> {
    private String username;
    private String email;
    public CreateUserCommand(String username, String email) {
        this.username = username;
        this.email = email;
    }
    public String getUsername() {
        return username;
    }
    public String getEmail() {
        return email;
    }
}
// 返回值类型
public class UserResponse {
    private Long userId;
    private String username;
    private String email;
    public UserResponse(Long userId, String username, String email) {
        this.userId = userId;
        this.username = username;
        this.email = email;
    }
    // getters
}

2. 定义Command Handler

创建该Command对应的处理器,实现net.sizovs.pipelinr.Command.Handler接口。

@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
    @Autowired
    private UserRepository userRepository;
    @Override
    public UserResponse handle(CreateUserCommand command) {
        // 业务逻辑处理
        User user = new User();
        user.setUsername(command.getUsername());
        user.setEmail(command.getEmail());
        User savedUser = userRepository.save(user);
        return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}

3. 在业务代码中使用

通过注入Pipeline实例,发送Command并获取响应。

@Service
public class UserService {
    @Autowired
    private Pipeline pipeline;
    public UserResponse createUser(String username, String email) {
        CreateUserCommand command = new CreateUserCommand(username, email);
        UserResponse response = pipeline.send(command);
        return response;
    }
}

4. 添加Command中间件

中间件可以在Command处理前后执行一些操作,如验证、日志、事务管理等。

@Component
public class LoggingMiddleware implements Command.Middleware {
    private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        logger.info("Executing command: {}", command.getClass().getSimpleName());
        try {
            R result = chain.proceed(command);
            logger.info("Command executed successfully");
            return result;
        } catch (Exception e) {
            logger.error("Command execution failed", e);
            throw e;
        }
    }
}
@Component
public class ValidationMiddleware implements Command.Middleware {
    @Autowired
    private Validator validator;
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        Set<ConstraintViolation<C>> violations = validator.validate(command);
        if (!violations.isEmpty()) {
            throw new ConstraintViolationException("Validation failed", violations);
        }
        return chain.proceed(command);
    }
}
@Component
@Order(1) // 指定中间件执行顺序
public class TransactionMiddleware implements Command.Middleware {
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Override
    public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
        TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            R result = chain.proceed(command);
            transactionManager.commit(status);
            return result;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
}

六、发布/订阅模式实现

发布/订阅模式使用Notification接口,用于一对多的消息分发,没有返回值。

1. 定义Notification

Notification代表一个事件通知,需要实现net.sizovs.pipelinr.Notification接口。

// 定义一个用户创建成功的事件通知
public class UserCreatedNotification implements Notification {
    private Long userId;
    private String username;
    private String email;
    private LocalDateTime createdTime;
    public UserCreatedNotification(Long userId, String username, String email) {
        this.userId = userId;
        this.username = username;
        this.email = email;
        this.createdTime = LocalDateTime.now();
    }
    // getters
}

2. 定义Notification Handler

Notification可以有多个处理器,每个处理器实现net.sizovs.pipelinr.Notification.Handler接口。

@Component
public class SendWelcomeEmailHandler implements Notification.Handler<UserCreatedNotification> {
    private static final Logger logger = LoggerFactory.getLogger(SendWelcomeEmailHandler.class);
    @Autowired
    private EmailService emailService;
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Sending welcome email to user: {}", notification.getUsername());
        emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());
    }
}
@Component
public class LogUserCreationHandler implements Notification.Handler<UserCreatedNotification> {
    private static final Logger logger = LoggerFactory.getLogger(LogUserCreationHandler.class);
    @Autowired
    private UserAuditLogRepository auditLogRepository;
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Logging user creation: {}", notification.getUsername());
        UserAuditLog auditLog = new UserAuditLog();
        auditLog.setUserId(notification.getUserId());
        auditLog.setOperation("CREATE");
        auditLog.setTimestamp(notification.getCreatedTime());
        auditLogRepository.save(auditLog);
    }
}
@Component
public class UpdateUserStatisticsHandler implements Notification.Handler<UserCreatedNotification> {
    private static final Logger logger = LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);
    @Autowired
    private UserStatisticsRepository statisticsRepository;
    @Override
    public void handle(UserCreatedNotification notification) {
        logger.info("Updating statistics for new user: {}", notification.getUsername());
        UserStatistics stats = statisticsRepository.findOrCreate();
        stats.incrementTotalUsers();
        statisticsRepository.save(stats);
    }
}

3. 发送Notification

在Command处理完成后,可以发送Notification通知所有相关的处理器。

@Component
public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
    @Autowired
    private UserRepository userRepository;
    @Autowired
    private Pipeline pipeline;
    @Override
    public UserResponse handle(CreateUserCommand command) {
        // 业务逻辑处理
        User user = new User();
        user.setUsername(command.getUsername());
        user.setEmail(command.getEmail());
        User savedUser = userRepository.save(user);
        // 发送事件通知
        UserCreatedNotification notification = new UserCreatedNotification(
            savedUser.getId(), 
            savedUser.getUsername(), 
            savedUser.getEmail()
        );
        pipeline.send(notification);
        return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}

4. 添加Notification中间件

类似Command,Notification也支持中间件。

@Component
public class NotificationLoggingMiddleware implements Notification.Middleware {
    private static final Logger logger = LoggerFactory.getLogger(NotificationLoggingMiddleware.class);
    @Override
    public <N extends Notification> void invoke(N notification, Chain chain) {
        logger.info("Publishing notification: {}", notification.getClass().getSimpleName());
        try {
            chain.proceed(notification);
            logger.info("Notification published successfully");
        } catch (Exception e) {
            logger.error("Notification publishing failed", e);
            throw e;
        }
    }
}
@Component
public class NotificationErrorHandlingMiddleware implements Notification.Middleware {
    private static final Logger logger = LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);
    @Override
    public <N extends Notification> void invoke(N notification, Chain chain) {
        try {
            chain.proceed(notification);
        } catch (Exception e) {
            logger.error("Error handling notification: {}", notification.getClass().getSimpleName(), e);
            // 可以选择吞掉异常或重新抛出,取决于业务需求
            // throw e;
        }
    }
}

七、总结

核心收获

通过本文的介绍,我们了解了如何在Java应用中使用PipelinR框架实现CQRS模式。核心要点总结如下:

1. CQRS的价值

2. PipelinR的核心特性

3. 最佳实践建议

实施建议

适用场景

注意事项

结论

PipelinR提供了一种轻量级、简洁的CQRS实现方案。它特别适合那些想要在不过度复杂化系统的前提下,引入DDD思想和事件驱动设计的项目。通过合理运用Command和Notification,结合恰当的中间件设计,开发者可以构建出高内聚、低耦合、易于维护和扩展的应用系统。

关键是要把握好"度"——既要充分发挥CQRS和PipelinR的优势,又要避免为了追求"高大上"的架构而过度设计,最终的目标是为业务的快速迭代和长期维护提供支撑。

到此这篇关于在Java中实现CQRS架构的全过程的文章就介绍到这了,更多相关java cqrs架构内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文