Skip to content

Commit

Permalink
pkg/udev: implement udev event reading/monitoring.
Browse files Browse the repository at this point in the history
Implement low-level udev raw reader, event reader and a
a monitor, with support for property-based event filter.
This should provide enough plumbing to implement memory
hotplug/unplug detection.

Signed-off-by: Krisztian Litkey <[email protected]>
  • Loading branch information
klihub committed Dec 12, 2024
1 parent a122a15 commit c965cfc
Show file tree
Hide file tree
Showing 3 changed files with 484 additions and 0 deletions.
140 changes: 140 additions & 0 deletions pkg/udev/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright The NRI Plugins Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udev

// Notes:
// The implementation of Reader was inspired a lot by the existing
// but mostly unmaintained github.com/s-urbaniak/uevent golang package.

//
// Examples:
//
// Reading udev events using the udev.Reader:
//
// package main
//
// import (
// "github.com/containers/nri-plugins/pkg/udev"
// "sigs.k8s.io/yaml"
//
// logger "github.com/containers/nri-plugins/pkg/log"
// )
//
// var (
// log = logger.Get("udev")
// )
//
// func main() {
// r, err := udev.NewEventReader()
// if err != nil {
// log.Fatalf("failed to create udev event reader: %v", err)
// }
//
// events := make(chan *udev.Event, 64)
//
// go func() {
// for evt := range events {
// dump(evt)
// }
// }()
//
// for {
// e, err := r.Read()
// if err != nil {
// log.Fatalf("failed to read udev event: %v", err)
// }
// events <- e
// }
// }
//
// func dump(e *udev.Event) {
// dump, err := yaml.Marshal(e)
// if err != nil {
// log.Errorf("failed to marshal event: %v\n", err)
// return
// }
// log.InfoBlock("udev reader ", "%s", dump)
// }
//
// Reading udev events using the udev.Monitor:
//
// package main
//
// import (
// "os"
// "strings"
//
// "github.com/containers/nri-plugins/pkg/udev"
// "sigs.k8s.io/yaml"
//
// logger "github.com/containers/nri-plugins/pkg/log"
// )
//
// var (
// log = logger.Get("udev")
// )
//
// func main() {
// var (
// filters = parseFilters()
// events = make(chan *udev.Event, 64)
// )
//
// m, err := udev.NewMonitor(udev.WithFilters(filters...))
// if err != nil {
// log.Fatalf("failed to create udev event reader: %v", err)
// }
//
// m.Start(events)
//
// for evt := range events {
// dump(evt)
// }
// }
//
// func parseFilters() []map[string]string {
// var filters []map[string]string
//
// for _, arg := range os.Args[1:] {
// if !strings.Contains(arg, "=") {
// continue
// }
//
// filter := map[string]string{}
// for _, expr := range strings.Split(arg, ",") {
// kv := strings.SplitN(expr, "=", 2)
// if len(kv) != 2 {
// log.Fatalf("invalid filter expression %s (in %s)", expr, arg)
// }
// filter[strings.ToUpper(kv[0])] = kv[1]
// }
// if len(filter) > 0 {
// log.Info("using parsed filter: %v", filter)
// filters = append(filters, filter)
// }
// }
//
// return filters
// }
//
// func dump(e *udev.Event) {
// dump, err := yaml.Marshal(e)
// if err != nil {
// log.Errorf("failed to marshal event: %v\n", err)
// return
// }
// log.InfoBlock("monitor ", "%s", dump)
// }
//
161 changes: 161 additions & 0 deletions pkg/udev/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright The NRI Plugins Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udev

import (
"fmt"
"path"
)

// MonitorOption is an opaque option which can be applied to a Monitor.
type MonitorOption func(*Monitor)

// WithFilters returns a MonitorOption for filtering events by properties.
// Properties within a map have AND semantics: the map matches an event if
// all key-value pairs match the event. Multiple maps have OR semantics:
// they match an event if at least one map matches the event. Events which
// are matched are passed through. Others are filtered out.
func WithFilters(filters ...map[string]string) MonitorOption {
return func(m *Monitor) {
m.filters = append(m.filters, filters...)
}
}

// WithGlobFilters returns a MonitorOption for filtering events by properties.
// Semantics are similar to WithFilters, but properties are matched using glob
// patterns instead of verbatim comparison.
func WithGlobFilters(globbers ...map[string]string) MonitorOption {
return func(m *Monitor) {
m.globbers = append(m.globbers, globbers...)
}
}

// Monitor monitors udev events.
type Monitor struct {
r *EventReader
filters []map[string]string
globbers []map[string]string
stopCh chan struct{}
}

// NewMonitor creates an udev monitor with the given options.
func NewMonitor(options ...MonitorOption) (*Monitor, error) {
r, err := NewEventReader()
if err != nil {
return nil, fmt.Errorf("failed to create udev monitor reader: %w", err)
}

m := &Monitor{
r: r,
stopCh: make(chan struct{}),
}

for _, o := range options {
o(m)
}

return m, nil
}

// Start starts event monitoring and delivery.
func (m *Monitor) Start(events chan *Event) {
if len(m.filters) == 0 && len(m.globbers) == 0 {
go m.reader(events)
} else {
filter := make(chan *Event, 64)
go m.filterer(filter, events)
go m.reader(filter)
}
}

// Stop stops event monitoring.
func (m *Monitor) Stop() error {
return m.r.Close()
}

func (m *Monitor) reader(events chan<- *Event) {
for {
evt, err := m.r.Read()
if err != nil {
log.Errorf("failed to read udev event: %v", err)
m.r.Close()
close(events)
return
}

events <- evt
}
}

func (m *Monitor) filterer(events <-chan *Event, filtered chan<- *Event) {
var stuck bool

for evt := range events {
if !m.filter(evt) {
continue
}

select {
case filtered <- evt:
stuck = false
default:
if !stuck {
log.Warnf("dropped udev %s %s event", evt.Subsystem, evt.Action)
stuck = true
}
}
}
}

func (m *Monitor) filter(evt *Event) bool {
if len(m.filters) == 0 && len(m.globbers) == 0 {
return true
}

for _, filter := range m.filters {
match := true
for k, v := range filter {
if evt.Properties[k] != v {
match = false
break
}
}
if match {
return true
}
}

for _, glob := range m.globbers {
match := true
for k, p := range glob {
m, err := path.Match(p, evt.Properties[k])
if err != nil {
log.Errorf("failed to match udev event property %q=%q by pattern %q: %v",
k, evt.Properties[k], p, err)
delete(glob, k)
continue
}
if !m {
match = false
break
}
}
if match {
return true
}
}

return false
}
Loading

0 comments on commit c965cfc

Please sign in to comment.