java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > redis 服务间通信

redis做服务间通信工具的项目示例

作者:在下uptown

Redis是一种高效的服务间通信工具,它以键值对的形式存储数据,并支持多种数据类型和丰富的操作,本文主要介绍了redis做服务间通信工具的项目示例,感兴趣的可以了解一下

前言

先说一下为什么要有这个东西,用消息中间件的好处就不用说了,日常开发中还是有很多场景需要用到消息传递的,消息的topic如何管理,如何约束topic,重要的topic消费记录、历史消息等就是这个sdk需要做的。

本质上只是一层对消息中间件的封装。这次只是抛砖引玉只引入redis的三种消息类型,pubsub、queue以及stream。

扩展其他中间件按着代码思路一样。望各路大佬赐教

架构设计

一个消息服务sdk首先需要具备两个能力,即生产和消费,这两个功能离不开校验topic合法性,我们姑且简单点陪在mysql数据库中,但不可能每次校验topic是否合法都要去查询数据库,这里借鉴kafka存放topic信息的思想,找一个redis的key存放所有的topic列表。

定义一个核心service接口。

public interface MessageHubService {
    /**
     * 生产消息
     */
    void producer(MessageForm messageForm);
    /**
     * 消费消息
     */
    void consumer(ConsumerAdapterForm adapterForm);
    /**
     * 检查topic、type合法性
     */
    void checkTopic(String topic, String type);
}

方法入参统一使用MessageForm类,里面定义一些基础的信息,比如哪个消息topic,哪个消息类型等等。

@Data
public class MessageForm {
    // 消息组件类型
    private String type;
    // 消息主题
    private String topic;
    private String message = "";
    // 消费者组
    private String group = "UPTOWN";
}

自从之前文章中说的文件夹改造之后特别喜欢三层结构,即service、baseServiceImpl、customizeServiceImpl。

大体就是service定义接口参数、返回类型标准化接口,baseServiceImpl实现service基础接口实现,做一些统一的拦截处理,比如校验topic合法等操作,customizeServiceImpl属于具体实现类extends baseServiceImpl实现具体逻辑。

topic白名单通过Timer维护,定义一个Timer通过lua脚本隔一段时间刷新到redis中。

基础类baseServiceImpl实现

@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {
    @Resource
    protected StringRedisTemplate stringRedisTemplate;
    public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();
    private ApplicationContext applicationContext;
    @PostConstruct
    public void init() {
        messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
    }
    public void checkTopic(String topic, String type) {
        if (!messageHubServiceMap.containsKey(type)) {
            throw new MatrixException("消息类型不支持");
        }
        List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
        if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
            throw new MatrixException("当前topic未配置");
        }
    }
    @Override
    public void producer(MessageForm messageForm) {
        this.checkTopic(messageForm.getTopic(), messageForm.getType());
        this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
    }
    /**
     * 消费者创建通过注解,已校验topic合法性
     */
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

具体自定义实现类

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
    @Override
    public void producer(MessageForm messageForm) {
        // 具体生产逻辑
    }
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        // 具体消费逻辑
    }
}

代码非常清晰了,整体满足service、baseServiceImpl、customizeServiceImpl三层结构。

生产者逻辑

生产者API做的比较简单,只是提供一个API调用,在调用前做一些校验工作,仅仅的是一条命令,不做发送失败的重试等操作。

消费者逻辑

消费者的话还是定义一个注解,还是通过借助SpringBoot生命周期扫描注解的方式在后台建立常驻线程的方式。

@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {
    @Resource(name = "messageHubServiceImpl")
    MessageHubService messageHubService;
    @Bean(name = "redisPubSubConsumerMap")
    public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
        return new ConcurrentHashMap<>();
    }
    @Override
    public void destroy() throws Exception {
    }
    @Override
    public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
            if (annotation == null) {
                continue;
            }
            String resolveTopic = annotation.topic();
            try {
                messageHubService.checkTopic(resolveTopic, annotation.type());
            } catch (Exception e) {
                throw new Error(e.getMessage());
            }
            ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
            adapterForm.setBean(bean);
            adapterForm.setInvokeMethod(method);
            adapterForm.setTopic(resolveTopic);
            adapterForm.setType(annotation.type());
            adapterForm.setGroup(annotation.group());
            messageHubService.consumer(adapterForm);
        }
        return bean;
    }
}

这里依靠spring生命周期,拿到所有的bean,根据注解标注的方法去走不同的逻辑生成常驻线程,监听到消息之后回调到标注了注解的方法里。
具体的消费逻辑就不赘述了,感兴趣的可以看下源码:gitee.com/atuptown/up…
Topic守护线程

@Slf4j
@Service
public class TopicReloadTask extends TimerTask {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    EntityManager entityManager;
    public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
    public final String LUA_SCRIPT =
                "redis.call('del', 'MESSAGEHUB_TOPIC')" +
                "local topics = KEYS " +
                "for i, v in pairs(topics) do " +
                "  redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
                "end";
    @Override
    public void run() {
        try {
            List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
            if (!ObjectUtils.isEmpty(topics)) {
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
                Long result = stringRedisTemplate.execute(redisScript, topics);
                log.info("reload topic finish");
            }
        } catch (Throwable t) {
            log.error("messagehub topic reload error", t);
        }
    }
    private <T> List<T> getQueryResult(String sql, Class<T> clazz) {
        Query dataQuery = entityManager.createNativeQuery(sql, clazz);
        List<T> result = new ArrayList<>();
        List<Object> list = dataQuery.getResultList();
        for (Object o : list) {
            result.add((T) o);
        }
        return result;
    }
}

定义一个timer任务,隔一段时间将mysql中的topic白名单通过lua脚本的方式刷新到指定的reids topic key中。还有一些可以优化的地方,比如同步topic的操作只需要一个服务即可,所以可以使用@ConditionalOnProperty注解判断是否需要进行同步topic。

git地址:https://gitee.com/atuptown/uptown-messagehub

到此这篇关于redis做服务间通信工具的项目示例的文章就介绍到这了,更多相关redis 服务间通信 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文