Go语言编写高可用日志收集脚本
作者:程序员爱钓鱼
在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的重要来源。本篇作为《Go语言100个实战案例》中的一篇,带你从设计到实现,完整写出一个轻量级、高可用的日志收集脚本(Agent),能够实时采集多个本地日志文件、处理文件切割(rotation)、按批发送到远端聚合服务,并具备重试、限流和优雅停止能力。
目标与场景
目标:实现一个“可部署到每台机器上”的日志采集脚本(agent),功能包括:
- 监控并 tail 多个指定日志文件(支持通配符)
- 处理日志切割(rotation)场景(无需丢失数据)
- 将日志按批次、JSON 格式发送到远端 HTTP 接收端(可替换为 Kafka/gRPC)
- 支持并发、限流、指数退避重试和本地缓冲
- 可优雅停止并保证数据尽可能送达
适用场景:小型到中型集群的轻量采集、调试环境、或作为自研日志管道的一部分。
技术选型(简要)
- 语言:Go(并发模型天然适合)
- 文件 tail:
github.com/hpcloud/tail
(成熟、支持 rotation)——也可用fsnotify
+ 自实现 tail,但 hpcloud/tail 工具成熟、代码量少 - 网络传输:HTTP POST + gzip + JSON(易于接入)
- 配置:命令行 flags + 简单 JSON/YAML(本文用 flags)
- 重试策略:指数退避(带上限)
注:示例使用 hpcloud/tail
来可靠处理文件 truncation/rotation,实际生产可替换为更复杂的 offset 存储(保证断点续传)
项目结构(示意)
log-agent/
├─ main.go
├─ sender.go
├─ tailer.go
├─ go.mod
下面直接给出一个 单文件(main.go) 的可运行示例,方便快速理解与使用。
完整代码(main.go)
// main.go package main import ( "bufio" "bytes" "compress/gzip" "context" "encoding/json" "flag" "fmt" "io" "net/http" "os" "os/signal" "path/filepath" "sync" "syscall" "time" "github.com/hpcloud/tail" ) // LogRecord 定义发送到服务器的 JSON 结构 type LogRecord struct { Timestamp time.Time `json:"timestamp"` Host string `json:"host"` Path string `json:"path"` Line string `json:"line"` } // Config var ( globPattern = flag.String("paths", "/var/log/*.log", "日志文件路径,支持通配符") endpoint = flag.String("endpoint", "http://127.0.0.1:8080/ingest", "日志收集服务地址") batchSize = flag.Int("batch", 200, "每次发送最大条数") batchWait = flag.Duration("wait", 2*time.Second, "批量发送最大等待时间") workers = flag.Int("workers", 4, "并发发送 worker 数") maxQueue = flag.Int("queue", 2000, "本地队列最大条数,超出丢弃最老") ) func main() { flag.Parse() host, _ := os.Hostname() paths, err := filepath.Glob(*globPattern) if err != nil { fmt.Fprintf(os.Stderr, "invalid pattern: %v\n", err) os.Exit(1) } if len(paths) == 0 { fmt.Fprintf(os.Stderr, "no logs matched pattern: %s\n", *globPattern) os.Exit(1) } // context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // signal handling sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigc fmt.Println("received shutdown signal, stopping...") cancel() }() // central channel for lines lineCh := make(chan LogRecord, *maxQueue) var wg sync.WaitGroup // start tailers for _, p := range paths { wg.Add(1) go func(path string) { defer wg.Done() if err := tailFile(ctx, path, host, lineCh); err != nil { fmt.Fprintf(os.Stderr, "tail %s error: %v\n", path, err) } }(p) } // start sender workers senderWg := &sync.WaitGroup{} for i := 0; i < *workers; i++ { senderWg.Add(1) go func(id int) { defer senderWg.Done() runSender(ctx, id, lineCh, *endpoint, *batchSize, *batchWait) }(i) } // wait for tailers to finish (on ctx cancel they will exit) wg.Wait() // close channel to signal senders to flush and exit close(lineCh) // wait for senders to finish senderWg.Wait() fmt.Println("agent stopped") } // tailFile 使用 hpcloud/tail 跟踪文件 func tailFile(ctx context.Context, path, host string, out chan<- LogRecord) error { cfg := tail.Config{ Follow: true, ReOpen: true, // 支持日志切割后重新打开 MustExist: false, Poll: true, Logger: tail.DiscardingLogger, } t, err := tail.TailFile(path, cfg) if err != nil { return err } defer t.Cleanup() for { select { case <-ctx.Done(): t.Cleanup() return nil case line, ok := <-t.Lines: if !ok { // channel closed; end return nil } if line == nil { continue } rec := LogRecord{ Timestamp: time.Now().UTC(), Host: host, Path: path, Line: line.Text, } // non-blocking send to avoid blocking tail; drop oldest if full select { case out <- rec: default: // drop one and push new (simple policy) select { case <-out: default: } select { case out <- rec: default: // give up if still full } } } } } // runSender 聚合并发送日志,带简单重试 func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) { httpClient := &http.Client{ Timeout: 10 * time.Second, } buf := make([]LogRecord, 0, batchSize) sendBatch := func(batch []LogRecord) error { if len(batch) == 0 { return nil } // marshal data, err := json.Marshal(batch) if err != nil { return err } // gzip body var b bytes.Buffer gw := gzip.NewWriter(&b) if _, err := gw.Write(data); err != nil { _ = gw.Close() return err } _ = gw.Close() req, _ := http.NewRequest("POST", endpoint, &b) req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Type", "application/json") // retry with exponential backoff var attempt int for { attempt++ resp, err := httpClient.Do(req) if err == nil { io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { return nil } err = fmt.Errorf("bad status: %s", resp.Status) } // on ctx done, abort immediately select { case <-ctx.Done(): return fmt.Errorf("context canceled") default: } if attempt >= 5 { return err } // backoff sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond if sleep > 10*time.Second { sleep = 10 * time.Second } time.Sleep(sleep) } } timer := time.NewTimer(batchWait) defer timer.Stop() for { select { case <-ctx.Done(): // flush remaining _ = sendBatch(buf) return case rec, ok := <-in: if !ok { // channel closed -> flush and exit _ = sendBatch(buf) return } buf = append(buf, rec) if len(buf) >= batchSize { _ = sendBatch(buf) buf = buf[:0] if !timer.Stop() { select { case <-timer.C: default: } } timer.Reset(batchWait) } case <-timer.C: if len(buf) > 0 { _ = sendBatch(buf) buf = buf[:0] } timer.Reset(batchWait) } } }
使用方法
1.初始化模块并获取依赖:
go mod init example.com/log-agent go get github.com/hpcloud/tail go build -o log-agent main.go
2.运行(示例):
./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4
3.建议把 agent 用 systemd 管理或容器化部署为 DaemonSet(K8s)或 sidecar。
实践要点与注意事项
日志切割:使用 ReOpen: true
可处理 logrotate
产生的新文件句柄;生产环境建议结合 inode 校验与持久化 offset(例如把 offset 存到本地文件或 SQLite)以支持重启断点续传。
传输安全:生产环境使用 HTTPS + 鉴权(API Key / mTLS)来防止日志被窃取或篡改。
后端吞吐:发送端需要限流与批次控制,避免短时间内把流量拉爆目标端。也可以使用本地磁盘队列(如 diskqueue)在网络中断时持久化缓存。
结构化日志:尽量让应用输出结构化 JSON 日志,这样聚合与查询更强。若是 plain text,可在 agent 处做简单解析(regex)或转发原始行。
监控与自检:给 agent 加入心跳/metrics(Prometheus)接口,监控发送失败数、队列长度等关键指标。
日志隐私:注意日志中可能包含敏感数据(PII、密码、token),可在 agent 端进行脱敏或过滤再上报。
进一步改进(思路)
- 使用持久化队列(disk-backed)保证断网或进程崩溃后不丢日志。
- 支持多种传输后端:Kafka、gRPC、AWS S3、Elasticsearch 等。
- 支持日志标签(service、env、pod)自动注入(从系统 / 环境变量获取)。
- 增加插件化解析器(nginx、app custom parser)做字段抽取。
- 通过 Web UI 或配置中心动态下发采集规则。
总结
这篇文章展示了如何用 Go 快速实现一个可靠、可扩展的日志收集脚本:从文件采集、切割处理,到批量发送与重试策略,都给出了实际可运行的示例代码。实现中充分利用了 Go 的并发、channel 与 context,代码简洁、易扩展。把这个 agent 打包部署在每台节点上,就能为后端日志聚合系统提供稳定可靠的数据源。
到此这篇关于Go语言编写高可用日志收集脚本的文章就介绍到这了,更多相关Go日志收集内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!