Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/udev: implement udev event reading/monitoring. #449

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions pkg/udev/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright The NRI Plugins Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udev

// Notes:
// The implementation of Reader was inspired a lot by the existing
// but mostly unmaintained github.com/s-urbaniak/uevent golang package.

//
// Examples:
//
// Reading udev events using udev.Reader:
//
// package main
//
// import (
// "github.com/containers/nri-plugins/pkg/udev"
// "sigs.k8s.io/yaml"
//
// logger "github.com/containers/nri-plugins/pkg/log"
// )
//
// var (
// log = logger.Get("udev")
// )
//
// func main() {
// r, err := udev.NewEventReader()
// if err != nil {
// log.Fatalf("failed to create udev event reader: %v", err)
// }
//
// events := make(chan *udev.Event, 64)
//
// go func() {
// for evt := range events {
// dump(evt)
// }
// }()
//
// for {
// e, err := r.Read()
// if err != nil {
// log.Fatalf("failed to read udev event: %v", err)
// }
// events <- e
// }
// }
//
// func dump(e *udev.Event) {
// dump, err := yaml.Marshal(e)
// if err != nil {
// log.Errorf("failed to marshal event: %v\n", err)
// return
// }
// log.InfoBlock("udev reader ", "%s", dump)
// }
//
// Filtered reading of udev events using udev.Monitor:
//
// package main
//
// import (
// "os"
// "strings"
//
// "github.com/containers/nri-plugins/pkg/udev"
// "sigs.k8s.io/yaml"
//
// logger "github.com/containers/nri-plugins/pkg/log"
// )
//
// var (
// log = logger.Get("udev")
// )
//
// func main() {
// var (
// filters = parseFilters()
// events = make(chan *udev.Event, 64)
// )
//
// m, err := udev.NewMonitor(udev.WithFilters(filters...))
// if err != nil {
// log.Fatalf("failed to create udev event reader: %v", err)
// }
//
// m.Start(events)
//
// for evt := range events {
// dump(evt)
// }
// }
//
// func parseFilters() []map[string]string {
// var filters []map[string]string
//
// for _, arg := range os.Args[1:] {
// if !strings.Contains(arg, "=") {
// continue
// }
//
// filter := map[string]string{}
// for _, expr := range strings.Split(arg, ",") {
// kv := strings.SplitN(expr, "=", 2)
// if len(kv) != 2 {
// log.Fatalf("invalid filter expression %s (in %s)", expr, arg)
// }
// filter[strings.ToUpper(kv[0])] = kv[1]
// }
// if len(filter) > 0 {
// log.Info("using parsed filter: %v", filter)
// filters = append(filters, filter)
// }
// }
//
// return filters
// }
//
// func dump(e *udev.Event) {
// dump, err := yaml.Marshal(e)
// if err != nil {
// log.Errorf("failed to marshal event: %v\n", err)
// return
// }
// log.InfoBlock("monitor ", "%s", dump)
// }
//
161 changes: 161 additions & 0 deletions pkg/udev/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright The NRI Plugins Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udev

import (
"fmt"
"path"
)

// MonitorOption is an opaque option which can be applied to a Monitor.
type MonitorOption func(*Monitor)

// WithFilters returns a MonitorOption for filtering events by properties.
// Properties within a map have AND semantics: the map matches an event if
// all key-value pairs match the event. Multiple maps have OR semantics:
// they match an event if at least one map matches the event. Events which
// are matched are passed through. Others are filtered out.
func WithFilters(filters ...map[string]string) MonitorOption {
return func(m *Monitor) {
m.filters = append(m.filters, filters...)
}
}

// WithGlobFilters returns a MonitorOption for filtering events by properties.
// Semantics are similar to WithFilters, but properties are matched using glob
// patterns instead of verbatim comparison.
func WithGlobFilters(globbers ...map[string]string) MonitorOption {
return func(m *Monitor) {
m.globbers = append(m.globbers, globbers...)
}
}

// Monitor monitors udev events.
type Monitor struct {
r *EventReader
filters []map[string]string
globbers []map[string]string
stopCh chan struct{}
}

// NewMonitor creates an udev monitor with the given options.
func NewMonitor(options ...MonitorOption) (*Monitor, error) {
r, err := NewEventReader()
if err != nil {
return nil, fmt.Errorf("failed to create udev monitor reader: %w", err)
}

m := &Monitor{
r: r,
stopCh: make(chan struct{}),
}

for _, o := range options {
o(m)
}

return m, nil
}

// Start starts event monitoring and delivery.
func (m *Monitor) Start(events chan *Event) {
if len(m.filters) == 0 && len(m.globbers) == 0 {
go m.reader(events)
} else {
filter := make(chan *Event, 64)
go m.filterer(filter, events)
go m.reader(filter)
}
}

// Stop stops event monitoring.
func (m *Monitor) Stop() error {
return m.r.Close()
}

func (m *Monitor) reader(events chan<- *Event) {
for {
evt, err := m.r.Read()
if err != nil {
log.Errorf("failed to read udev event: %v", err)
m.r.Close()
close(events)
return
}

events <- evt
}
}

func (m *Monitor) filterer(events <-chan *Event, filtered chan<- *Event) {
var stuck bool

for evt := range events {
if !m.filter(evt) {
continue
}

select {
case filtered <- evt:
stuck = false
default:
if !stuck {
log.Warnf("dropped udev %s %s event", evt.Subsystem, evt.Action)
stuck = true
}
}
}
}

func (m *Monitor) filter(evt *Event) bool {
if len(m.filters) == 0 && len(m.globbers) == 0 {
return true
}

for _, filter := range m.filters {
match := true
for k, v := range filter {
if evt.Properties[k] != v {
match = false
break
}
}
if match {
return true
}
}

for _, glob := range m.globbers {
match := true
for k, p := range glob {
m, err := path.Match(p, evt.Properties[k])
if err != nil {
log.Errorf("failed to match udev event property %q=%q by pattern %q: %v",
k, evt.Properties[k], p, err)
delete(glob, k)
continue
}
if !m {
match = false
break
}
}
if match {
return true
}
}

return false
}
Loading
Loading