java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java数据同步组件

基于Java实现一个简单的数据同步组件

作者:磊叔的技术博客

这篇文章主要为大家详细介绍了如何基于Java实现一个简单的数据同步组件,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解一下

目前关于数据同步的组件开源社区有很多,如:Flink CDC, DataX, seaTunel,Kattle 等等,大体上可以分为两种:基于日志的和基于 JDBC 的。这些同步组件在进行整库同步或者 schema 差异性不大的情况下通过可视化界面或者配置文件映射的方式可以直接达到我们库-库的同步诉求,但是对于一些定制化较大的场景处理起来还比较麻烦。为了满足这样的一个场景,笔者写了一个小的同步组件。

PS: 我们的业务场景比较特殊,源的种类比较多,有 Oracle、Mysql、文件以及 API 接口等。另外一点是需要同步的数据量不是很大,引入额外的数据同步组件对于我们来说会有额外的运维成本和学习成本,因此基于上述两个点,决定自己写一个小的组件。

业务框架

两个基本需求

组件模型

以 Mysql 为例,同步组件的主要模块如下:

项目模块划分如下:

核心问题

针对数据同步组件,解决的核心问题可以抽象成如下模型:A_DB -> A -> B -> B_DB;即将 A 数据库中的数据读取出来之后转换成 A class instance,然后将 A class instance 转换成 B class instance,再将 B class instance 写到 B 数据库。

解决 A_DB 到 A

从 A_DB -> A 或者 B -> B_DB 这个过程,就是我们所熟知的 ORM 解决的问题;不管是 hibernate、mybatis 还是 SpringBoot JPA 都是围绕着这个问题展开的。

在本篇的组件中,因没有引入 ORM,所以将数据库行映射成一个 java 对象也需要自己实现。DataX 中是通过配置文件来描述的,在本篇中没有才采用这种描述方式,而是通过语言耦合性更高的注解的方式来实现的(由业务属性决定);

如下是一个描述具体业务的 Java 对象的定义,@Table 注解用来描述 JmltModel 和哪个表是关联的, @Colum 注解用来描述属性是和哪个字段关联的.

@Data
// @Table 注解用来描述 JmltModel 和哪个表是关联 的
@Table(name = "user_info") 
public class JmltModel implements Serializable {
    // @Colum 注解用来描述属性是和哪个字段关联的
    @Colum(name = "id")
    private Long id;
    @Colum(name = "email")
    private String email;
    @Colum(name = "name")
    private String name;
    @Colum(name = "create_time")
    private Date create_time;
}

有了这个描述关系,即可以在 runtime 时通过泛型 + 反射来实现 A_DB -> A 过程的模板设计。

MysqlConnector 的实现来进行说明,下面抽取了 MysqlConnector 组件中的部分代码(做了一些删减);下面这段代码中有 1-6 6 的步骤,这部分属于生产端,即从原始 Mysql 表中分页读取数据,并将读取到的数据映射成实际的对象,再通过业务定义的 convertor 转换成目标的对象,最后丢到队列中去等待消费。

// 1、originClass 是原始库对象,这里通过反射获取 Table 注解,从而拿到表名
Table table = (Table) originClass.getDeclaredAnnotation(Table.class);
String tableName = table.name();
// 2、计算所有的条数,然后按照分页的方式进行 fetch
SqlTemplate sqlTemplate = new SqlTemplate(this.originDataSource);
int totalCount = sqlTemplate.count();
RowBounds rowBounds = new RowBounds(totalCount);
int totalPage = rowBounds.getTotalPage();
// 3、这里是按分页批量拉取
for (int i = 1; i <= totalPage; i++) {
    int offset = rowBounds.getOffset(i);
    String condition = " limit " + offset + "," + rowBounds.getPageSize();
    ResultSet resultSet = sqlTemplate.select(condition);
    // 4、将 ResultSet 转成 A 这里就是从 A_DB 到 A 的过程
    ResultSetExtractor<R> extractor = (ResultSetExtractor<R>) new ResultSetExtractor<>(originClass);
    List<R> result = extractor.extractData(resultSet);
    // 5、将 A 转成 B
    List<T> targetResult = convertor.batchConvertFrom(result);
    // 6、丢到队列中去等待消费
    this.rowObjectManager.pushToQueue(targetResult);
}

上述代码片段中的 4 ,就是从 A_DB 到 A 的过程,实际上这部分是 ResultSet 到 Java 对象的过程。一般情况下,我们基于 JDBC API 编程时, ResultSet 到 Java,对于业务来说是非常明确的。大体是这样:

