SpringBoot整合InfluxDB的详细过程
作者:喝汽水的猫^
一、简单介绍InfluxDB是什么?
InfluxDB是一个由InfluxData开发的开源时序型数据。它由Go写成,着力于高性能地查询与存储时序型数据。InfluxDB被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。
1、主要特点
时间序列数据存储
专门设计用于高效处理按时间顺序产生的数据,如传感器数据、日志数据、指标数据等。时间戳是 InfluxDB 中数据的关键组成部分,确保数据的时间顺序性。
可以存储大量的时间序列数据,并提供高效的查询和存储机制,以满足对实时数据和历史数据的处理需求。
高性能
针对时间序列数据的特点进行了优化,能够快速写入和查询大规模的数据。它采用了高效的数据存储结构和索引机制,使得数据的读写操作非常迅速。
支持高并发的写入和查询,可以满足大规模数据采集和实时监控系统的需求。
灵活的数据模型
InfluxDB 使用一种灵活的数据模型,包括测量(measurement)、标签(tag)和字段(field)。
测量类似于传统数据库中的表,用于存储具有相同数据结构的时间序列数据。标签用于对数据进行分类和索引,方便快速查询。字段则存储实际的测量值,可以是数值、字符串或布尔值等。
强大的查询语言
InfluxDB 提供了一种功能强大的查询语言 InfluxQL,用于查询和分析时间序列数据。
InfluxQL 支持各种聚合函数、时间范围查询、过滤条件等,可以方便地进行数据分析和可视化。它还支持连续查询(Continuous Queries)和存储策略(Retention Policies),可以自动对数据进行聚合和清理,以提高查询性能和节省存储空间。
2、应用场景
物联网(IoT)
在物联网应用中,大量的传感器设备会不断产生时间序列数据,如温度、湿度、压力等。InfluxDB 可以高效地存储和查询这些数据,为物联网数据分析和监控提供支持。
可以实时监测设备状态、分析设备性能、预测设备故障等。
系统监控
用于监控服务器、网络设备、应用程序等的性能指标。例如,可以收集 CPU 使用率、内存使用率、网络流量等数据,并使用 InfluxDB 进行存储和分析。
通过实时监控和历史数据分析,可以及时发现系统性能问题,进行故障排除和优化。
金融交易数据分析
在金融领域,时间序列数据非常重要,如股票价格、汇率、交易量等。InfluxDB 可以用于存储和分析这些金融数据,为交易决策和风险评估提供支持。
可以进行实时行情分析、历史数据回溯、交易策略评估等。
日志分析
可以将日志数据以时间序列的形式存储在 InfluxDB 中,方便进行日志分析和故障排查。
通过查询特定时间范围内的日志数据,可以快速定位问题发生的时间和原因。
总之,InfluxDB 是一个功能强大的时间序列数据库,适用于各种需要处理时间序列数据的场景。它的高性能、灵活的数据模型和强大的查询语言使得它成为了许多企业和开发者的首选数据库之一。
想要更深入了解,请:点击这里
二、使用步骤
1、集成原生的InfluxDB
依赖:
<!-- InfluxDB 原生依赖 --> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.22</version> </dependency>
配置:
#--------- # Influxdb #--------- influxdb: url: http://127.0.0.1:8086 username: admin password: admin database: test retention: autogen //数据保留策略
InfluxDB数据库操作类:
package com.geesun.influxdb; import cn.hutool.core.collection.CollUtil; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.*; import org.influxdb.dto.Point.Builder; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import plus.ojbk.influxdb.autoconfigure.properties.InfluxdbProperties; import javax.annotation.Resource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * InfluxDB数据库操作类 */ @Service public class InfluxDbCommand { @Resource private InfluxDB influxDB; @Resource private InfluxdbProperties config; @Value("${influxdb.retention}") private String retentionPolicy; /** * 测试连接是否正常 * * @return true 正常 */ public boolean ping() { boolean isConnected = false; Pong pong; try { pong = influxDB.ping(); if (pong != null) { isConnected = true; } } catch (Exception e) { e.printStackTrace(); } return isConnected; } /** * 切换数据库 */ public void setDB(String dbName) { influxDB.setDatabase(dbName); } /** * 关闭数据库 */ public void close() { influxDB.close(); } /** * 创建自定义保留策略 * * @param policyName 策略名 * @param days 保存天数 * @param replication 保存副本数量 * @param isDefault 是否设为默认保留策略 */ public void createRetentionPolicy(String policyName, int days, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName, config.getDatabase(), days, replication); if (Boolean.TRUE.equals(isDefault)) { sql = sql + " DEFAULT"; } query(sql); } /** * 切换策略 * * @param policyName 策略名 */ public void updRetentionPolicy(String policyName) { String sql = "ALTER RETENTION POLICY \"" + policyName + "\" ON \"" + config.getDatabase() + "\" DEFAULT"; query(sql); this.retentionPolicy = policyName; } /** * 创建默认的保留策略 * <p> * 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略 */ public void createDefaultRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT" , "hour", config.getDatabase(), "30d", 1); this.query(command); } /*********************************增删查**************************************************/ /** * 查询 * * @param command 查询语句 * @return */ public QueryResult query(String command) { return influxDB.query(new Query(command, config.getDatabase())); } /** * 插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } influxDB.write(config.getDatabase(), retentionPolicy, builder.build()); } /** * 插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) { insert(measurement, tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 删除 * * @param command 删除语句 * @return 返回错误信息 */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, config.getDatabase())); return result.getError(); } /** * 构建Point * * @param measurement 表 * @param time 时间 * @param timeUnit 时间单位 * @param tags tags * @param fields * @return */ public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags , Map<String, Object> fields) { return Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build(); } /** * 批量写入测点 * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints) { influxDB.write(batchPoints); } /** * 批量写入数据 * * @param database 数据库 * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency , TimeUnit timeUnit, final List<String> records) { influxDB.write(database, retentionPolicy, consistency, timeUnit, records); } /** * 查询-把查询出的结果集转换成对应的实体对象,聚合成list * @param command : sql语句 */ public List<Map<String, Object>> queryWrapper(String command) { List<Map<String, Object>> list = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(command)); List<QueryResult.Result> resultList = queryResult.getResults(); for (QueryResult.Result result : resultList) { List<QueryResult.Series> seriesList = result.getSeries(); if (CollUtil.isEmpty(seriesList)) { return list; } for (QueryResult.Series series : seriesList) { List<String> columns = series.getColumns(); List<List<Object>> values = series.getValues(); if (CollUtil.isEmpty(values)) { continue; } values.forEach(value -> { Map<String, Object> map = new HashMap<>(); for (int i = 0; i < columns.size(); i++) { map.put(columns.get(i), value.get(i)); } list.add(map); }); } } return list; } }
2、集成封装的InfluxDBTemplate
依赖:
<dependency> <groupId>plus.ojbk</groupId> <artifactId>influxdb-spring-boot-starter</artifactId> <version>1.0.2</version> </dependency>
配置:
#--------- # Influxdb #--------- influxdb: url: http://127.0.0.1:8086 username: admin password: admin database: test retention: autogen //数据保留策略
实体,对标influxDB的表:
package io.springboot.influxdb.entity; import lombok.Data; import org.influxdb.annotation.Column; import org.influxdb.annotation.Measurement; import plus.ojbk.influxdb.annotation.Count; import java.math.BigDecimal; import java.time.LocalDateTime; /** * @version 1.0 * @since 2021/6/17 18:26 */ @Data @Measurement(name = "device") public class Device { /** * 设备编号 */ @Column(name="device_no", tag = true) //tag 可以理解为influxdb的索引 private String deviceNo; /** * 数据值 */ @Count("value") @Column(name="value") private BigDecimal value; /** * 电压 */ @Column(name="voltage") private Float voltage; /** * 状态 */ @Column(name="state") private Boolean state; /** * 上报时间 */ @Column(name="time") private LocalDateTime time; }
测试:
package io.springboot.influxdb; import com.alibaba.fastjson.JSON; import io.springboot.influxdb.entity.Device; import org.influxdb.dto.QueryResult; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import plus.ojbk.influxdb.core.Delete; import plus.ojbk.influxdb.core.InfluxdbTemplate; import plus.ojbk.influxdb.core.Op; import plus.ojbk.influxdb.core.Order; import plus.ojbk.influxdb.core.Query; import plus.ojbk.influxdb.core.model.DeleteModel; import plus.ojbk.influxdb.core.model.QueryModel; import java.math.BigDecimal; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; @SpringBootTest class InfluxdbDemoApplicationTests { @Autowired private InfluxdbTemplate influxdbTemplate; private String measurement = "device"; @Test void getCount() { QueryModel countModel = new QueryModel(); ///countModel.setMeasurement(measurement); countModel.setMeasurement(InfluxdbUtils.getMeasurement(Device.class)); countModel.setStart(LocalDateTime.now().plusHours(-2L)); countModel.setEnd(LocalDateTime.now()); //countModel.setSelect(Query.count("voltage")); //只能count field字段 countModel.setSelect(Query.count(InfluxdbUtils.getCountField(Device.class))); countModel.setWhere(Op.where(countModel)); //获得总条数 long count = influxdbTemplate.count(Query.build(countModel)); System.err.println(count); } @Test void getData() { QueryModel model = new QueryModel(); model.setCurrent(1L); //当前页 model.setSize(10L); //每页大小 //model.setMeasurement(measurement); model.setMeasurement(InfluxdbUtils.getMeasurement(Device.class)); model.setStart(LocalDateTime.now().plusHours(-2L)); //开始时间 model.setEnd(LocalDateTime.now()); //结束时间 model.setUseTimeZone(true); //时区 model.setOrder(Order.DESC); //排序 //where 条件中额外参数可放入model.setMap(); model.setWhere(Op.where(model)); //理解为where条件 //分页数据 List<Device> deviceList = influxdbTemplate.selectList(Query.build(model), Device.class); System.err.println(JSON.toJSONString(deviceList)); } @Test void insert() { List<Device> deviceList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Device device = new Device(); device.setDeviceNo("device-" + i); device.setValue(new BigDecimal(12.548)); device.setState(true); device.setVoltage(3.5F); deviceList.add(device); } influxdbTemplate.insert(deviceList); } @Test void delete() { Map<String, Object> map = new TreeMap<>(); map.put("device_no", "device-1"); DeleteModel model = new DeleteModel(); model.setMap(map); //model.setStart(LocalDateTime.now().plusHours(-10L)); //model.setEnd(LocalDateTime.now()); model.setMeasurement(measurement); model.setWhere(Op.where(model)); influxdbTemplate.delete(Delete.build(model)); } void other(){ influxdbTemplate.execute("自己写sql"); } }
相较于原版,它封装了自有的Util以及Template等,对于原版Point的time列类型问题,它对number和long 型转换成了LocalDateTime类型,并且封装了更多的方法(具体自行拓展)。
注:原生的influxDB和spring自带的可一起使用。
到此这篇关于SpringBoot整合InfluxDB的文章就介绍到这了,更多相关SpringBoot整合InfluxDB内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!