How Beyla discovers processes from the BEYLA_OPEN_PORT environment variable, using a Go program as the example

Author: a朋

This article uses a Go process as the example to examine how Beyla discovers processes from the BEYLA_OPEN_PORT environment variable, that is, by the port a process has open.

How BEYLA_OPEN_PORT is defined in the Beyla source

// beyla/pkg/internal/pipe/config.go
type Config struct {
    ...
    Port services.PortEnum `yaml:"open_port" env:"BEYLA_OPEN_PORT"`
    ...
}
// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
    ranges []portRange
}
type portRange struct {
    start int
    // if end == 0, it means this entry is not a port range but a single port
    end int
}

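As shown above, BEYLA_OPEN_PORT is an environment variable whose Go type is PortEnum, a collection of port ranges.

This article only covers the single-value case, for example BEYLA_OPEN_PORT=8080, where start=8080 and end=0.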
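To make the start/end convention concrete, here is a minimal sketch of how a port specification could be turned into portRange entries. parsePorts is a hypothetical helper written for this article, not Beyla's actual parser, and the comma-separated "port or start-end" syntax is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// portRange mirrors the struct shown above: end == 0 means a single port.
type portRange struct {
	start int
	end   int
}

// parsePorts is a hypothetical helper (not Beyla's real parser). It assumes a
// comma-separated list whose items are either "8080" or "8000-8999".
func parsePorts(s string) ([]portRange, error) {
	var ranges []portRange
	for _, item := range strings.Split(s, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		lo, hi, isRange := strings.Cut(item, "-")
		start, err := strconv.Atoi(lo)
		if err != nil {
			return nil, fmt.Errorf("invalid port %q: %w", item, err)
		}
		pr := portRange{start: start}
		if isRange {
			if pr.end, err = strconv.Atoi(hi); err != nil {
				return nil, fmt.Errorf("invalid port range %q: %w", item, err)
			}
		}
		ranges = append(ranges, pr)
	}
	return ranges, nil
}

func main() {
	ranges, err := parsePorts("8080,8000-8999")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", ranges) // [{start:8080 end:0} {start:8000 end:8999}]
}
```

With the single value 8080, the result is one entry with start=8080 and end=0, which is the case this article follows.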

Start a Go program and discover it

Start the example-http-service process from beyla/examples:

# curl -OL https://raw.githubusercontent.com/grafana/beyla/main/examples/example-http-service/example-http-service.go
# go run ./example-http-service.go

Then start Beyla with BEYLA_OPEN_PORT=8080 so that it discovers the process by port:

# BEYLA_PROMETHEUS_PORT=9400 BEYLA_OPEN_PORT=8080 BEYLA_LOG_LEVEL=DEBUG beyla

Beyla's log then shows that the process was discovered:

...
time=2023-12-12T21:43:42.358-05:00 level=DEBUG msg="filtering processes" component=discover.CriteriaMatcher len=337
time=2023-12-12T21:43:42.435-05:00 level=DEBUG msg="found process" component=discover.CriteriaMatcher pid=612536 comm=~/go/src/github.com/grafana/beyla/examples/example-http-service/example-http-service.go
...

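Overall architecture of the source

In the Beyla source, process discovery is split into two parts, each covered in its own section later in this article: WatcherProvider, which produces process events, and CriteriaMatcherProvider, which filters them.

Entry point: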

// beyla/pkg/internal/discover/finder.go
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
    gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
    graph.RegisterStart(gb, WatcherProvider)
    graph.RegisterMiddle(gb, CriteriaMatcherProvider)
    ...
    graph.RegisterTerminal(gb, TraceAttacherProvider)
    pipeline, err := gb.Build(pf)
    ...
    go pipeline.Run()   // start the pipeline
    return pf.DiscoveredTracers, pf.DeleteTracers, nil
}
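The start / middle / terminal layout registered above can be pictured as goroutines connected by channels. The sketch below uses plain channels rather than the graph library Beyla uses; watch, match, attach and runPipeline are hypothetical names, and PIDs stand in for the real event types.

```go
package main

import "fmt"

// watch stands in for the start node: it emits batches of PIDs.
func watch(out chan<- []int) {
	out <- []int{100, 200, 300}
	out <- []int{400}
	close(out)
}

// match stands in for the middle node: it forwards only PIDs accepted by keep.
func match(in <-chan []int, out chan<- []int, keep func(int) bool) {
	for batch := range in {
		var kept []int
		for _, pid := range batch {
			if keep(pid) {
				kept = append(kept, pid)
			}
		}
		out <- kept
	}
	close(out)
}

// attach stands in for the terminal node: it collects whatever reaches it.
func attach(in <-chan []int) []int {
	var all []int
	for batch := range in {
		all = append(all, batch...)
	}
	return all
}

// runPipeline wires the three stages together with buffered channels.
func runPipeline(bufLen int, keep func(int) bool) []int {
	watched := make(chan []int, bufLen)
	matched := make(chan []int, bufLen)
	go watch(watched)
	go match(watched, matched, keep)
	return attach(matched)
}

func main() {
	got := runPipeline(10, func(pid int) bool { return pid >= 200 })
	fmt.Println(got) // [200 300 400]
}
```

Each stage only knows its input and output channel, which is why the watcher and the matcher can be read independently in the rest of this article.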

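WatcherProvider

WatcherProvider listens for port-bind events (through eBPF) and polls the system to detect processes that were created or deleted.

WatcherProvider is instantiated as follows: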

// beyla/pkg/internal/discover/watcher.go
func WatcherProvider(w Watcher) (node.StartFunc[[]Event[processPorts]], error) {
    acc := pollAccounter{
        ctx:               w.Ctx,
        cfg:               w.Cfg,
        interval:          w.Cfg.Discovery.PollInterval,
        pids:              map[PID]processPorts{},
        pidPorts:          map[pidPort]processPorts{},
        listProcesses:     fetchProcessPorts,
        executableReady:   executableReady,
        loadBPFWatcher:    loadBPFWatcher,
        fetchPorts:        true,  // must be true until we've activated the bpf watcher component
        bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel
        stateMux:          sync.Mutex{},
        findingCriteria:   FindingCriteria(w.Cfg),
    }
    if acc.interval == 0 {
        acc.interval = defaultPollInterval
    }
    return acc.Run, nil    // acc.Run does the actual work
}

The actual work is started by pollAccounter.Run():

// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) Run(out chan<- []Event[processPorts]) {
    ...
    bpfWatchEvents := make(chan watcher.Event, 100)
    // load the eBPF program
    if err := pa.loadBPFWatcher(pa.cfg, bpfWatchEvents); err != nil {
        log.Error("Unable to load eBPF watcher for process events", "error", err)
    }
    // consume the port-bind events produced by the eBPF program
    go pa.watchForProcessEvents(log, bpfWatchEvents)
    for {
        // list every process on the system together with its ports
        procs, err := pa.listProcesses(pa.portFetchRequired())    // the argument is true here
        if err != nil {
            log.Warn("can't get system processes", "error", err)
        } else {
            // process created/deleted events since the previous poll
            if events := pa.snapshot(procs); len(events) > 0 {
                log.Debug("new process watching events", "events", events)
                out <- events
            }
        }
        select {
        case <-pa.ctx.Done():
            log.Debug("context canceled. Exiting")
            return
        case <-time.After(pa.interval):     // poll periodically; default interval is 5s
            // poll event starting again
        }
    }
}

Watching port binds with eBPF

The eBPF program:

// beyla/bpf/watch_helper.c
SEC("kprobe/security_socket_bind")
int kprobe_security_socket_bind(struct pt_regs *ctx) {
    struct sockaddr *addr = (struct sockaddr *)PT_REGS_PARM2(ctx);
    ...
    u16 port = get_sockaddr_port(addr);
    ...
    watch_info_t *trace = bpf_ringbuf_reserve(&watch_events, sizeof(watch_info_t), 0);
    if (trace) {
        trace->flags = WATCH_BIND;
        trace->payload = port; 
        bpf_dbg_printk("New port bound %d", trace->payload);
        bpf_ringbuf_submit(trace, 0);
    }
    return 0;
}

The Go side then reads the eBPF ring buffer and turns each bind record into a NewPort event:

// beyla/pkg/internal/ebpf/watcher/watcher.go
func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) {
    var flags uint64
    var event BPFWatchInfo
    err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &flags)
    ...
    if flags == 1 { // socket bind
        err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
        if err == nil {
            p.log.Debug("New port bind event", "port", event.Payload)
            p.events <- Event{Type: NewPort, Payload: uint32(event.Payload)}
        }
    }
    return request.Span{}, true, nil
}
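The decoding step above can be tried in isolation. The following sketch assumes a record layout of two little-endian uint64 fields (flags, then payload); the watchInfo struct and the decodeBind helper are illustrative assumptions, and Beyla's real BPFWatchInfo is generated from the C struct and may contain more fields.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// watchInfo is an assumed layout for illustration only.
type watchInfo struct {
	Flags   uint64
	Payload uint64
}

// watchBind is assumed to correspond to WATCH_BIND on the C side.
const watchBind = 1

// decodeBind decodes one raw record and reports the bound port when the
// record is a bind event. Other event types are reported as ok == false.
func decodeBind(raw []byte) (uint32, bool, error) {
	var ev watchInfo
	if err := binary.Read(bytes.NewReader(raw), binary.LittleEndian, &ev); err != nil {
		return 0, false, err
	}
	if ev.Flags != watchBind {
		return 0, false, nil
	}
	return uint32(ev.Payload), true, nil
}

func main() {
	// Build a sample record with the same byte layout the decoder expects.
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.LittleEndian, watchInfo{Flags: watchBind, Payload: 8080}); err != nil {
		panic(err)
	}
	port, ok, err := decodeBind(buf.Bytes())
	fmt.Println(port, ok, err) // 8080 true <nil>
}
```

Reading the whole struct once, instead of reading flags and then the full event separately, is a simplification made for this sketch.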

The events sent to p.events above are consumed here:

// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) watchForProcessEvents(log *slog.Logger, events <-chan watcher.Event) {
    for e := range events {
        switch e.Type {
        case watcher.Ready:
            pa.bpfWatcherIsReady()
        case watcher.NewPort:
            port := int(e.Payload)
            if pa.cfg.Port.Matches(port) || pa.findingCriteria.PortOfInterest(port) {
                pa.refetchPorts()
            }
        default:
            log.Warn("Unknown ebpf process watch event", "type", e.Type)
        }
    }
}

pa.refetchPorts() only sets a flag, pa.fetchPorts = true:

func (pa *pollAccounter) refetchPorts() {
   pa.stateMux.Lock()
   defer pa.stateMux.Unlock()
   pa.fetchPorts = true
}
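The flag matters because the polling loop asks pa.portFetchRequired() before each scan. The article does not show that function, so the sketch below is an assumption about how such a flag could be consumed: scan ports on every poll until the eBPF watcher is ready, and afterwards only when a matching bind event has raised the flag. portScanFlag and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// portScanFlag is a hypothetical, trimmed-down stand-in for the state that
// pollAccounter keeps.
type portScanFlag struct {
	mu         sync.Mutex
	fetchPorts bool // raised when a matching bind event arrives
	bpfReady   bool // set once the eBPF watcher reports that it is ready
}

// refetch mirrors refetchPorts above: it only raises the flag.
func (f *portScanFlag) refetch() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.fetchPorts = true
}

// required is an assumed counterpart of portFetchRequired: always scan until
// the eBPF watcher is ready, then scan once per raised flag.
func (f *portScanFlag) required() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.bpfReady {
		return true
	}
	scan := f.fetchPorts
	f.fetchPorts = false
	return scan
}

func main() {
	f := &portScanFlag{bpfReady: true}
	fmt.Println(f.required()) // false: no bind event seen yet
	f.refetch()
	fmt.Println(f.required()) // true: a matching port was bound
	fmt.Println(f.required()) // false: the flag was consumed
}
```

The mutex is needed because the flag is raised from the event-consuming goroutine and read from the polling loop.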

Checking for process creation and deletion

First, list the processes and the ports each process uses:

// beyla/pkg/internal/discover/watcher.go
// called with scanPorts=true
func fetchProcessPorts(scanPorts bool) (map[PID]processPorts, error) {
    processes := map[PID]processPorts{}
    pids, err := process.Pids()
    for _, pid := range pids {
        if !scanPorts { // skip the port scan
            processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: []uint32{}}
            continue
        }
        // scan the ports
        conns, err := net.ConnectionsPid("inet", pid)
        ...
        var openPorts []uint32
        // TODO: Cap the size of this array, leaking client ephemeral ports will cause this to grow very long
        for _, conn := range conns {
            openPorts = append(openPorts, conn.Laddr.Port)
        }
        processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: openPorts}
    }
    return processes, nil
}

Then compare the result of the previous poll with the current one to produce process created/deleted events:

// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) snapshot(fetchedProcs map[PID]processPorts) []Event[processPorts] {
    var events []Event[processPorts]
    currentPidPorts := make(map[pidPort]processPorts, len(fetchedProcs))
    reportedProcs := map[PID]struct{}{}
    notReadyProcs := map[PID]struct{}{}
    // notify processes that are new, or already existed but have a new connection
    for pid, proc := range fetchedProcs {
        // if the process does not have open ports, we might still notify it
        // for example, if it's a client with ephemeral connections, which might be later matched by executable name
        if len(proc.openPorts) == 0 {
            ...
        } else {
            for _, port := range proc.openPorts {
                if pa.checkNewProcessConnectionNotification(proc, port, currentPidPorts, reportedProcs, notReadyProcs) {
                    events = append(events, Event[processPorts]{Type: EventCreated, Obj: proc})         // process created; the event carries the process and its ports
                    // skip checking new connections for that process
                    continue
                }
            }
        }
    }
    // notify processes that are removed
    for pid, proc := range pa.pids {
        if _, ok := fetchedProcs[pid]; !ok {
            events = append(events, Event[processPorts]{Type: EventDeleted, Obj: proc})     // process deleted; the event carries the process and its ports
        }
    }
    ...
    pa.pids = currentProcs
    pa.pidPorts = currentPidPorts
    return events
}
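Stripped of the per-port bookkeeping, the snapshot step is a set difference between two polls. The sketch below is a simplified illustration, not Beyla's code: it keys only on PID, whereas the real snapshot also tracks (pid, port) pairs and process readiness. The event type and the diff function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

type eventType string

const (
	created eventType = "created"
	deleted eventType = "deleted"
)

type event struct {
	Type eventType
	PID  int
}

// diff compares the previous and current poll, both keyed by PID, and
// returns one event per process that appeared or disappeared.
func diff(prev, curr map[int][]uint32) []event {
	var events []event
	for pid := range curr {
		if _, ok := prev[pid]; !ok {
			events = append(events, event{created, pid})
		}
	}
	for pid := range prev {
		if _, ok := curr[pid]; !ok {
			events = append(events, event{deleted, pid})
		}
	}
	// Map iteration order is random, so sort by PID for a stable result.
	sort.Slice(events, func(i, j int) bool { return events[i].PID < events[j].PID })
	return events
}

func main() {
	prev := map[int][]uint32{1: {22}, 2: {8080}}
	curr := map[int][]uint32{2: {8080}, 3: {9090}}
	fmt.Println(diff(prev, curr)) // [{deleted 1} {created 3}]
}
```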

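CriteriaMatcherProvider

CriteriaMatcherProvider filters the incoming events and keeps only the processes that satisfy the configured criteria.

This pipeline node is instantiated as follows: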

// beyla/pkg/internal/discover/matcher.go
func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processPorts], []Event[ProcessMatch]], error) {
   m := &matcher{
      log:            slog.With("component", "discover.CriteriaMatcher"),
      criteria:       FindingCriteria(cm.Cfg),
      processHistory: map[PID]struct{}{},
   }
   return m.run, nil
}

The checking and filtering itself is done by matcher.run():

// beyla/pkg/internal/discover/matcher.go
func (m *matcher) run(in <-chan []Event[processPorts], out chan<- []Event[ProcessMatch]) {
    m.log.Debug("starting criteria matcher node")
    for i := range in {
        m.log.Debug("filtering processes", "len", len(i))
        o := m.filter(i)        // run the filter
        m.log.Debug("processes matching selection criteria", "len", len(o))
        out <- o
    }
}

m.filter() performs the filtering as follows:

// beyla/pkg/internal/discover/matcher.go
func (m *matcher) filter(events []Event[processPorts]) []Event[ProcessMatch] {
    var matches []Event[ProcessMatch]
    // for each event
    for _, ev := range events {
        ...
        proc, err := processInfo(ev.Obj)
        ...
        // iterate over m.criteria
        for i := range m.criteria {
            // check whether the process satisfies this criterion
            if m.matchProcess(proc, &m.criteria[i]) {
                comm := proc.ExePath
                // match: found a process that satisfies the criterion
                m.log.Debug("found process", "pid", proc.Pid, "comm", comm)
                matches = append(matches, Event[ProcessMatch]{
                    Type: EventCreated,
                    Obj:  ProcessMatch{Criteria: &m.criteria[i], Process: proc},
                })
                break
            }
        }
    }
    return matches
}

Now look at the port-matching logic in m.matchProcess():

// beyla/pkg/internal/discover/matcher.go
func (m *matcher) matchProcess(p *services.ProcessInfo, a *services.Attributes) bool {
    ...
    if a.OpenPorts.Len() > 0 {
        return m.matchByPort(p, a)    // check whether any open port matches
    }
    return true
}
func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool {
    for _, c := range p.OpenPorts {
        if a.OpenPorts.Matches(int(c)) {    // a.OpenPorts comes from BEYLA_OPEN_PORT
            return true
        }
    }
    return false
}

OpenPorts is of type PortEnum. Each entry is a range [start, end]; if only a single port is given, start is that port and end=0:

// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
   ranges []portRange
}
type portRange struct {
   start int
   // if end == 0, it means this entry is not a port range but a single port
   end int
}

Its matching logic is:

// beyla/pkg/internal/discover/services/criteria.go
func (p *PortEnum) Matches(port int) bool {
    for _, pr := range p.ranges {
        if pr.end == 0 && pr.start == port ||
            pr.end != 0 && pr.start <= port && port <= pr.end {
            return true
        }
    }
    return false
}
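To check the two branches of that condition, the types and the Matches method from the excerpts can be copied into a small standalone program. Only the main function and the sample port values are additions made for illustration.

```go
package main

import "fmt"

type portRange struct {
	start int
	// if end == 0, this entry is a single port, not a range
	end int
}

type PortEnum struct {
	ranges []portRange
}

// Matches is copied from the excerpt above: a single port must be equal to
// start, a range must contain the port with both bounds inclusive.
func (p *PortEnum) Matches(port int) bool {
	for _, pr := range p.ranges {
		if pr.end == 0 && pr.start == port ||
			pr.end != 0 && pr.start <= port && port <= pr.end {
			return true
		}
	}
	return false
}

func main() {
	// Equivalent of BEYLA_OPEN_PORT=8080 in this article: start=8080, end=0.
	single := PortEnum{ranges: []portRange{{start: 8080}}}
	fmt.Println(single.Matches(8080), single.Matches(8081)) // true false

	// A single port plus an inclusive range.
	mixed := PortEnum{ranges: []portRange{{start: 80}, {start: 8000, end: 8999}}}
	fmt.Println(mixed.Matches(80), mixed.Matches(8500), mixed.Matches(9000)) // true true false
}
```

For the example in this article, the Go service's open port 8080 matches the single entry, so matchByPort returns true and the process is reported as found.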
