Exploring how Beyla discovers processes via the BEYLA_OPEN_PORT environment variable, using Go as an example
Author: a朋
The definition of BEYLA_OPEN_PORT in the Beyla source
// beyla/pkg/internal/pipe/config.go
type Config struct {
    ...
    Port services.PortEnum `yaml:"open_port" env:"BEYLA_OPEN_PORT"`
    ...
}

// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
    ranges []portRange
}

type portRange struct {
    start int
    // if end == 0, it means this entry is not a port range but a single port
    end int
}
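As shown, BEYLA_OPEN_PORT is an environment variable whose value is stored in a PortEnum, which is a collection of port ranges (each entry is either a single port or a start-end range).
This article only covers the single-value case, e.g. BEYLA_OPEN_PORT=8080, where start=8080 and end=0.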
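To make the start/end convention concrete, here is a minimal sketch of how a port specification could be turned into portRange entries. The parsePorts helper is hypothetical and is not Beyla's actual parser; the comma-separated list and the "start-end" range syntax are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// portRange mirrors the struct quoted above: end == 0 marks a single port.
type portRange struct {
	start int
	end   int
}

// parsePorts is a hypothetical helper (not Beyla's real parser). It assumes a
// comma-separated list whose entries are either "8080" or "8000-8999".
func parsePorts(spec string) ([]portRange, error) {
	var ranges []portRange
	for _, entry := range strings.Split(spec, ",") {
		entry = strings.TrimSpace(entry)
		lo, hi, isRange := strings.Cut(entry, "-")
		start, err := strconv.Atoi(lo)
		if err != nil {
			return nil, fmt.Errorf("invalid port %q: %w", entry, err)
		}
		pr := portRange{start: start} // end stays 0 for a single port
		if isRange {
			end, err := strconv.Atoi(hi)
			if err != nil {
				return nil, fmt.Errorf("invalid port range %q: %w", entry, err)
			}
			pr.end = end
		}
		ranges = append(ranges, pr)
	}
	return ranges, nil
}

func main() {
	ranges, err := parsePorts("8080")
	fmt.Println(ranges, err)
}
```

With the input "8080" the result holds a single entry with start=8080 and end=0, which is exactly the case this article follows.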
Starting a Go program and discovering it
Start the example-http-service process found under beyla/examples:
- this Go process listens on port 8080;
# curl -OL https://raw.githubusercontent.com/grafana/beyla/main/examples/example-http-service/example-http-service.go
# go run ./example-http-service.go
Then start Beyla with BEYLA_OPEN_PORT=8080 so that it discovers the process by port:
# BEYLA_PROMETHEUS_PORT=9400 BEYLA_OPEN_PORT=8080 BEYLA_LOG_LEVEL=DEBUG beyla
Finally, the Beyla log shows that the process was discovered:
...
time=2023-12-12T21:43:42.358-05:00 level=DEBUG msg="filtering processes" component=discover.CriteriaMatcher len=337
time=2023-12-12T21:43:42.435-05:00 level=DEBUG msg="found process" component=discover.CriteriaMatcher pid=612536 comm=~/go/src/github.com/grafana/beyla/examples/example-http-service/example-http-service.go
...
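Overall architecture of the source code
In the Beyla source, the process discovery flow has two parts:
WatcherProvider: watches for port-bind events, then polls the system to detect process creation/deletion;
Watching port binds:
- implemented with eBPF, by attaching to kprobe/security_socket_bind;
- when the kprobe reports a bind on a port of interest, fetchPorts is set to true; this flag means that the next process query also fetches each process's port information;
Detecting process creation/deletion:
- query all processes on the system together with the ports they use;
- cache the result of the previous poll and compare it with the current one to derive process created/deleted events;
- the created/deleted events are passed to the next pipeline stage through a chan []Event;
CriteriaMatcherProvider: filters the processes that satisfy the configured criteria;
- if BEYLA_OPEN_PORT is set, it checks whether the events read from chan []Event contain a process listening on that port;
- if so, a new process has been discovered;
- the next stage, TraceAttacherProvider, then instruments the HTTP/gRPC-related kprobes/uprobes for the newly discovered process;
These two stages are assembled as a pipeline and connected through chan []Event;
- the chan []Event carries the processes added or removed since the previous poll;
Source entry point: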
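The staged design described above can be sketched with plain Go channels. This is a simplified illustration and not Beyla's graph builder: the Event type and the watch, match and attach functions are hypothetical stand-ins for the start, middle and terminal nodes.

```go
package main

import "fmt"

// Event is a simplified stand-in for the events exchanged between stages.
type Event struct {
	Created bool
	PID     int
	Ports   []int
}

// watch plays the role of the start node: it emits batches of events
// and closes the channel when there is nothing more to send.
func watch(batches [][]Event) <-chan []Event {
	out := make(chan []Event)
	go func() {
		defer close(out)
		for _, b := range batches {
			out <- b
		}
	}()
	return out
}

// match plays the role of the middle node: it forwards only the events
// whose process has the wanted port open.
func match(in <-chan []Event, port int) <-chan []Event {
	out := make(chan []Event)
	go func() {
		defer close(out)
		for batch := range in {
			var kept []Event
			for _, ev := range batch {
				for _, p := range ev.Ports {
					if p == port {
						kept = append(kept, ev)
						break
					}
				}
			}
			out <- kept
		}
	}()
	return out
}

// attach plays the role of the terminal node: here it only collects PIDs.
func attach(in <-chan []Event) []int {
	var pids []int
	for batch := range in {
		for _, ev := range batch {
			pids = append(pids, ev.PID)
		}
	}
	return pids
}

func main() {
	batches := [][]Event{{
		{Created: true, PID: 100, Ports: []int{22}},
		{Created: true, PID: 200, Ports: []int{8080}},
	}}
	fmt.Println(attach(match(watch(batches), 8080))) // prints [200]
}
```

Each stage only knows the channel types it consumes and produces, which is what lets the stages be registered and wired independently.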
// beyla/pkg/internal/discover/finder.go
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
    gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
    graph.RegisterStart(gb, WatcherProvider)
    graph.RegisterMiddle(gb, CriteriaMatcherProvider)
    ...
    graph.RegisterTerminal(gb, TraceAttacherProvider)
    pipeline, err := gb.Build(pf)
    ...
    go pipeline.Run() // start the pipeline
    return pf.DiscoveredTracers, pf.DeleteTracers, nil
}
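WatcherProvider
WatcherProvider watches for port-bind events, then polls the system to detect process creation/deletion.
WatcherProvider is instantiated as follows, where:
- loadBPFWatcher: loads the eBPF program that watches port binds;
- fetchProcessPorts: queries the processes currently on the system and the ports they use;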
// beyla/pkg/internal/discover/watcher.go
func WatcherProvider(w Watcher) (node.StartFunc[[]Event[processPorts]], error) {
    acc := pollAccounter{
        ctx:               w.Ctx,
        cfg:               w.Cfg,
        interval:          w.Cfg.Discovery.PollInterval,
        pids:              map[PID]processPorts{},
        pidPorts:          map[pidPort]processPorts{},
        listProcesses:     fetchProcessPorts,
        executableReady:   executableReady,
        loadBPFWatcher:    loadBPFWatcher,
        fetchPorts:        true,  // must be true until we've activated the bpf watcher component
        bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel
        stateMux:          sync.Mutex{},
        findingCriteria:   FindingCriteria(w.Cfg),
    }
    if acc.interval == 0 {
        acc.interval = defaultPollInterval
    }
    return acc.Run, nil // acc.Run does the actual work
}
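The actual work is done in pollAccounter.Run():
- first, it loads the eBPF program that watches port binds;
- then, it consumes the port-bind events from eBPF and sets the flag that makes the next poll fetch process ports;
- next, it calls listProcesses() to query all processes on the system and their ports;
- finally, snapshot(procs) returns the process created/deleted events observed by this poll since the previous one;
- these events are put into chan []Event and consumed by the next pipeline stage;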
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) Run(out chan<- []Event[processPorts]) {
    ...
    bpfWatchEvents := make(chan watcher.Event, 100)
    // load the eBPF program
    if err := pa.loadBPFWatcher(pa.cfg, bpfWatchEvents); err != nil {
        log.Error("Unable to load eBPF watcher for process events", "error", err)
    }
    // consume the port-bind events emitted by the eBPF program
    go pa.watchForProcessEvents(log, bpfWatchEvents)
    for {
        // query all processes on the system and their ports
        procs, err := pa.listProcesses(pa.portFetchRequired()) // argument = true
        if err != nil {
            log.Warn("can't get system processes", "error", err)
        } else {
            // process created/deleted events observed by this poll since the previous one
            if events := pa.snapshot(procs); len(events) > 0 {
                log.Debug("new process watching events", "events", events)
                out <- events
            }
        }
        select {
        case <-pa.ctx.Done():
            log.Debug("context canceled. Exiting")
            return
        case <-time.After(pa.interval): // poll periodically; default interval is 5s
            // poll event starting again
        }
    }
}
Watching port binds with eBPF
The eBPF program:
- attaches a kprobe to security_socket_bind, the kernel security hook invoked when a socket is bound;
// beyla/bpf/watch_helper.c
SEC("kprobe/security_socket_bind")
int kprobe_security_socket_bind(struct pt_regs *ctx) {
    struct sockaddr *addr = (struct sockaddr *)PT_REGS_PARM2(ctx);
    ...
    u16 port = get_sockaddr_port(addr);
    ...
    watch_info_t *trace = bpf_ringbuf_reserve(&watch_events, sizeof(watch_info_t), 0);
    if (trace) {
        trace->flags = WATCH_BIND;
        trace->payload = port;
        bpf_dbg_printk("New port bound %d", trace->payload);
        bpf_ringbuf_submit(trace, 0);
    }
    return 0;
}
The Go side then reads the eBPF ring buffer and turns each record into a NewPort event:
// beyla/pkg/internal/ebpf/watcher/watcher.go
func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) {
    var flags uint64
    var event BPFWatchInfo
    err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &flags)
    ...
    if flags == 1 { // socket bind
        err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
        if err == nil {
            p.log.Debug("New port bind event", "port", event.Payload)
            p.events <- Event{Type: NewPort, Payload: uint32(event.Payload)}
        }
    }
    return request.Span{}, true, nil
}
The code that consumes p.events:
- when a bind event for a port of interest is observed, it calls pa.refetchPorts();
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) watchForProcessEvents(log *slog.Logger, events <-chan watcher.Event) {
    for e := range events {
        switch e.Type {
        case watcher.Ready:
            pa.bpfWatcherIsReady()
        case watcher.NewPort:
            port := int(e.Payload)
            if pa.cfg.Port.Matches(port) || pa.findingCriteria.PortOfInterest(port) {
                pa.refetchPorts()
            }
        default:
            log.Warn("Unknown ebpf process watch event", "type", e.Type)
        }
    }
}
pa.refetchPorts() only sets a flag, pa.fetchPorts=true:
- when this flag is true, the next listProcesses call also queries the ports used by each process;
func (pa *pollAccounter) refetchPorts() {
    pa.stateMux.Lock()
    defer pa.stateMux.Unlock()
    pa.fetchPorts = true
}
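The article does not show how the polling loop reads this flag (portFetchRequired is called but not quoted). The following is a sketch of one plausible read-and-reset scheme, written under the assumption that ports are always fetched until the eBPF watcher is ready. The portFlag type and its method names are hypothetical and should not be taken as Beyla's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// portFlag is a hypothetical, reduced version of the state kept by the poller.
type portFlag struct {
	mu             sync.Mutex
	fetchPorts     bool // set when a bind on a port of interest was seen
	watcherEnabled bool // set once the eBPF watcher reports it is ready
}

// refetchPorts requests a port scan on the next poll.
func (f *portFlag) refetchPorts() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.fetchPorts = true
}

// watcherReady records that bind events are now being delivered.
func (f *portFlag) watcherReady() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.watcherEnabled = true
}

// fetchRequired reports whether the next poll should scan ports.
// Assumption: without a working watcher every poll scans ports; with a
// watcher, the flag is consumed so that ports are scanned only on demand.
func (f *portFlag) fetchRequired() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.watcherEnabled {
		return true
	}
	required := f.fetchPorts
	f.fetchPorts = false
	return required
}

func main() {
	f := &portFlag{fetchPorts: true}
	f.watcherReady()
	fmt.Println(f.fetchRequired()) // true: the initial flag is consumed
	fmt.Println(f.fetchRequired()) // false: nothing new was bound
	f.refetchPorts()
	fmt.Println(f.fetchRequired()) // true: a bind event requested a rescan
}
```

The point of such a scheme is to avoid the relatively expensive per-process port scan on polls where no relevant bind happened.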
Detecting process creation/deletion
First, query the processes and the ports they use:
- scanPorts=true means that the ports of each process are queried as well;
// beyla/pkg/internal/discover/watcher.go
// called with scanPorts=true
func fetchProcessPorts(scanPorts bool) (map[PID]processPorts, error) {
    processes := map[PID]processPorts{}
    pids, err := process.Pids()
    for _, pid := range pids {
        if !scanPorts { // ports are not queried
            processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: []uint32{}}
            continue
        }
        // query the ports
        conns, err := net.ConnectionsPid("inet", pid)
        ...
        var openPorts []uint32
        // TODO: Cap the size of this array, leaking client ephemeral ports will cause this to grow very long
        for _, conn := range conns {
            openPorts = append(openPorts, conn.Laddr.Port)
        }
        processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: openPorts}
    }
    return processes, nil
}
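Then, compare the result of the previous poll with the current one to generate process created/deleted events:
- each event carries the process and its port information;
- the events are put into the channel for the next pipeline stage to consume;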
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) snapshot(fetchedProcs map[PID]processPorts) []Event[processPorts] {
    var events []Event[processPorts]
    currentPidPorts := make(map[pidPort]processPorts, len(fetchedProcs))
    reportedProcs := map[PID]struct{}{}
    notReadyProcs := map[PID]struct{}{}
    // notify processes that are new, or already existed but have a new connection
    for pid, proc := range fetchedProcs {
        // if the process does not have open ports, we might still notify it
        // for example, if it's a client with ephemeral connections, which might be later matched by executable name
        if len(proc.openPorts) == 0 {
            ...
        } else {
            for _, port := range proc.openPorts {
                if pa.checkNewProcessConnectionNotification(proc, port, currentPidPorts, reportedProcs, notReadyProcs) {
                    // process created: the event keeps both the process and its ports
                    events = append(events, Event[processPorts]{Type: EventCreated, Obj: proc})
                    // skip checking new connections for that process
                    continue
                }
            }
        }
    }
    // notify processes that are removed
    for pid, proc := range pa.pids {
        if _, ok := fetchedProcs[pid]; !ok {
            // process deleted: the event keeps both the process and its ports
            events = append(events, Event[processPorts]{Type: EventDeleted, Obj: proc})
        }
    }
    ....
    pa.pids = currentProcs
    pa.pidPorts = currentPidPorts
    return events
}
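The core idea of comparing two polls can be reduced to a diff of two maps. The sketch below is deliberately simplified: it only tracks PIDs, whereas the function quoted above also tracks (pid, port) pairs so that a new connection of an existing process can be reported. The event type and the diff function are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a reduced created/deleted notification for one process.
type event struct {
	created bool
	pid     int
}

// diff compares the previous poll with the current one (both map a PID to
// its open ports) and returns the created and deleted events.
func diff(prev, curr map[int][]int) []event {
	var events []event
	// Processes present now but absent before were created.
	for pid := range curr {
		if _, seen := prev[pid]; !seen {
			events = append(events, event{created: true, pid: pid})
		}
	}
	// Processes present before but absent now were deleted.
	for pid := range prev {
		if _, alive := curr[pid]; !alive {
			events = append(events, event{created: false, pid: pid})
		}
	}
	// Map iteration order is random; sort by PID for stable output.
	sort.Slice(events, func(i, j int) bool { return events[i].pid < events[j].pid })
	return events
}

func main() {
	prev := map[int][]int{1: {22}, 2: {8080}}
	curr := map[int][]int{2: {8080}, 3: {9090}}
	fmt.Println(diff(prev, curr)) // prints [{false 1} {true 3}]
}
```

PID 2 appears in both polls, so it produces no event; only the difference between the two snapshots is forwarded.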
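CriteriaMatcherProvider
CriteriaMatcherProvider filters the processes that satisfy the configured criteria.
This pipeline stage is instantiated as follows:
- criteria holds the selection criteria, built from the configuration; all later filtering is done by comparing against criteria;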
// beyla/pkg/internal/discover/matcher.go
func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processPorts], []Event[ProcessMatch]], error) {
    m := &matcher{
        log:            slog.With("component", "discover.CriteriaMatcher"),
        criteria:       FindingCriteria(cm.Cfg),
        processHistory: map[PID]struct{}{},
    }
    return m.run, nil
}
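The actual checking and filtering is done by matcher.run(), in which:
- each batch of []Event is filtered by m.filter;
- the filtered result is written to a chan []Event and passed to the next pipeline stage;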
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) run(in <-chan []Event[processPorts], out chan<- []Event[ProcessMatch]) {
    m.log.Debug("starting criteria matcher node")
    for i := range in {
        m.log.Debug("filtering processes", "len", len(i))
        o := m.filter(i) // run the filter
        m.log.Debug("processes matching selection criteria", "len", len(o))
        out <- o
    }
}
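When filtering, m.filter():
- iterates over criteria for each event and checks whether the process in the event matches; a match means a process has been found;
- each event carries an EventType (Created/Deleted) and the process information (including ports); matching any one criterion is enough;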
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) filter(events []Event[processPorts]) []Event[ProcessMatch] {
    var matches []Event[ProcessMatch]
    // for each event
    for _, ev := range events {
        ...
        proc, err := processInfo(ev.Obj)
        ...
        // iterate over m.criteria
        for i := range m.criteria {
            // check whether the process satisfies this criterion
            if m.matchProcess(proc, &m.criteria[i]) {
                comm := proc.ExePath
                // match: found a process that satisfies the criteria
                m.log.Debug("found process", "pid", proc.Pid, "comm", comm)
                matches = append(matches, Event[ProcessMatch]{
                    Type: EventCreated,
                    Obj:  ProcessMatch{Criteria: &m.criteria[i], Process: proc},
                })
                break
            }
        }
    }
    return matches
}
Now look at the port matching logic in m.matchProcess():
- a.OpenPorts (i.e. the BEYLA_OPEN_PORT setting) is compared against the ports opened by the process;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) matchProcess(p *services.ProcessInfo, a *services.Attributes) bool {
    ...
    if a.OpenPorts.Len() > 0 {
        return m.matchByPort(p, a) // check whether a port matches
    }
    return true
}

func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool {
    for _, c := range p.OpenPorts {
        if a.OpenPorts.Matches(int(c)) { // OpenPorts = the BEYLA_OPEN_PORT setting
            return true
        }
    }
    return false
}
OpenPorts is a PortEnum, i.e. a list of [start, end] ranges; when only a single port is given, start is that port and end is 0;
// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
    ranges []portRange
}

type portRange struct {
    start int
    // if end == 0, it means this entry is not a port range but a single port
    end int
}
Its matching logic is as follows:
- with BEYLA_OPEN_PORT=8080, start=8080 and end=0, so the function returns true for a process listening on port 8080;
// beyla/pkg/internal/discover/services/criteria.go
func (p *PortEnum) Matches(port int) bool {
    for _, pr := range p.ranges {
        if pr.end == 0 && pr.start == port ||
            pr.end != 0 && pr.start <= port && port <= pr.end {
            return true
        }
    }
    return false
}
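To see the two branches of the condition at work, the matching function can be exercised on its own. The types below are copied from the snippets quoted in this article into a standalone program; the values in main are made up for illustration.

```go
package main

import "fmt"

// portRange and PortEnum are copied from the snippets quoted in this article.
type portRange struct {
	start int
	end   int // 0 means "single port", not a range
}

type PortEnum struct {
	ranges []portRange
}

// Matches reports whether port equals a single-port entry or falls inside
// a range entry. Note that && binds tighter than ||, so the condition reads
// as (single-port case) || (range case).
func (p *PortEnum) Matches(port int) bool {
	for _, pr := range p.ranges {
		if pr.end == 0 && pr.start == port ||
			pr.end != 0 && pr.start <= port && port <= pr.end {
			return true
		}
	}
	return false
}

func main() {
	// BEYLA_OPEN_PORT=8080 corresponds to one entry with end == 0.
	single := PortEnum{ranges: []portRange{{start: 8080}}}
	fmt.Println(single.Matches(8080), single.Matches(8081)) // prints true false

	// A range entry matches every port between start and end, inclusive.
	ranged := PortEnum{ranges: []portRange{{start: 9000, end: 9010}}}
	fmt.Println(ranged.Matches(9000), ranged.Matches(9010), ranged.Matches(9011)) // prints true true false
}
```

An empty PortEnum matches nothing, which is why matchProcess only calls the port check when at least one port has been configured.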