From 5273de1018e7d275f762dbf0f6018c9a7eab88fd Mon Sep 17 00:00:00 2001 From: Aneesh Puttur Date: Tue, 20 Jun 2023 12:42:04 -0400 Subject: [PATCH] Add monitoring for all process and setting up GM State Signed-off-by: Aneesh Puttur --- pkg/daemon/config/config.go | 14 ++ pkg/daemon/daemon/config.go | 2 +- pkg/daemon/daemon/daemon.go | 127 ++++++++--- pkg/daemon/daemon/gpsd.go | 122 +++++++++-- pkg/daemon/daemon/gpspipe.go | 16 +- pkg/daemon/daemon/metrics.go | 9 +- pkg/daemon/daemon/process.go | 10 +- pkg/daemon/dpll/dpll.go | 269 +++++++++++++++++++++++ pkg/daemon/dpll/dpll_test.go | 28 +++ pkg/daemon/ublox/types.go | 32 ++- pkg/daemon/ublox/ublox.go | 53 ++++- pkg/event/event.go | 406 +++++++++++++++++++++++++++++------ pkg/event/event_test.go | 2 +- pkg/event/stats.go | 23 ++ 14 files changed, 979 insertions(+), 134 deletions(-) create mode 100644 pkg/daemon/dpll/dpll.go create mode 100644 pkg/daemon/dpll/dpll_test.go create mode 100644 pkg/event/stats.go diff --git a/pkg/daemon/config/config.go b/pkg/daemon/config/config.go index 5dc737d1c..b901e02d0 100644 --- a/pkg/daemon/config/config.go +++ b/pkg/daemon/config/config.go @@ -41,3 +41,17 @@ func GetKubeConfig() (*rest.Config, error) { return nil, fmt.Errorf("Could not locate a kubeconfig") } + +type ProcessConfig struct { + ClockType event.ClockType + ConfigName string + CloseCh chan bool + EventChannel chan<- event.EventChannel + GMThreshold Threshold + InitialPTPState event.PTPState +} +type Threshold struct { + Max int64 + Min int64 + HoldOverTimeout int64 +} diff --git a/pkg/daemon/daemon/config.go b/pkg/daemon/daemon/config.go index d5dc7d307..e1dc719db 100644 --- a/pkg/daemon/daemon/config.go +++ b/pkg/daemon/daemon/config.go @@ -158,7 +158,7 @@ func (conf *ptp4lConf) renderPtp4lConf() (string, string) { for _, section := range conf.sections { configOut = fmt.Sprintf("%s\n%s", configOut, section.sectionName) - if section.sectionName != "[global]" { + if section.sectionName != "[global]" && section.sectionName != "[nmea]" { iface := section.sectionName iface = strings.ReplaceAll(iface, "[", "") iface = strings.ReplaceAll(iface, "]", "") diff --git a/pkg/daemon/daemon/daemon.go b/pkg/daemon/daemon/daemon.go index 435117ddc..3a6cab863 100644 --- a/pkg/daemon/daemon/daemon.go +++ b/pkg/daemon/daemon/daemon.go @@ -49,7 +49,8 @@ type ptpProcess struct { depProcess []process // this could gpsd and other process which needs to be stopped if the parent process is stopped nodeProfile *ptpv1.PtpProfile parentClockClass float64 - pmcCheck bool + pmcCheck bool + ptpClockThreshold *ptpv1.PtpClockThreshold } func (p *ptpProcess) Stopped() bool { @@ -136,7 +137,7 @@ func (dn *Daemon) Run() { if p != nil { for _, d := range p.depProcess { if d != nil { - d.cmdStop() + d.CmdStop() d = nil } } @@ -174,7 +175,7 @@ func (dn *Daemon) applyNodePTPProfiles() error { if p.depProcess != nil { for _, d := range p.depProcess { if d != nil { - d.cmdStop() + d.CmdStop() d = nil } } @@ -216,15 +217,28 @@ func (dn *Daemon) applyNodePTPProfiles() error { for _, d := range p.depProcess { if d != nil { time.Sleep(3 * time.Second) - go d.cmdRun(false) + go d.CmdRun(false) time.Sleep(3 * time.Second) dn.pluginManager.AfterRunPTPCommand(p.nodeProfile, d.Name()) - d.monitorEvent(clockType, p.configName, p.exitCh, dn.processManager.eventChannel) + //TODO: Maybe Move DPLL start and stop as part of pluign + d.MonitorProcess(config.ProcessConfig{ + ClockType: clockType, + ConfigName: p.configName, + CloseCh: p.exitCh, + EventChannel: dn.processManager.eventChannel, + GMThreshold: config.Threshold{ + Max: p.ptpClockThreshold.MaxOffsetThreshold, + Min: p.ptpClockThreshold.MinOffsetThreshold, + HoldOverTimeout: p.ptpClockThreshold.HoldOverTimeout, + }, + InitialPTPState: event.PTP_FREERUN, + }) } } } time.Sleep(1 * time.Second) go p.cmdRun() + p.eventCh = dn.processManager.eventChannel dn.pluginManager.AfterRunPTPCommand(p.nodeProfile, p.name) } } @@ -366,33 +380,40 @@ func (dn *Daemon) applyNodePtpProfile(runID int, nodeProfile *ptpv1.PtpProfile) ifacesList := strings.Split(ifaces, ",") dprocess := ptpProcess{ - name: p, - ifaces: ifacesList, - ptp4lConfigPath: configPath, - ptp4lSocketPath: socketPath, - configName: configFile, - messageTag: messageTag, - exitCh: make(chan bool), - stopped: false, - logFilterRegex: getLogFilterRegex(nodeProfile), - cmd: cmd, - depProcess: []process{}, - nodeProfile: nodeProfile, + name: p, + ifaces: ifacesList, + ptp4lConfigPath: configPath, + ptp4lSocketPath: socketPath, + configName: configFile, + messageTag: messageTag, + exitCh: make(chan bool), + stopped: false, + logFilterRegex: getLogFilterRegex(nodeProfile), + cmd: cmd, + depProcess: []process{}, + nodeProfile: nodeProfile, + ptpClockThreshold: getPTPThreshold(nodeProfile), } //TODO HARDWARE PLUGIN for e810 if pProcess == ts2phcProcessName { //& if the x plugin is enabled + //TODO: move this to plugin or call it from hwplugin or leave it here and remove Hardcoded + gmInterface := "" + if len(ifacesList) > 0 { + gmInterface = ifacesList[0] + } if e := mkFifo(); e != nil { glog.Errorf("Error creating named pipe, GNSS monitoring will not work as expected %s", e.Error()) } gpsDaemon := &gpsd{ - name: GPSD_PROCESSNAME, - execMutex: sync.Mutex{}, - cmd: nil, - serialPort: GPSD_SERIALPORT, - exitCh: make(chan bool), - stopped: false, + name: GPSD_PROCESSNAME, + execMutex: sync.Mutex{}, + cmd: nil, + serialPort: GPSD_SERIALPORT, + exitCh: make(chan bool), + gmInterface: gmInterface, + stopped: false, } - gpsDaemon.cmdInit() + gpsDaemon.CmdInit() dprocess.depProcess = append(dprocess.depProcess, gpsDaemon) // init gpspipe @@ -404,15 +425,20 @@ func (dn *Daemon) applyNodePtpProfile(runID int, nodeProfile *ptpv1.PtpProfile) exitCh: make(chan bool), stopped: false, } - gpsPipeDaemon.cmdInit() + gpsPipeDaemon.CmdInit() dprocess.depProcess = append(dprocess.depProcess, gpsPipeDaemon) + // init dpll + // TODO: Try to inject DPLL depProcess via plugin ? + dpllDaemon := dpll.NewDpll(dpll.LocalMaxHoldoverOffSet, dpll.LocalHoldoverTimeout, dpll.MaxInSpecOffset, + gmInterface, []event.EventSource{event.GNSS}) + dprocess.depProcess = append(dprocess.depProcess, dpllDaemon) + dn.processManager.process = append(dn.processManager.process, &dprocess) } err = os.WriteFile(configPath, []byte(configOutput), 0644) if err != nil { printNodeProfile(nodeProfile) return fmt.Errorf("failed to write the configuration file named %s: %v", configPath, err) } - dn.processManager.process = append(dn.processManager.process, &dprocess) } return nil @@ -506,7 +532,7 @@ func (p *ptpProcess) cmdRun() { p.cmd.Stderr = os.Stderr cmdReader, err := p.cmd.StdoutPipe() if err != nil { - glog.Errorf("cmdRun() error creating StdoutPipe for %s: %v", p.name, err) + glog.Errorf("CmdRun() error creating StdoutPipe for %s: %v", p.name, err) break } scanner := bufio.NewScanner(cmdReader) @@ -522,7 +548,28 @@ func (p *ptpProcess) cmdRun() { if regexErr != nil || !logFilterRegex.MatchString(output) { fmt.Printf("%s\n", output) } - extractMetrics(p.messageTag, p.name, p.ifaces, output) + source, ts2phcsOffset, state, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output) + var ts2phcState event.PTPState + if state == FREERUN { + ts2phcState = event.PTP_FREERUN + } else if state == LOCKED { + ts2phcState = event.PTP_LOCKED + } + if source == ts2phcProcessName { + p.eventCh <- event.EventChannel{ + ProcessName: event.TS2PHC, + State: ts2phcState, + CfgName: p.configName, + IFace: iface, + Values: map[event.ValueType]int64{ + event.OFFSET: int64(ts2phcsOffset), + }, + ClockType: "GM", //TODO: add actual defined + Time: time.Now().Unix(), + WriteToLog: true, + Reset: false, + } + } } done <- struct{}{} }() @@ -530,13 +577,13 @@ func (p *ptpProcess) cmdRun() { if !p.Stopped() { err = p.cmd.Start() // this is asynchronous call, if err != nil { - glog.Errorf("cmdRun() error starting %s: %v", p.name, err) + glog.Errorf("CmdRun() error starting %s: %v", p.name, err) } } <-done // goroutine is done err = p.cmd.Wait() if err != nil { - glog.Errorf("cmdRun() error waiting for %s: %v", p.name, err) + glog.Errorf("CmdRun() error waiting for %s: %v", p.name, err) } processStatus(p.name, p.messageTag, PtpProcessDown) @@ -577,3 +624,23 @@ func (p *ptpProcess) cmdStop() { <-p.exitCh glog.Infof("Process %d terminated", p.cmd.Process.Pid) } + +func getPTPThreshold(nodeProfile *ptpv1.PtpProfile) *ptpv1.PtpClockThreshold { + if nodeProfile.PtpClockThreshold != nil { + return &ptpv1.PtpClockThreshold{ + HoldOverTimeout: nodeProfile.PtpClockThreshold.HoldOverTimeout, + MaxOffsetThreshold: nodeProfile.PtpClockThreshold.MaxOffsetThreshold, + MinOffsetThreshold: nodeProfile.PtpClockThreshold.MinOffsetThreshold, + } + } else { + return &ptpv1.PtpClockThreshold{ + HoldOverTimeout: 5, + MaxOffsetThreshold: 100, + MinOffsetThreshold: 100, + } + } +} + +func (p *ptpProcess) MonitorEvent(offset float64, clockState string) { + +} diff --git a/pkg/daemon/daemon/gpsd.go b/pkg/daemon/daemon/gpsd.go index f29269dba..83a36af1b 100644 --- a/pkg/daemon/daemon/gpsd.go +++ b/pkg/daemon/daemon/gpsd.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/exec" + "strconv" "strings" "sync" "syscall" @@ -21,16 +22,20 @@ const ( ) type gpsd struct { - name string - execMutex sync.Mutex - cmd *exec.Cmd - serialPort string - exitCh chan bool - stopped bool + name string + execMutex sync.Mutex + cmd *exec.Cmd + serialPort string + exitCh chan bool + stopped bool + state event.PTPState + offset int64 + processConfig config.ProcessConfig + gmInterface string } -func (g *gpsd) monitorEvent(clockType event.ClockType, cfgName string, chClose chan bool, chEventChannel chan<- event.EventChannel) { - go gnss.MonitorGNSSEvents(clockType, cfgName, "E810", chClose, chEventChannel) +func (g *gpsd) MonitorProcess(p config.ProcessConfig) { + go g.monitorGNSSEvents(p, "E810") } func (g *gpsd) Name() string { @@ -53,14 +58,13 @@ func (g *gpsd) Stopped() bool { return me } -func (g *gpsd) cmdStop() { +func (g *gpsd) CmdStop() { glog.Infof("Stopping %s...", g.name) if g.cmd == nil { return } g.setStopped(true) - if g.cmd.Process != nil { glog.Infof("Sending TERM to PID: %d", g.cmd.Process.Pid) g.cmd.Process.Signal(syscall.SIGTERM) @@ -71,7 +75,7 @@ func (g *gpsd) cmdStop() { } // ubxtool -w 5 -v 1 -p MON-VER -P 29.20 -func (g *gpsd) cmdInit() { +func (g *gpsd) CmdInit() { if g.name == "" { g.name = "gpsd" } @@ -81,7 +85,7 @@ func (g *gpsd) cmdInit() { } -func (g *gpsd) cmdRun(stdoutToSocket bool) { +func (g *gpsd) CmdRun(stdoutToSocket bool) { done := make(chan struct{}) // Done setting up logging. Go ahead and wait for process defer func() { g.exitCh <- true @@ -93,7 +97,7 @@ func (g *gpsd) cmdRun(stdoutToSocket bool) { cmdReader, err := g.cmd.StdoutPipe() if err != nil { - glog.Errorf("cmdRun() error creating StdoutPipe for %s: %v", g.Name(), err) + glog.Errorf("CmdRun() error creating StdoutPipe for %s: %v", g.Name(), err) break } if !stdoutToSocket { @@ -101,8 +105,8 @@ func (g *gpsd) cmdRun(stdoutToSocket bool) { go func() { for scanner.Scan() { //TODO: suppress logs for - output := scanner.Text() - fmt.Printf("%s\n", output) + _ = scanner.Text() + //fmt.Printf("%s\n", output) } done <- struct{}{} }() @@ -112,13 +116,97 @@ func (g *gpsd) cmdRun(stdoutToSocket bool) { time.Sleep(1 * time.Second) err = g.cmd.Start() // this is asynchronous call, if err != nil { - glog.Errorf("cmdRun() error starting %s: %v", g.Name(), err) + glog.Errorf("CmdRun() error starting %s: %v", g.Name(), err) } } <-done // goroutine is done err = g.cmd.Wait() if err != nil { - glog.Errorf("cmdRun() error waiting for %s: %v", g.Name(), err) + glog.Errorf("CmdRun() error waiting for %s: %v", g.Name(), err) } } } + +// MonitorGNSSEvents ... monitor gnss events +func (g *gpsd) monitorGNSSEvents(processCfg config.ProcessConfig, pluginName string) error { + //done := make(chan struct{}) // Done setting up logging. Go ahead and wait for process + // var currentNavStatus int64 + g.processConfig = processCfg + var err error + //currentNavStatus = 0 + glog.Infof("Starting GNSS Monitoring for plugin %s...", pluginName) + + var ublx *ublox.UBlox + g.state = event.PTP_FREERUN +retry: + if ublx, err = ublox.NewUblox(); err != nil { + glog.Errorf("failed to initialize GNSS monitoring via ublox %s", err) + time.Sleep(10 * time.Second) + goto retry + } else { + //TODO: monitor on 1PPS events trigger + ticker := time.NewTicker(1 * time.Second) + var lastState int64 + var lastOffset int64 + for { + select { + case <-ticker.C: + // do stuff + nStatus, errs := ublx.NavStatus() + offsetS, err2 := ublx.GetNavOffset() + if errs == nil && err2 == nil { + //calculate PTP states + g.offset, _ = strconv.ParseInt(offsetS, 10, 64) + if nStatus > 3 && g.isOffsetInRange() { + g.state = event.PTP_LOCKED + } else { + g.state = event.PTP_FREERUN + } + if lastState != nStatus || lastOffset != g.offset { + lastState = nStatus + lastOffset = g.offset + processCfg.EventChannel <- event.EventChannel{ + ProcessName: event.GNSS, + State: g.state, + CfgName: processCfg.ConfigName, + IFace: g.gmInterface, + Values: map[event.ValueType]int64{ + event.GPS_STATUS: nStatus, + event.OFFSET: g.offset, + }, + ClockType: processCfg.ClockType, + Time: time.Now().Unix(), + WriteToLog: true, + Reset: false, + } + } + + } else { + if errs != nil { + glog.Errorf("error calling ublox %s", errs) + } + if err2 != nil { + glog.Errorf("error calling ublox %s", err2) + } + } + case <-processCfg.CloseCh: + processCfg.EventChannel <- event.EventChannel{ + ProcessName: event.GNSS, + CfgName: processCfg.ConfigName, + ClockType: processCfg.ClockType, + Time: time.Now().Unix(), + Reset: true, + } + ticker.Stop() + return nil + } + } + } +} + +func (d *gpsd) isOffsetInRange() bool { + if d.offset < d.processConfig.GMThreshold.Max && d.offset > d.processConfig.GMThreshold.Min { + return true + } + return false +} diff --git a/pkg/daemon/daemon/gpspipe.go b/pkg/daemon/daemon/gpspipe.go index cbb43c58f..227341bbe 100644 --- a/pkg/daemon/daemon/gpspipe.go +++ b/pkg/daemon/daemon/gpspipe.go @@ -3,6 +3,7 @@ package daemon import ( "context" "fmt" + "github.com/openshift/linuxptp-daemon/pkg/config" "io" "os" "os/exec" @@ -10,7 +11,6 @@ import ( "syscall" "github.com/golang/glog" - "github.com/openshift/linuxptp-daemon/pkg/event" ) const ( @@ -50,7 +50,7 @@ func (gp *gpspipe) Stopped() bool { return me } -func (gp *gpspipe) cmdStop() { +func (gp *gpspipe) CmdStop() { glog.Infof("Stopping %s...", gp.name) if gp.cmd == nil { return @@ -62,13 +62,17 @@ func (gp *gpspipe) cmdStop() { glog.Infof("Sending TERM to PID: %d", gp.cmd.Process.Pid) gp.cmd.Process.Signal(syscall.SIGTERM) } - + // Clean up (delete) the named pipe + err := os.Remove(GPSPIPE_SERIALPORT) + if err != nil { + glog.Errorf("Failed to delete named pipe: %s", GPSD_SERIALPORT) + } <-gp.exitCh glog.Infof("Process %d terminated", gp.cmd.Process.Pid) } // ubxtool -w 5 -v 1 -p MON-VER -P 29.20 -func (gp *gpspipe) cmdInit() { +func (gp *gpspipe) CmdInit() { if gp.name == "" { gp.name = GPSPIPE_PROCESSNAME } @@ -80,7 +84,7 @@ func (gp *gpspipe) cmdInit() { gp.cmd = exec.Command("/usr/bin/bash", "-c", fmt.Sprintf("gpspipe -v -d -r -l -o %s ", gp.SerialPort())) } -func (gp *gpspipe) cmdRun(stdoutToSocket bool) { +func (gp *gpspipe) CmdRun(stdoutToSocket bool) { glog.Infof("running process %s", gp.name) stdout, err := gp.cmd.Output() if err != nil { @@ -111,7 +115,7 @@ func mkFifo() error { } return nil } -func (gp *gpspipe) monitorEvent(clockType event.ClockType, cfgName string, chClose chan bool, chEventChannel chan<- event.EventChannel) { +func (gp *gpspipe) MonitorProcess(config config.ProcessConfig) { //TODO implement me glog.Infof("monitoring for gpspipe not implemented") } diff --git a/pkg/daemon/daemon/metrics.go b/pkg/daemon/daemon/metrics.go index 1e2310e1a..bded3f006 100644 --- a/pkg/daemon/daemon/metrics.go +++ b/pkg/daemon/daemon/metrics.go @@ -107,7 +107,7 @@ var ( Help: "0 = FREERUN, 1 = LOCKED, 2 = HOLDOVER", }, []string{"process", "node", "iface"}) - // Threshold metrics to show current ptp threshold + // Threshold metrics to show current ptp GMThreshold InterfaceRole = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: PTPNamespace, @@ -174,7 +174,7 @@ func updatePTPMetrics(from, process, iface string, ptpOffset, maxPtpOffset, freq } // extractMetrics ... -func extractMetrics(messageTag string, processName string, ifaces []string, output string) { +func extractMetrics(messageTag string, processName string, ifaces []string, output string) (source string, offset float64, state string, iface string) { configName := strings.Replace(strings.Replace(messageTag, "]", "", 1), "[", "", 1) if strings.Contains(output, " max ") { ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay := extractSummaryMetrics(configName, processName, output) @@ -204,6 +204,10 @@ func extractMetrics(messageTag string, processName string, ifaces []string, outp updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay) updateClockStateMetrics(processName, ifaceName, clockstate) } + source = processName + offset = ptpOffset + state = clockstate + iface = ifaceName } if processName == ptp4lProcessName { if portId, role := extractPTP4lEventState(output); portId > 0 { @@ -225,6 +229,7 @@ func extractMetrics(messageTag string, processName string, ifaces []string, outp } } } + return } func extractSummaryMetrics(configName, processName, output string) (iface string, ptpOffset, maxPtpOffset, frequencyAdjustment, delay float64) { diff --git a/pkg/daemon/daemon/process.go b/pkg/daemon/daemon/process.go index fbd5caeb8..e3e37f55b 100644 --- a/pkg/daemon/daemon/process.go +++ b/pkg/daemon/daemon/process.go @@ -1,12 +1,12 @@ package daemon -import "github.com/openshift/linuxptp-daemon/pkg/event" +import "github.com/openshift/linuxptp-daemon/pkg/config" type process interface { Name() string Stopped() bool - cmdStop() - cmdInit() - cmdRun(stdToSocket bool) - monitorEvent(clockType event.ClockType, configName string, chClose chan bool, chEventChannel chan<- event.EventChannel) + CmdStop() + CmdInit() + CmdRun(stdToSocket bool) + MonitorProcess(p config.ProcessConfig) } diff --git a/pkg/daemon/dpll/dpll.go b/pkg/daemon/dpll/dpll.go new file mode 100644 index 000000000..6dca4fb01 --- /dev/null +++ b/pkg/daemon/dpll/dpll.go @@ -0,0 +1,269 @@ +package dpll + +import ( + "encoding/binary" + "fmt" + "math" + "os" + "time" + + "github.com/golang/glog" + "github.com/openshift/linuxptp-daemon/pkg/config" + "github.com/openshift/linuxptp-daemon/pkg/event" +) + +const ( + DPLL_UNKNOW = -1 + DPLL_INVALID = 0 + DPLL_FREERUN = 1 + DPLL_LOCKED = 2 + DPLL_LOCKED_HO_ACQ = 3 + DPLL_HOLOVER = 4 + + LocalMaxHoldoverOffSet = 1500 //ns + LocalHoldoverTimeout = 14400 //secs + MaxInSpecOffset = 100 //ns +) + +type DpllConfig struct { + LocalMaxHoldoverOffSet int64 + LocalHoldoverTimeout int64 + MaxInSpecOffset int64 + iface string + name string + slope float64 + timer int64 //secs + inSpec bool + offset int64 + state event.PTPState + onHoldover bool + sourceLost bool + processConfig config.ProcessConfig + dependingState []event.EventSource +} + +func (d *DpllConfig) Name() string { + //TODO implement me + return "dpll" +} + +func (d *DpllConfig) Stopped() bool { + //TODO implement me + panic("implement me") +} + +func (d *DpllConfig) CmdStop() { + + glog.Infof("Process %s terminated", d.Name()) +} + +func (d *DpllConfig) CmdInit() { + //TODO implement me + glog.Infof("cmdInit not implemented %s", d.Name()) +} + +func (d *DpllConfig) CmdRun(stdToSocket bool) { + //TODO implement me + glog.Infof("cmdRun not implemented %s", d.Name()) +} + +func NewDpll(localMaxHoldoverOffSet, localHoldoverTimeout, maxInSpecOffset int64, + iface string, dependingState []event.EventSource) *DpllConfig { + d := &DpllConfig{ + LocalMaxHoldoverOffSet: localMaxHoldoverOffSet, + LocalHoldoverTimeout: localHoldoverTimeout, + MaxInSpecOffset: maxInSpecOffset, + slope: func() float64 { + return float64((localMaxHoldoverOffSet / localHoldoverTimeout) * 1000) + }(), + timer: 0, + offset: 0, + state: event.PTP_FREERUN, + iface: iface, + onHoldover: false, + sourceLost: false, + dependingState: dependingState, + } + d.timer = int64(float64(d.MaxInSpecOffset) / d.slope) + return d +} +func (d *DpllConfig) MonitorProcess(processCfg config.ProcessConfig) { + go d.MonitorDpll(processCfg) +} + +// MonitorDpll ... monitor dpll events +func (d *DpllConfig) MonitorDpll(processCfg config.ProcessConfig) { + ticker := time.NewTicker(1 * time.Second) + dpll_state := event.PTP_FREERUN + var closeCh chan bool + // determine dpll state + responseChannel := make(chan event.PTPState) + var responseState event.PTPState + d.inSpec = true + for { + select { + case <-ticker.C: + // monitor DPLL + //TODO: netlink to monitor DPLL start here + phase_status, frequency_status, phase_offset := d.sysfs(d.iface) + // check GPS status data lost ? + // send event + lowestState := event.PTP_UNKNOWN + var dependingProcessState []event.PTPState + for _, stateSource := range d.dependingState { + event.GetPTPStateRequest(event.StatusRequest{ + Source: stateSource, + CfgName: processCfg.ConfigName, + ResponseChannel: responseChannel, + }) + select { + case responseState = <-responseChannel: + case <-time.After(1 * time.Second): + responseState = event.PTP_UNKNOWN + } + dependingProcessState = append(dependingProcessState, responseState) + } + + for i, state := range dependingProcessState { + if i == 0 || state < lowestState { + lowestState = state + } + } + + // check dpll status + if lowestState == event.PTP_LOCKED { + d.sourceLost = false + } else { + d.sourceLost = true + } + // calculate dpll status + dpllStatus := d.getState(phase_status, frequency_status) + switch dpllStatus { + case DPLL_FREERUN, DPLL_INVALID, DPLL_UNKNOW: + d.inSpec = true + if d.onHoldover { + closeCh <- true + } + d.state = event.PTP_FREERUN + case DPLL_LOCKED: + d.inSpec = true + if !d.sourceLost && d.isOffsetInRange() { + d.state = event.PTP_LOCKED + } else { + d.state = event.PTP_FREERUN + } + case DPLL_LOCKED_HO_ACQ, DPLL_HOLOVER: + if !d.sourceLost && d.isOffsetInRange() { + d.inSpec = true + d.state = event.PTP_LOCKED + if d.onHoldover { + d.inSpec = true + closeCh <- true + } + } else if d.sourceLost && d.inSpec == true { + closeCh = make(chan bool) + d.inSpec = false + go d.holdover(closeCh) + } else { + d.state = event.PTP_FREERUN + } + } + processCfg.EventChannel <- event.EventChannel{ + ProcessName: event.DPLL, + State: dpll_state, + IFace: d.iface, + CfgName: processCfg.ConfigName, + Values: map[event.ValueType]int64{ + event.FREQUENCY_STATUS: frequency_status, + event.OFFSET: phase_offset, + event.PHASE_STATUS: phase_status, + }, + ClockType: processCfg.ClockType, + Time: time.Now().Unix(), + WriteToLog: true, + Reset: false, + } + case <-processCfg.CloseCh: + processCfg.EventChannel <- event.EventChannel{ + ProcessName: event.DPLL, + IFace: d.iface, + CfgName: processCfg.ConfigName, + ClockType: processCfg.ClockType, + Time: time.Now().Unix(), + Reset: true, + } + ticker.Stop() + } + } +} + +func (d *DpllConfig) getState(pstate, fstate int64) int64 { + //TODO: Fix the logic to get correct lowest state + return int64(math.Min(float64(pstate), float64(fstate))) +} + +func (d *DpllConfig) holdover(closeCh chan bool) { + start := time.Now() + ticker := time.NewTicker(1 * time.Second) + d.state = event.PTP_HOLDOVER + for timeout := time.After(time.Duration(d.timer)); ; { + select { + case <-ticker.C: + //calculate offset + d.offset = int64(d.slope * float64(time.Since(start))) + case <-timeout: + d.state = event.PTP_FREERUN + return + case <-closeCh: + return + } + } +} + +func (d *DpllConfig) isOffsetInRange() bool { + if d.offset < d.processConfig.GMThreshold.Max && d.offset > d.processConfig.GMThreshold.Min { + return true + } + return false +} + +// Index of DPLL being configured [0:EEC (DPLL0), 1:PPS (DPLL1)] +// Frequency State (EEC_DPLL) +// cat /sys/class/net/interface_name/device/dpll_0_state +// Phase State +// cat /sys/class/net/ens7f0/device/dpll_1_state +// Phase Offset +// cat /sys/class/net/ens7f0/device/dpll_1_offset +func (d *DpllConfig) sysfs(iface string) (phaseState, frequencyState, phaseOffset int64) { + if iface == "" { + phaseState = DPLL_INVALID + frequencyState = DPLL_INVALID + phaseOffset = 0 + return + } + frequencyStateStr := fmt.Sprintf("/sys/class/net/%s/device/dpll_0_state", iface) + phaseStateStr := fmt.Sprintf("/sys/class/net/%s/device/dpll_1_state", iface) + phaseOffsetStr := fmt.Sprintf("/sys/class/net/%s/device/dpll_1_offset", iface) + // Read the content of the sysfs path + fContent, err := os.ReadFile(frequencyStateStr) + if err != nil { + glog.Infof("error reading sysfs path %s %s:", frequencyStateStr, err) + } else { + frequencyState = int64(binary.BigEndian.Uint64(fContent)) + } + // Read the content of the sysfs path + pContent, err2 := os.ReadFile(phaseStateStr) + if err2 != nil { + glog.Infof("error reading sysfs path %s %s:", phaseStateStr, err2) + } else { + phaseState = int64(binary.BigEndian.Uint64(pContent)) + } + // Read the content of the sysfs path + offsetContent, err3 := os.ReadFile(phaseOffsetStr) + if err3 != nil { + glog.Infof("error reading sysfs path %s %s:", phaseOffsetStr, err3) + } else { + phaseOffset = int64(binary.BigEndian.Uint64(offsetContent)) + } + return +} diff --git a/pkg/daemon/dpll/dpll_test.go b/pkg/daemon/dpll/dpll_test.go new file mode 100644 index 000000000..8f375517a --- /dev/null +++ b/pkg/daemon/dpll/dpll_test.go @@ -0,0 +1,28 @@ +package dpll_test + +import ( + "github.com/openshift/linuxptp-daemon/pkg/config" + "github.com/openshift/linuxptp-daemon/pkg/daemon/dpll" + "github.com/openshift/linuxptp-daemon/pkg/event" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestDpllConfig_MonitorProcess(t *testing.T) { + d := dpll.NewDpll(1400, 5, 10, []event.EventSource{}) + eventChannel := make(chan event.EventChannel, 10) + closeCh := make(chan bool) + + d.MonitorProcess(config.ProcessConfig{ + ClockType: "GM", + ConfigName: "test", + CloseCh: closeCh, + EventChannel: eventChannel, + GMThreshold: config.Threshold{}, + InitialPTPState: event.PTP_FREERUN, + }) + + ptpState := <-eventChannel + assert.Equal(t, ptpState.ProcessName, event.DPLL) + close(closeCh) +} diff --git a/pkg/daemon/ublox/types.go b/pkg/daemon/ublox/types.go index 46588e352..bd6f6c142 100644 --- a/pkg/daemon/ublox/types.go +++ b/pkg/daemon/ublox/types.go @@ -8,22 +8,33 @@ package ublox // version 0 nBlocks 2 reserved1 0 0 // blockId 0 flags x0 antStatus 2 antPower 1 postStatus 0 reserved2 0 0 0 0 // blockId 1 flags x0 antStatus 2 antPower 1 postStatus 0 reserved2 0 0 0 0 + +// ANT_STATUS ... type ANT_STATUS int + +// POWER_STATUS ... type POWER_STATUS int const ( + // OFF ... OFF POWER_STATUS = iota + // ON ... ON ) const ( + // NOT_OK ... NOT_OK ANT_STATUS = iota - UNKNOW + // UNKNOWN ... + UNKNOWN + // OK ... OK ) func (p POWER_STATUS) String() string { return [...]string{"OFF", "ON"}[p] } + +// IntString ... func (p POWER_STATUS) IntString() string { return [...]string{"0", "1"}[p] } @@ -31,35 +42,48 @@ func (p POWER_STATUS) IntString() string { func (a ANT_STATUS) String() string { return [...]string{"NOT_OK", "UNKNOWN", "OK"}[a] } + +// IntString ... func (a ANT_STATUS) IntString() string { return [...]string{"0", "1", "2"}[a] } -//Passed: Status of the Antenna is OK (i.e., antStatus equals 2) and Antena power status is Ok (i.e., antPower equals 1) -//Failure: antStatus not equals 2 and antPower not equals 1 - +// GNSSAntStatus ... +// Passed: Status of the Antenna is OK (i.e., antStatus equals 2) and Antena power status is Ok (i.e., antPower equals 1) +// Failure: antStatus not equals 2 and antPower not equals 1 +// GNSSAntStatus ... type GNSSAntStatus struct { blockID int32 antStatus ANT_STATUS powerStatus POWER_STATUS } +// AntennaOk ... func (g *GNSSAntStatus) AntennaOk() bool { return g.antStatus == OK && g.powerStatus == ON } +// Status ... func (g *GNSSAntStatus) Status() ANT_STATUS { return g.antStatus } + +// Power ... func (g *GNSSAntStatus) Power() POWER_STATUS { return g.powerStatus } + +// SetAntStatus ... func (g *GNSSAntStatus) SetAntStatus(antStatus ANT_STATUS) { g.antStatus = antStatus } + +// SetAntPower .. func (g *GNSSAntStatus) SetAntPower(power POWER_STATUS) { g.powerStatus = power } + +// NewAntStatus ... get antenna status func NewAntStatus(ant ANT_STATUS, power POWER_STATUS) GNSSAntStatus { return GNSSAntStatus{ blockID: 0, diff --git a/pkg/daemon/ublox/ublox.go b/pkg/daemon/ublox/ublox.go index 7fef4776b..90a62876a 100644 --- a/pkg/daemon/ublox/ublox.go +++ b/pkg/daemon/ublox/ublox.go @@ -17,24 +17,32 @@ import ( ) var ( + // PorotoVersionRegEx ... PorotoVersionRegEx = regexp.MustCompile(`PROTVER=+(\d+)`) + // AntennaStatusRegEx ... AntennaStatusRegEx = regexp.MustCompile(`antStatus[[:space:]]+(\d+)[[:space:]]antPower[[:space:]]+(\d+)`) - NavStatusRegEx = regexp.MustCompile(`gpsFix[[:space:]]+(\d+)`) - cmdTimeout = 10 * time.Second - ubloxProtoVersion = "29.20" + // NavStatusRegEx ... + NavStatusRegEx = regexp.MustCompile(`gpsFix[[:space:]]+(\d+)`) + cmdTimeout = 10 * time.Second + ubloxProtoVersion = "29.20" ) const ( - CMD_PROTO_VERSION = " -p MON-VER" + // CMD_PROTO_VERSION ... + CMD_PROTO_VERSION = " -p MON-VER" + // CMD_VOLTAGE_CONTROLER ... CMD_VOLTAGE_CONTROLER = " -v 1 -z CFG-HW-ANT_CFG_VOLTCTRL %d" - CMD_NAV_STATUS = " -t -p NAV-STATUS" + // CMD_NAV_STATUS ... + CMD_NAV_STATUS = " -t -p NAV-STATUS" ) +// UBlox ... UBlox type type UBlox struct { protoVersion *string mockExp func(cmdStr string) ([]string, error) } +// NewUblox ... create new Ublox func NewUblox() (*UBlox, error) { u := &UBlox{ protoVersion: &ubloxProtoVersion, @@ -142,6 +150,7 @@ func (u *UBlox) Query(command string, promptRE *regexp.Regexp) (result string, m // layers (ram bbr flash) transaction (Transactionless) // item CFG-HW-ANT_CFG_VOLTCTRL/0x10a3002e val 1 +// EnableDisableVoltageController ... // UBX-ACK-ACK: // ACK to Class x06 (CFG) ID x8a (VALSET) // TODO: Should read ACK-ACK to confirm right and read the item @@ -155,7 +164,8 @@ func (u *UBlox) EnableDisableVoltageController(command string, value int) ([]byt return stdout, err } -/* +// NavStatus ... +/* NavStatus ... ubxtool -t -w 3 -p NAV-STATUS -P 29.20 1683631651.3422 UBX-NAV-STATUS: @@ -205,3 +215,34 @@ func match(stdout string, ubLoxRegex *regexp.Regexp) (string, error) { } return "", fmt.Errorf("error parsing %s", stdout) } + +// GetNavOffset ... get gnss offset +func (u *UBlox) GetNavOffset() (string, error) { + command := "ubxtool" + args := []string{"-t", "-p", "NAV-CLOCK", "-P", "29.20"} + + output, err := exec.Command(command, args...).Output() + if err != nil { + return "", fmt.Errorf("error executing ubxtool command: %s", err) + + } + + offset := extractOffset(string(output)) + return offset, nil +} + +func extractOffset(output string) string { + // Find the line that contains "tAcc" + lines := strings.Split(output, "\n") + for _, line := range lines { + if strings.Contains(line, "tAcc") { + // Extract the offset value + fields := strings.Fields(line) + if len(fields) >= 6 { + return fields[6] + } + } + } + + return "" +} diff --git a/pkg/event/event.go b/pkg/event/event.go index a7d8f0f3d..a7bf4b75c 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -4,6 +4,9 @@ import ( "net" "strconv" "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" "github.com/openshift/linuxptp-daemon/pkg/pmc" @@ -13,74 +16,149 @@ import ( "github.com/golang/glog" ) +type ValueType string + +const ( + PTPNamespace = "openshift" + PTPSubsystem = "ptp" +) +const ( + OFFSET ValueType = "offset" + STATE ValueType = "state" + GPS_STATUS ValueType = "gnss_status" + //Status ValueType = "status" + PHASE_STATUS ValueType = "phase_status" + FREQUENCY_STATUS ValueType = "frequency_status" +) + +// ClockType ... type ClockType string const ( - GM = "GM" - BC = "BC" - OC = "OC" - PTP4lProcessName = "ptp4l" - TS2PHCProcessName = "ts2phc" + // GM .. + GM ClockType = "GM" + // BC ... + BC ClockType = "BC" + // OC ... + OC ClockType = "OC" +) + +// PTP4lProcessName ... +const PTP4lProcessName = "ptp4l" + +// TS2PHCProcessName ... +const TS2PHCProcessName = "ts2phc" + +// EventSource ... +type EventSource string + +const ( + GNSS EventSource = "gnss" + DPLL EventSource = "dpll" + TS2PHC EventSource = "ts2phc" + PTP4l EventSource = "ptp4l" + PHC2SYS EventSource = "phc2sys" + SYNCE EventSource = "syncE" ) -// EventType ... -type EventType string +// PTPState ... +type PTPState string const ( - // CLOCK_CLASS_CHANGE ... - CLOCK_CLASS_CHANGE = "CLOCK_CLASS_CHANGE" + // PTP_FREERUN ... - PTP_FREERUN = "PTP_FREERUN" + PTP_FREERUN PTPState = "s0" // PTP_HOLDOVER ... - PTP_HOLDOVER = "PTP_HOLDOVER" + PTP_HOLDOVER PTPState = "s1" // PTP_LOCKED ... - PTP_LOCKED = "PTP_LOCKED" - // GNSS_STATUS ... - GNSS_STATUS = "GNSS_STATUS" - // GNSS FREERUN - GNSS_FREERUN = "GNSS_FREERUN" - // GNSS LOCKED - GNSS_LOCKED = "GNSS_LOCKED" - connectionRetryInterval = 1 * time.Second + PTP_LOCKED PTPState = "s2" + // PTP_UNKNOWN + PTP_UNKNOWN PTPState = "-1" ) +// CLOCK_CLASS_CHANGE ... +const CLOCK_CLASS_CHANGE = "CLOCK_CLASS_CHANGE" + +const connectionRetryInterval = 1 * time.Second + // EventHandler ... event handler to process events type EventHandler struct { - stdOutSocket string - stdoutToSocket bool - processChannel <-chan EventChannel - closeCh chan bool + sync.Mutex + nodeName string + stdoutSocket string + stdoutToSocket bool + processChannel <-chan EventChannel + closeCh chan bool + data map[string][]Data + statusRequestChannel chan StatusRequest + offsetMetric *prometheus.GaugeVec + clockMetric *prometheus.GaugeVec +} +type StatusRequest struct { + Source EventSource + CfgName string + ResponseChannel chan<- PTPState } +var ( + statusRequestChannel chan StatusRequest +) + // EventChannel .. event channel to subscriber to events type EventChannel struct { - ProcessName string - Type EventType - CfgName string - Value int64 - ClockType ClockType - Update bool + ProcessName EventSource // ptp4l, gnss etc + State PTPState // PTP locked etc + IFace string // Interface that is causing the event + CfgName string // ptp config profile name + Values map[ValueType]int64 // either offset or status , 3 information offset , phase state and frequency state + logString string // if logstring sent here then log is not printed by the process and it is managed here + ClockType ClockType // oc bc gm + Time int64 // time.Unix.Now() + WriteToLog bool // send to log in predefined format %s[%d]:[%s] %s %d + Reset bool // reset data on ptp deletes or process died } var ( mockTest bool = false ) +// MockEnable ... func (e *EventHandler) MockEnable() { mockTest = true } // Init ... initialize event manager -func Init(stdOutToSocket bool, socketName string, processChannel chan EventChannel, closeCh chan bool) *EventHandler { +func Init(nodeName string, stdOutToSocket bool, socketName string, processChannel chan EventChannel, closeCh chan bool, offsetMetric *prometheus.GaugeVec, clockMetric *prometheus.GaugeVec) *EventHandler { + statusRequestChannel = make(chan StatusRequest) ptpEvent := &EventHandler{ - stdOutSocket: socketName, - stdoutToSocket: stdOutToSocket, - closeCh: closeCh, - processChannel: processChannel, + nodeName: nodeName, + stdoutSocket: socketName, + stdoutToSocket: stdOutToSocket, + closeCh: closeCh, + processChannel: processChannel, + data: map[string][]Data{}, + statusRequestChannel: statusRequestChannel, + clockMetric: clockMetric, + offsetMetric: offsetMetric, } return ptpEvent } +func (e *EventHandler) getGMState(cfgName string) PTPState { + lowestState := "" + if data, ok := e.data[cfgName]; ok { + for i, d := range data { + if i == 0 || string(d.State) < lowestState { + lowestState = string(d.State) + } + } + } + //gated + if lowestState == "" { + lowestState = "-1" + } + return PTPState(lowestState) +} // ProcessEvents ... process events to generate new events func (e *EventHandler) ProcessEvents() { @@ -100,7 +178,7 @@ func (e *EventHandler) ProcessEvents() { return default: if e.stdoutToSocket { - c, err = net.Dial("unix", e.stdOutSocket) + c, err = net.Dial("unix", e.stdoutSocket) if err != nil { glog.Errorf("event process error trying to connect to event socket %s", err) time.Sleep(connectionRetryInterval) @@ -110,37 +188,101 @@ func (e *EventHandler) ProcessEvents() { } glog.Info("Starting event monitoring...") writeEventToLog := false - var logOut string + + // listen To any requests + go e.listenToStateRequest() + for { select { case event := <-e.processChannel: - if event.Type == PTP_FREERUN || - event.Type == PTP_HOLDOVER || - event.Type == PTP_LOCKED { - // call pmc command and change - glog.Infof("EVENT: received ptp clock status change request %s", event.Type) - e.updateCLockClass(event.CfgName, event.Type, event.ClockType) - writeEventToLog = false - } else if event.Type == GNSS_STATUS && event.Update == true { - glog.Infof("EVENT: received GNSS status change request %s", event.Type) - if event.Value < 3 { - e.updateCLockClass(event.CfgName, GNSS_FREERUN, event.ClockType) + // ts2phc[123455]:[ts2phc.0.config] 12345 s0 offset/gps + // replace ts2phc logs here + var logOut []string + for k, v := range event.Values { + logOut = append(logOut, fmt.Sprintf("%s[%d]:[%s] %s %d %s\n", event.ProcessName, + time.Now().Unix(), event.CfgName, k, v, event.State)) + } + + if event.Reset { // clean up + if event.ProcessName == TS2PHC { + e.unregisterMetrics(event.CfgName, "") + delete(e.data, event.CfgName) } else { - e.updateCLockClass(event.CfgName, GNSS_LOCKED, event.ClockType) + // Check if the index is within the slice bounds + for indexToRemove, d := range e.data[event.CfgName] { + if d.ProcessName == event.ProcessName { + e.unregisterMetrics(event.CfgName, string(event.ProcessName)) + if indexToRemove < len(e.data[event.CfgName]) { + e.data[event.CfgName] = append(e.data[event.CfgName][:indexToRemove], e.data[event.CfgName][indexToRemove+1:]...) + } + } + } } - writeEventToLog = true + continue } - logOut = fmt.Sprintf("%s[%d]:[%s] %s %d\n", event.ProcessName, - time.Now().Unix(), event.CfgName, event.Type, event.Value) + + // Update the in MemData + if _, ok := e.data[event.CfgName]; !ok { + e.data[event.CfgName] = []Data{{ + ProcessName: event.ProcessName, + State: event.State, + ClockType: event.ClockType, + IFace: event.IFace, + Metrics: map[ValueType]DataMetrics{}, + }} + } else { + found := false + for i, d := range e.data[event.CfgName] { + if d.ProcessName == event.ProcessName { + e.data[event.CfgName][i].State = event.State + found = true + } + } + if !found { + e.data[event.CfgName] = append(e.data[event.CfgName], Data{ + ProcessName: event.ProcessName, + State: event.State, + ClockType: event.ClockType, + Metrics: map[ValueType]DataMetrics{}, + }) + } + } + + /// get Current GM state computing from DPLL, GNSS & ts2phc state + gmState := e.getGMState(event.CfgName) + if !e.stdoutToSocket { // if event s not enabled + e.updateMetrics(event.CfgName, event.ProcessName, event.Values) + e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace) + e.UpdateClockStateMetrics(gmState, "GM", event.IFace) + } + writeEventToLog = true + logOut = append(logOut, fmt.Sprintf("%s[%d]:[%s] T-GM-STATUS %s\n", GM, time.Now().Unix(), event.CfgName, e.getGMState(event.CfgName))) + switch gmState { + case PTP_FREERUN: + clockClass := 248 + logOut = append(logOut, fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", PTP4l, time.Now().Unix(), event.CfgName, clockClass)) + case PTP_LOCKED: + clockClass := 6 + logOut = append(logOut, fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", PTP4l, time.Now().Unix(), event.CfgName, clockClass)) + case PTP_HOLDOVER: + clockClass := 7 + logOut = append(logOut, fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", PTP4l, time.Now().Unix(), event.CfgName, clockClass)) + default: + } + if writeEventToLog { if e.stdoutToSocket { - _, err := c.Write([]byte(logOut)) - if err != nil { - glog.Errorf("Write error %s:", err) - goto connect + for _, l := range logOut { + _, err := c.Write([]byte(l)) + if err != nil { + glog.Errorf("Write error %s:", err) + goto connect + } } } else { - fmt.Printf("%s", logOut) + for _, l := range logOut { + fmt.Printf("%s", l) + } } } case <-e.closeCh: @@ -151,16 +293,16 @@ func (e *EventHandler) ProcessEvents() { }() } -func (e *EventHandler) updateCLockClass(cfgName string, eventType EventType, clockType ClockType) { - switch eventType { - case PTP_LOCKED, GNSS_LOCKED: +func (e *EventHandler) updateCLockClass(cfgName string, ptpState PTPState, clockType ClockType) { + switch ptpState { + case PTP_LOCKED: switch clockType { case GM: changeClockType(cfgName, pmc.CmdUpdateGMClass_LOCKED, 6) case OC: case BC: } - case PTP_FREERUN, GNSS_FREERUN: + case PTP_FREERUN: switch clockType { case GM: changeClockType(cfgName, pmc.CmdUpdateGMClass_FREERUN, 7) @@ -175,9 +317,7 @@ func (e *EventHandler) updateCLockClass(cfgName string, eventType EventType, clo case BC: } default: - } - } func changeClockType(cfgName, cmd string, value int64) { @@ -216,3 +356,145 @@ func getCurrentClockClass(cfgName string) int64 { } return 0 } + +// GetPTPState ... +func (e *EventHandler) GetPTPState(source EventSource, cfgName string) PTPState { + if m, ok := e.data[cfgName]; ok { + for _, v := range m { + if v.ProcessName == source { + return v.State + } + } + } + return PTP_UNKNOWN +} + +func (e *EventHandler) listenToStateRequest() { + for { + select { + case request := <-e.statusRequestChannel: + if m, ok := e.data[request.CfgName]; ok { + for _, v := range m { + if v.ProcessName == request.Source { + request.ResponseChannel <- v.State + } + } + } + request.ResponseChannel <- PTP_UNKNOWN + } + } +} + +// UpdateClockStateMetrics ... +func (e *EventHandler) UpdateClockStateMetrics(state PTPState, process, iFace string) { + if state == PTP_LOCKED { + e.clockMetric.With(prometheus.Labels{ + "process": process, "node": e.nodeName, "iface": iFace}).Set(1) + } else if state == PTP_FREERUN { + e.clockMetric.With(prometheus.Labels{ + "process": process, "node": e.nodeName, "iface": iFace}).Set(0) + } else if state == PTP_HOLDOVER { + e.clockMetric.With(prometheus.Labels{ + "process": process, "node": e.nodeName, "iface": iFace}).Set(2) + } else { + e.clockMetric.With(prometheus.Labels{ + "process": process, "node": e.nodeName, "iface": iFace}).Set(3) + } +} +func (e *EventHandler) updateMetrics(cfgName string, process EventSource, processData map[ValueType]int64) { + if dataArray, ok := e.data[cfgName]; ok { // create metric for the data that was captured + for _, d := range dataArray { + if d.ProcessName == process { // is gnss or dpll or ts2phc + for dataType, value := range processData { // update process with metrics + if _, found := d.Metrics[dataType]; !found { //todo: allow duplicate text + if dataType == OFFSET { + if d.Metrics[dataType].GaugeMetric == nil { + m := d.Metrics[dataType] + m.GaugeMetric = e.offsetMetric + d.Metrics[dataType] = m + } + pLabels := map[string]string{"from": string(d.ProcessName), "node": e.nodeName, + "process": string(d.ProcessName), "iface": d.IFace} + d.Metrics[dataType].GaugeMetric.With(pLabels).Set(float64(value)) + continue + } else { + metrics := DataMetrics{ + isRegistered: true, + GaugeMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: PTPNamespace, + Subsystem: PTPSubsystem, + Name: getMetricName(dataType), + Help: "", + }, []string{"from", "node", "process", "iface"}), + CounterMetric: nil, + Name: string(dataType), + ValueType: prometheus.GaugeValue, + Labels: map[string]string{"from": string(d.ProcessName), "node": e.nodeName, + "process": string(d.ProcessName), "iface": ""}, + Value: float64(value), + } + registerMetrics(metrics.GaugeMetric) + metrics.GaugeMetric.With(metrics.Labels).Set(float64(value)) + d.Metrics[dataType] = metrics + } + } else { + s := d.Metrics[dataType] + s.Labels = map[string]string{"from": string(d.ProcessName), "node": e.nodeName, + "process": string(d.ProcessName), "iface": d.IFace} + s.Value = float64(value) + d.Metrics[dataType].GaugeMetric.With(s.Labels).Set(float64(value)) + } + } + } + } + } +} + +func registerMetrics(m *prometheus.GaugeVec) { + defer func() { + if err := recover(); err != nil { + glog.Errorf("restored from registering metrics: %s", err) + } + }() + prometheus.MustRegister(m) +} + +func (e *EventHandler) unregisterMetrics(configName string, processName string) { + if m, ok := e.data[configName]; ok { + for _, v := range m { + if string(v.ProcessName) == processName || processName == "" { + for _, m := range v.Metrics { + if m.GaugeMetric != nil { + m.GaugeMetric.Delete(m.Labels) + } + } + } + } + } +} + +// GetPTPStateRequest ... +// GetPTPStateRequestChannel if Plugin requires to know the status of other compoenent they could use this channel +// Send a status request +// +// responseChannel := make(chan string) +// statusRequestChannel <- StatusRequest{ResponseChannel: responseChannel} +// +// Wait for and receive the response +// response := <-responseChannel +func GetPTPStateRequest(request StatusRequest) { + // Send a status request + //responseChannel := make(chan string) + statusRequestChannel <- request + + // Wait for and receive the response + //response := <-responseChannel + +} +func getMetricName(valueType ValueType) string { + if strings.HasSuffix(string(valueType), string(OFFSET)) { + return fmt.Sprintf("%s_%s", valueType, "ns") + } + return string(valueType) +} diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index c5a3b571f..8da086088 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -21,7 +21,7 @@ func TestEventHandler_ProcessEvents(t *testing.T) { closeChn := make(chan bool) go listenToEvents(closeChn) time.Sleep(2 * time.Second) - eventMananger := event.Init(true, "/tmp/go.sock", eChannel, closeChn) + eventMananger := event.Init("node", true, "/tmp/go.sock", eChannel, closeChn) eventMananger.MockEnable() go eventMananger.ProcessEvents() time.Sleep(2 * time.Second) diff --git a/pkg/event/stats.go b/pkg/event/stats.go new file mode 100644 index 000000000..291757b60 --- /dev/null +++ b/pkg/event/stats.go @@ -0,0 +1,23 @@ +package event + +import "github.com/prometheus/client_golang/prometheus" + +// Data ... +type Data struct { + ProcessName EventSource + IFace string + State PTPState + ClockType ClockType + Metrics map[ValueType]DataMetrics +} + +// DataMetrics ... +type DataMetrics struct { + isRegistered bool + GaugeMetric *prometheus.GaugeVec + CounterMetric *prometheus.Counter + Name string + ValueType prometheus.ValueType + Labels prometheus.Labels + Value float64 +}