Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go语言Elasticsearch全文搜索与日志分析

Go语言中Elasticsearch全文搜索与日志分析的实战

作者:王码码2035哦

Elasticsearch在Go语言中的应用非常广泛,本文就来介绍一下Go语言中Elasticsearch全文搜索与日志分析的实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Elasticsearch作为最流行的搜索引擎之一,广泛应用于全文搜索、日志分析、指标监控等场景。本文将深入介绍如何在Go语言中使用Elasticsearch,从基础操作到高级应用,帮助你构建强大的搜索和分析能力。

Elasticsearch核心概念

快速开始

连接Elasticsearch

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/elastic/go-elasticsearch/v8"
)
// NewElasticsearchClient builds a client for a single local node using
// basic authentication. Credentials are hard-coded for the example;
// production code should load them from configuration instead.
func NewElasticsearchClient() (*elasticsearch.Client, error) {
    return elasticsearch.NewClient(elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
        Username:  "elastic",
        Password:  "password",
    })
}

索引操作

// Product is the document model stored in the "products" index; its JSON
// field names match the mapping created by CreateIndex below.
type Product struct {
    ID          string    `json:"id"`          // also used as the document _id when indexing
    Name        string    `json:"name"`        // full-text field (ik_max_word analyzer in the mapping)
    Description string    `json:"description"` // full-text field (ik_max_word analyzer in the mapping)
    Price       float64   `json:"price"`       // mapped as float
    Category    string    `json:"category"`    // keyword: exact-match filters and aggregations
    Tags        []string  `json:"tags"`        // keyword list
    CreatedAt   time.Time `json:"created_at"`  // mapped as date; used for recency sorting
}
// CreateIndex creates the "products" index with an explicit mapping.
// Text fields use the ik_max_word analyzer (requires the IK plugin to be
// installed); category and tags are keywords so they can be filtered and
// aggregated exactly; created_at is a date for range queries and sorting.
func CreateIndex(client *elasticsearch.Client) error {
    mapping := `{
        "mappings": {
            "properties": {
                "name": {
                    "type": "text",
                    "analyzer": "ik_max_word"
                },
                "description": {
                    "type": "text",
                    "analyzer": "ik_max_word"
                },
                "price": {
                    "type": "float"
                },
                "category": {
                    "type": "keyword"
                },
                "tags": {
                    "type": "keyword"
                },
                "created_at": {
                    "type": "date"
                }
            }
        }
    }`
    res, err := client.Indices.Create(
        "products",
        client.Indices.Create.WithBody(strings.NewReader(mapping)),
    )
    if err != nil {
        return err
    }
    defer res.Body.Close()
    // The error above only covers transport failures; an ES-level failure
    // (e.g. the index already exists) is reported in the response status.
    if res.IsError() {
        return fmt.Errorf("creating index products: %s", res.String())
    }
    return nil
}

文档操作

索引文档

// IndexDocument indexes (or overwrites) a single product using its ID as
// the document _id. WithRefresh("true") makes the document searchable
// immediately, which is convenient for demos but expensive at scale —
// prefer the default refresh behavior for high-volume ingestion.
func IndexDocument(client *elasticsearch.Client, product Product) error {
    data, err := json.Marshal(product)
    if err != nil {
        return err
    }
    res, err := client.Index(
        "products",
        bytes.NewReader(data),
        client.Index.WithDocumentID(product.ID),
        client.Index.WithRefresh("true"),
    )
    if err != nil {
        return err
    }
    defer res.Body.Close()
    // A nil transport error can still carry an ES-level error status.
    if res.IsError() {
        return fmt.Errorf("indexing product %s: %s", product.ID, res.String())
    }
    return nil
}
// BulkIndex indexes many products in a single NDJSON bulk request: for
// each document, an action-metadata line followed by the document source,
// each newline-terminated.
func BulkIndex(client *elasticsearch.Client, products []Product) error {
    var buf bytes.Buffer
    for _, product := range products {
        // Fail fast on a bad document instead of silently dropping it
        // (the original discarded this error).
        data, err := json.Marshal(product)
        if err != nil {
            return err
        }
        // %q quotes and escapes the ID so it stays valid JSON even if it
        // contains quotes or backslashes.
        fmt.Fprintf(&buf, "{\"index\":{\"_id\":%q}}\n", product.ID)
        buf.Write(data)
        buf.WriteByte('\n')
    }
    res, err := client.Bulk(
        bytes.NewReader(buf.Bytes()),
        client.Bulk.WithIndex("products"),
    )
    if err != nil {
        return err
    }
    defer res.Body.Close()
    if res.IsError() {
        return fmt.Errorf("bulk request: %s", res.String())
    }
    // Bulk can return 200 even when individual items failed; the
    // top-level "errors" flag in the response reports that case.
    var bulkRes struct {
        Errors bool `json:"errors"`
    }
    if err := json.NewDecoder(res.Body).Decode(&bulkRes); err != nil {
        return err
    }
    if bulkRes.Errors {
        return fmt.Errorf("bulk indexing: one or more items failed")
    }
    return nil
}