String selectSql = "SELECT * FROM employees"; 
try (ResultSet resultSet = stmt.executeQuery(selectSql)) { 
List<Employee> employees = new ArrayList<>(); 
while (resultSet.next()) 
{ 
    Employee emp = new Employee(); 
    emp.setId(resultSet.getInt("emp_id"));
    emp.setName(resultSet.getString("name"));
    emp.setPosition(resultSet.getString("position")); 
    emp.setSalary(resultSet.getDouble("salary")); 
    employees.add(emp); 
} 

这种从对于明确 Java 对象的情况下是可以的;但是对于一个通用组件来说肯定无法满足。那么就需要解决如何将 ResultSet 转换成 Java 对象变得的更普适一些。思路是:ResultSet -> Map -> Java Object,通过 ResultSet 的 getMetaData 可以取到所有的列名(K)和值(V),并将其存储到 Map 中,代码如下:

/**
 * 将 resultSet 转成 Map
 *
 * @param resultSet
 * @return
 * @throws SQLException
 */
private Map<String, Object> resultSetToMap(ResultSet resultSet) throws Exception {
    Map<String, Object> resultMap = new HashMap<>();
    // 获取 ResultSet 的元数据
    int columnCount = resultSet.getMetaData().getColumnCount();
    // 遍历每一列,将列名和值存储到 Map 中
    for (int i = 1; i <= columnCount; i++) {
        String columnName = resultSet.getMetaData().getColumnName(i);
        Object value = resultSet.getObject(i);
        resultMap.put(columnName, value);
    }
    return resultMap;
}

接着是将 Map 转换成 Java 对象,当然在框架层面的实现上,这里通过泛型机制来实现更加通用的场景:

private T mapResultSetToObject(Map<String, Object> resultMap, Class<T> objectType) throws Exception {
    // 通过目标对象类型构建一个对象
    T object = objectType.newInstance();  
    // 将 map 的 key 作为 field 的名字,map 的 value 作为 field 的值
    for (Map.Entry<String, Object> entry : resultMap.entrySet()) {  
        String fieldName = entry.getKey();  
        Object value = entry.getValue();  
        try {  
            Field declaredField = objectType.getDeclaredField(fieldName);  
            declaredField.setAccessible(true);  
            declaredField.set(object, value);  
        } catch (NoSuchFieldException e) {  
            LOGGER.error("ignore exception, fieldName: " + fieldName + ", objectType: " + objectType);  
        }  
    }
    // 完成对象的填充并返回
    return object;  
}

从 A 到 B 属于业务自己定义的,即 Convertor 部分,下面是 Convertor 的接口定义,业务实现此接口用于实现对象到对象的转换(包括批量转换)

// T 是目标对象类型,R 是原始对象类型
public interface Convertor<T, R> {  
/**  
* 将 T 转换成 R  
*  
* @param origin  
* @return  
*/  
T convertFrom(R origin) throws SQLException;  
/**  
* 将 R 转换成 T  
*  
* @param target  
* @return  
*/  
R convertTo(T target);  
/**  
* 批量转换  
* @param origin  
* @return  
* @throws SQLException  
*/  
List<T> batchConvertFrom(List<R> origin) throws SQLException;  
/**  
* 批量转换将 R 转换成 T  
*  
* @param target  
* @return  
*/ 
List<R> batchConvertTo(List<T> target);

从 B 到 B_DB

前面是 A_DB 到 A 再到 B 的过程,那么接下来就是 B 到 B_DB 的过程。从前面的流程图中可以看出,组件中借助了队列。本篇文章的实现是基于 Disruptor 实现的。在 Convertor 中 A-> B 的逻辑完成之后,就会将 B 的 List 丢到 Disruptor 的 ringBuffer 中等待消费。消费逻辑如下:

public void onEvent(RowObjectEvent rowObjectEvent, long sequence, boolean endOfBatch) {  
    // targetResult  
    List<T> targetResult = (List) rowObjectEvent.getRowObject();  
    // 将 T 写到 目标库  
    Connection tc = null;  
    PreparedStatement pstm = null;  
    try {  
        // 下面即为获取连接,创建 prepareStatement 和执行
        tc = this.targetDataSource.getConnection();  
        SqlTemplate<T> sqlTemplate = new SqlTemplate<>(this.targetDataSource); 
        sqlTemplate.setObj(targetResult.get(0));  
        String sql = sqlTemplate.createBaseSql();  
        pstm = tc.prepareStatement(sql);  
        for (T item : targetResult) {  
            Object[] objects = sqlTemplate.createInsertSql(item);  
            for (int i = 1; i <= objects.length; i++) {  
                if (objects[i - 1] instanceof Long) {  
                    objects[i - 1] = (Long) objects[i - 1] + 1;  
                }  
                pstm.setObject(i, objects[i - 1]);  
            } 
            // 这里执行时批量操作的
            pstm.addBatch();  
        }  
        pstm.executeUpdate();  
    } catch (Exception e) {  
        // ignore some code...
    }  
}

通过上述几个片段,大体阐述了数据同步的过程,以及本篇所实现的组件的部分核心逻辑。

总结

本篇中的代码片段是比较零碎的,其中有相关部分的逻辑并没有在本篇中体现;本篇的主要目的是为了阐述基于 JDBC API 如何实现同步(实际上不仅仅是基于 JDBC 和面向关系型数据库),并且为围绕 DB -> A -> B -> DB 这样的思路给出了每一步实现的代码,有兴趣的同学可以尝试自己实现一个数据同步组件。

以上就是基于Java实现一个简单的数据同步组件的详细内容,更多关于Java数据同步组件的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文