Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang访问Apache IoTTDB教程

使用Golang访问Apache IoTTDB时序数据库:环境搭建 + 连接池 + 全接口实战

作者:xcLeigh

本文详细介绍了使用Golang访问Apache IoTTDB时序数据库的方法,包括环境准备、核心步骤、示例代码和接口说明,主要介绍了创建SessionPool、数据库操作(如写入、查询)、元TDB配置等分类整理接口,帮助开发者快速高效地实现IoTDB集成和开发

IoTDB Go 原生 API 提供 SessionSessionPool 两种交互方式。由于 Session 非线程安全,高并发场景强烈推荐使用 SessionPool,能高效管理连接、提升系统性能与资源利用率。

本文从环境准备、核心流程、完整示例到全量接口,带你快速用 Go 接入 IoTDB 时序数据库。

1. 环境准备

1.1 前置依赖

1.2 安装方法

使用 Go Mod(推荐)

# 启用 Go Modules
export GO111MODULE=on
# 配置代理
export GOPROXY=https://goproxy.io

# 创建项目目录
mkdir session_example && cd session_example

# 下载官方示例
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go

# 初始化并下载依赖
go mod init session_example
go mod tidy

# 运行
go run session_example.go

使用 GOPATH

# 安装 thrift
go get github.com/apache/thrift@0.13.0

# 创建目录
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
cd $GOPATH/src/iotdb-client-go-example/session_example

# 下载示例
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go

# 初始化依赖
go mod init
go mod tidy

# 运行
go run session_example.go

⚠️ 重要提醒:禁止高版本客户端连接低版本服务端,否则会出现连接异常、数据写入失败等问题。

2. 核心开发步骤

使用 Go 操作 IoTDB 只需要三步:

  1. 创建 SessionPool 连接池实例
  2. 从连接池获取 Session 执行操作,用完归还
  3. 程序结束关闭连接池释放资源

2.1 创建连接池实例

单节点模式

config := &client.PoolConfig{
    Host:     "127.0.0.1",
    Port:     "6667",
    UserName: "root",
    Password: "root",
}
// 创建连接池:最大连接数3、连接超时60s、获取等待60s、关闭压缩
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()

分布式/双活模式

config := &client.PoolConfig{
    UserName: "root",
    Password: "root",
    NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()

2.2 数据库操作

数据写入(Tablet 推荐)

session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)

status, err := session.InsertTablet(tablet, false)
tablet.Reset()

数据查询

var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)

sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
defer sessionDataSet.Close()

2.3 完整可运行示例

package main

import (
    "flag"
    "fmt"
    "log"
    "math/rand"
    "strings"
    "time"
    "github.com/apache/iotdb-client-go/v2/client"
    "github.com/apache/iotdb-client-go/v2/common"
)

var (
    host     string
    port     string
    user     string
    password string
    sessionPool client.SessionPool
)

func main() {
    flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
    flag.StringVar(&port, "port", "6667", "--port=6667")
    flag.StringVar(&user, "user", "root", "--user=root")
    flag.StringVar(&password, "password", "root", "--password=root")
    flag.Parse()

    // 1. 创建连接池
    config := &client.PoolConfig{Host: host, Port: port, UserName: user, Password: password}
    sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
    defer sessionPool.Close()

    // 2. 元数据操作
    setStorageGroup("root.sg1")
    createTimeseries("root.sg1.dev1.temperature")

    // 3. 写入数据
    insertTablet()

    // 4. 查询数据
    executeQueryStatement("select temperature from root.sg1.dev1")

    // 5. 清理资源
    deleteTimeseries("root.sg1.dev1.temperature")
    deleteStorageGroup("root.sg1")
}

// 设置存储组
func setStorageGroup(sg string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        session.SetStorageGroup(sg)
    }
}

// 创建时间序列
func createTimeseries(path string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        checkError(session.CreateTimeseries(path, client.FLOAT, client.PLAIN, client.SNAPPY, nil, nil))
    }
}