搜索查询

// SearchProducts runs a full-text query over name/description/tags (name
// boosted 3x) combined with exact-match term filters, returning the first
// 20 hits ordered by relevance and then by recency.
//
// filters maps a field name to the exact value it must match; filter
// clauses do not affect scoring and are cacheable by ES.
func SearchProducts(client *elasticsearch.Client, query string, filters map[string]interface{}) ([]Product, error) {
    // Build the term filters up front — much clearer than appending via
    // chained type assertions back into the query map.
    filterClauses := make([]map[string]interface{}, 0, len(filters))
    for key, value := range filters {
        filterClauses = append(filterClauses, map[string]interface{}{
            "term": map[string]interface{}{key: value},
        })
    }
    searchQuery := map[string]interface{}{
        "query": map[string]interface{}{
            "bool": map[string]interface{}{
                "must": []map[string]interface{}{
                    {
                        "multi_match": map[string]interface{}{
                            "query":  query,
                            "fields": []string{"name^3", "description", "tags"},
                            "type":   "best_fields",
                        },
                    },
                },
                "filter": filterClauses,
            },
        },
        "sort": []map[string]interface{}{
            {"_score": "desc"},
            {"created_at": "desc"},
        },
        "from": 0,
        "size": 20,
    }
    queryJSON, err := json.Marshal(searchQuery)
    if err != nil {
        return nil, err
    }
    res, err := client.Search(
        client.Search.WithIndex("products"),
        client.Search.WithBody(bytes.NewReader(queryJSON)),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()
    if res.IsError() {
        return nil, fmt.Errorf("searching products: %s", res.String())
    }
    // Decode only the parts of the search response we actually use.
    var result struct {
        Hits struct {
            Hits []struct {
                Source Product `json:"_source"`
            } `json:"hits"`
        } `json:"hits"`
    }
    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, err
    }
    products := make([]Product, 0, len(result.Hits.Hits))
    for _, hit := range result.Hits.Hits {
        products = append(products, hit.Source)
    }
    return products, nil
}

高级搜索功能

聚合分析

// AggregateByCategory returns a document count per product category using
// a terms aggregation. "size": 0 suppresses search hits so only the
// aggregation result is returned.
//
// The original version also requested a "price_stats" aggregation whose
// result was never read; it has been dropped to avoid wasted work.
func AggregateByCategory(client *elasticsearch.Client) (map[string]int64, error) {
    aggQuery := map[string]interface{}{
        "size": 0,
        "aggs": map[string]interface{}{
            "categories": map[string]interface{}{
                "terms": map[string]interface{}{
                    "field": "category",
                    "size":  10, // top 10 categories only
                },
            },
        },
    }

    queryJSON, err := json.Marshal(aggQuery)
    if err != nil {
        return nil, err
    }

    res, err := client.Search(
        client.Search.WithIndex("products"),
        client.Search.WithBody(bytes.NewReader(queryJSON)),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()
    if res.IsError() {
        return nil, fmt.Errorf("aggregating categories: %s", res.String())
    }

    // Decode only the buckets of the "categories" aggregation.
    var result struct {
        Aggregations struct {
            Categories struct {
                Buckets []struct {
                    Key   string `json:"key"`
                    Count int64  `json:"doc_count"`
                } `json:"buckets"`
            } `json:"categories"`
        } `json:"aggregations"`
    }

    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, err
    }

    categories := make(map[string]int64, len(result.Aggregations.Categories.Buckets))
    for _, bucket := range result.Aggregations.Categories.Buckets {
        categories[bucket.Key] = bucket.Count
    }

    return categories, nil
}

自动补全

// SuggestProducts returns autocomplete candidates for the given prefix
// using a completion suggester on the "suggest" field.
//
// NOTE(review): the products mapping created earlier in this file does
// not define a "suggest" field of type "completion" — this function only
// works if such a field is added to the mapping; confirm before use.
func SuggestProducts(client *elasticsearch.Client, prefix string) ([]string, error) {
    suggestQuery := map[string]interface{}{
        "suggest": map[string]interface{}{
            "product-suggest": map[string]interface{}{
                "prefix":     prefix,
                "completion": map[string]interface{}{"field": "suggest"},
            },
        },
    }
    queryJSON, err := json.Marshal(suggestQuery)
    if err != nil {
        return nil, err
    }
    res, err := client.Search(
        client.Search.WithIndex("products"),
        client.Search.WithBody(bytes.NewReader(queryJSON)),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()
    if res.IsError() {
        return nil, fmt.Errorf("suggesting products: %s", res.String())
    }
    // Decode only the option texts of the "product-suggest" suggester.
    var result struct {
        Suggest struct {
            ProductSuggest []struct {
                Options []struct {
                    Text string `json:"text"`
                } `json:"options"`
            } `json:"product-suggest"`
        } `json:"suggest"`
    }
    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, err
    }
    suggestions := make([]string, 0)
    for _, suggest := range result.Suggest.ProductSuggest {
        for _, option := range suggest.Options {
            suggestions = append(suggestions, option.Text)
        }
    }
    return suggestions, nil
}

日志分析应用

日志索引设计

// LogEntry is one structured log document; the "@timestamp" JSON name
// follows the Elastic convention so tooling picks it up as the time field.
type LogEntry struct {
    Timestamp time.Time       `json:"@timestamp"`
    Level     string          `json:"level"`    // severity, keyword in the mapping (e.g. "error")
    Message   string          `json:"message"`  // free text, full-text searchable
    Service   string          `json:"service"`  // emitting service name, keyword
    TraceID   string          `json:"trace_id"` // keyword; presumably a distributed-tracing correlation id — confirm with producers
    Metadata  map[string]interface{} `json:"metadata"` // free-form extras; mapping uses dynamic object
}
// CreateLogIndex creates the first index of a rollover series
// ("logs-000001") with an explicit mapping and ILM settings.
//
// NOTE(review): the settings reference a "logs_policy" ILM policy and a
// "logs" rollover alias — both must be created elsewhere for rollover to
// work; confirm they exist before calling this.
func CreateLogIndex(client *elasticsearch.Client) error {
    mapping := `{
        "mappings": {
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "level": {
                    "type": "keyword"
                },
                "message": {
                    "type": "text"
                },
                "service": {
                    "type": "keyword"
                },
                "trace_id": {
                    "type": "keyword"
                },
                "metadata": {
                    "type": "object",
                    "dynamic": true
                }
            }
        },
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "index.lifecycle.name": "logs_policy",
            "index.lifecycle.rollover_alias": "logs"
        }
    }`
    res, err := client.Indices.Create(
        "logs-000001",
        client.Indices.Create.WithBody(strings.NewReader(mapping)),
    )
    if err != nil {
        return err
    }
    defer res.Body.Close()
    // Transport success can still carry an ES-level error status.
    if res.IsError() {
        return fmt.Errorf("creating index logs-000001: %s", res.String())
    }
    return nil
}

日志搜索与分析

// SearchLogs returns up to 100 log entries for a service within the given
// time window, newest first. If level is non-empty it is added as an
// additional exact-match condition.
func SearchLogs(client *elasticsearch.Client, service string, level string, startTime, endTime time.Time) ([]LogEntry, error) {
    // Assemble all "must" clauses before building the query body, so the
    // optional level condition needs no type-assertion gymnastics.
    must := []map[string]interface{}{
        {
            "term": map[string]interface{}{
                "service": service,
            },
        },
        {
            "range": map[string]interface{}{
                "@timestamp": map[string]interface{}{
                    "gte": startTime.Format(time.RFC3339),
                    "lte": endTime.Format(time.RFC3339),
                },
            },
        },
    }
    if level != "" {
        must = append(must, map[string]interface{}{
            "term": map[string]interface{}{
                "level": level,
            },
        })
    }
    query := map[string]interface{}{
        "query": map[string]interface{}{
            "bool": map[string]interface{}{
                "must": must,
            },
        },
        "sort": []map[string]interface{}{
            {"@timestamp": "desc"},
        },
        "size": 100,
    }
    queryJSON, err := json.Marshal(query)
    if err != nil {
        return nil, err
    }
    res, err := client.Search(
        client.Search.WithIndex("logs-*"),
        client.Search.WithBody(bytes.NewReader(queryJSON)),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()
    if res.IsError() {
        return nil, fmt.Errorf("searching logs: %s", res.String())
    }
    // Decode only the _source of each hit.
    var result struct {
        Hits struct {
            Hits []struct {
                Source LogEntry `json:"_source"`
            } `json:"hits"`
        } `json:"hits"`
    }
    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, err
    }
    logs := make([]LogEntry, 0, len(result.Hits.Hits))
    for _, hit := range result.Hits.Hits {
        logs = append(logs, hit.Source)
    }
    return logs, nil
}