// 插入Tablet
func insertTablet() {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil {
        tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{{Measurement: "temperature", DataType: client.FLOAT}}, 12)
        if err != nil {
            log.Fatal(err)
        }
        ts := time.Now().UTC().UnixNano() / 1000000
        for row := 0; row < 12; row++ {
            ts++
            tablet.SetTimestamp(ts, row)
            tablet.SetValueAt(rand.Float32(), 0, row)
            tablet.RowSize++
        }
        status, err := session.InsertTablet(tablet, false)
        tablet.Reset()
        checkError(status, err)
    }
}

// 查询语句
func executeQueryStatement(sql string) {
    var timeout int64 = 1000
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err != nil {
        log.Print(err)
        return
    }
    sds, err := session.ExecuteQueryStatement(sql, &timeout)
    if err == nil {
        defer sds.Close()
        cols := sds.GetColumnNames()
        for _, c := range cols { fmt.Printf("%s\t", c) }
        fmt.Println()
        for next, _ := sds.Next(); next; next, _ = sds.Next() {
            for _, c := range cols {
                if null, _ := sds.IsNull(c); null {
                    fmt.Print("null\t")
                } else {
                    v, _ := sds.GetString(c)
                    fmt.Printf("%s\t", v)
                }
            }
            fmt.Println()
        }
    }
}

// 删除时序
func deleteTimeseries(paths ...string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil { checkError(session.DeleteTimeseries(paths)) }
}

// 删除存储组
func deleteStorageGroup(sg string) {
    session, err := sessionPool.GetSession()
    defer sessionPool.PutBack(session)
    if err == nil { checkError(session.DeleteStorageGroup(sg)) }
}

// 错误检查
func checkError(status *common.TSStatus, err error) {
    if err != nil { log.Fatal(err) }
    if status != nil {
        if e := client.VerifySuccess(status); e != nil { log.Println(e) }
    }
}

3. 全量接口说明

3.1 SessionPool 管理接口

接口功能说明
NewSessionPool创建连接池支持单节点/集群
GetSession获取会话必须与 PutBack 配对
PutBack归还会话用完立即归还
Close关闭连接池程序退出前调用

3.2 数据写入接口

支持 Record/Tablet,支持对齐/非对齐,批量写入优先使用 Tablet

3.3 SQL 与查询接口

3.4 元数据操作接口

3.5 PoolConfig 关键配置

附:IoTDB的各大版本

📄 Apache IoTDB 是一款工业物联网时序数据库管理系统,采用端边云协同的轻量化架构,支持一体化的物联网时序数据收集、存储、管理与分析 ,具有多协议兼容、超高压缩比、高通量读写、工业级稳定、极简运维等特点。

版本IoTDB 二进制包IoTDB 源代码发布说明
2.0.5- All-in-one
- AINode
- SHA512
- ASC
- 源代码
- SHA512
- ASC
release notes
1.3.5- All-in-one
- AINode
- SHA512
- ASC
- 源代码
- SHA512
- ASC
release notes
0.13.4- All-in-one
- Grafana 连接器
- Grafana 插件
- SHA512
- ASC
- 源代码
- SHA512
- ASC
release notes

✨ 目前最新版本为2.0.7,去获取:https://archive.apache.org/dist/iotdb/
 

文章总结

本文全面讲解 Apache IoTDB Go 原生接口的使用方法,先明确 Golang、Thrift 等依赖要求,提供 Go Mod 与 GOPATH 两种安装方式。重点介绍 SessionPool 连接池 的创建、使用与归还规范,给出包含创建存储组、时序、数据写入、查询、删除的完整可运行代码。同时分类整理连接池管理、数据写入、SQL查询、元数据操作等全量接口,清晰说明配置项与使用注意事项。帮助 Go 开发者快速完成 IoTDB 集成,实现高并发、高性能的时序数据开发与生产落地。

到此这篇关于使用Golang访问Apache IoTTDB时序数据库:环境搭建 + 连接池 + 全接口实战的文章就介绍到这了,更多相关Golang访问Apache IoTTDB教程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文