性能优化

批量操作优化

// BulkProcessor batches product writes and flushes them either when the
// batch is full or on a timer. The original version raced: Start's ticker
// goroutine and callers of Add both touched buffer unsynchronized. All
// methods are now safe for concurrent use; run Start in its own goroutine
// while producers call Add.
type BulkProcessor struct {
    client        *elasticsearch.Client
    mu            sync.Mutex // guards buffer against Add/Start races
    buffer        []Product
    batchSize     int
    flushInterval time.Duration
}

// Add appends a product and flushes synchronously once the batch is full.
func (bp *BulkProcessor) Add(product Product) {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    bp.buffer = append(bp.buffer, product)
    if len(bp.buffer) >= bp.batchSize {
        // Best effort to match the original signature; on failure the
        // buffer is kept, so the next Flush retries and reports the error.
        _ = bp.flushLocked()
    }
}

// Flush sends any buffered products in a single bulk request.
func (bp *BulkProcessor) Flush() error {
    bp.mu.Lock()
    defer bp.mu.Unlock()
    return bp.flushLocked()
}

// flushLocked performs the bulk write; bp.mu must be held by the caller.
func (bp *BulkProcessor) flushLocked() error {
    if len(bp.buffer) == 0 {
        return nil
    }
    var buf bytes.Buffer
    for _, product := range bp.buffer {
        data, err := json.Marshal(product)
        if err != nil {
            return err
        }
        // %q escapes the ID so the action line stays valid JSON.
        fmt.Fprintf(&buf, "{\"index\":{\"_id\":%q}}\n", product.ID)
        buf.Write(data)
        buf.WriteByte('\n')
    }
    res, err := bp.client.Bulk(
        bytes.NewReader(buf.Bytes()),
        bp.client.Bulk.WithIndex("products"),
    )
    if err != nil {
        return err
    }
    defer res.Body.Close()
    if res.IsError() {
        return fmt.Errorf("bulk flush: %s", res.String())
    }
    // Truncate but keep capacity so the next batch reuses the backing array.
    bp.buffer = bp.buffer[:0]
    return nil
}

// Start runs the periodic flush loop until ctx is cancelled, performing a
// final drain on shutdown. Intended to run in its own goroutine.
func (bp *BulkProcessor) Start(ctx context.Context) {
    ticker := time.NewTicker(bp.flushInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            _ = bp.Flush() // final drain; error intentionally dropped on shutdown
            return
        case <-ticker.C:
            _ = bp.Flush() // data is retained on failure and retried next tick
        }
    }
}

连接池配置

// NewOptimizedClient builds a client with retry and connection-pool
// tuning. The original set a RequestTimeout field, but elasticsearch.Config
// in go-elasticsearch v8 has no such field (it would not compile);
// per-request deadlines belong on a context, and transport-level timeouts
// on the http.Transport as below.
func NewOptimizedClient() (*elasticsearch.Client, error) {
    cfg := elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
        // Retry transient gateway errors up to three times.
        MaxRetries:    3,
        RetryOnStatus: []int{502, 503, 504},
        // Connection pool and timeout tuning on the underlying transport.
        Transport: &http.Transport{
            MaxIdleConns:          10,
            MaxIdleConnsPerHost:   10,
            IdleConnTimeout:       30 * time.Second,
            ResponseHeaderTimeout: 10 * time.Second, // replaces the invalid RequestTimeout field
        },
    }
    return elasticsearch.NewClient(cfg)
}

总结

Elasticsearch在Go语言中的应用非常广泛,从全文搜索到日志分析都能胜任。在使用过程中需要注意:

  1. 合理设计Mapping:根据业务需求选择合适的字段类型
  2. 批量操作:减少网络往返,提高写入性能
  3. 分页优化:深分页使用search_after替代from/size
  4. 监控与告警:关注集群健康状态,及时发现问题

到此这篇关于Go语言中Elasticsearch全文搜索与日志分析的实战的文章就介绍到这了,更多相关Go语言Elasticsearch全文搜索与日志分析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文