Skip to content

Commit

Permalink
Add monitoring for all process and setting up GM State
Browse files Browse the repository at this point in the history
Signed-off-by: Aneesh Puttur <[email protected]>
  • Loading branch information
aneeshkp authored and josephdrichard committed Jun 7, 2024
1 parent cdda3ed commit 5273de1
Show file tree
Hide file tree
Showing 14 changed files with 979 additions and 134 deletions.
14 changes: 14 additions & 0 deletions pkg/daemon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,17 @@ func GetKubeConfig() (*rest.Config, error) {

return nil, fmt.Errorf("Could not locate a kubeconfig")
}

type ProcessConfig struct {
ClockType event.ClockType
ConfigName string
CloseCh chan bool
EventChannel chan<- event.EventChannel
GMThreshold Threshold
InitialPTPState event.PTPState
}
type Threshold struct {
Max int64
Min int64
HoldOverTimeout int64
}
2 changes: 1 addition & 1 deletion pkg/daemon/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (conf *ptp4lConf) renderPtp4lConf() (string, string) {

for _, section := range conf.sections {
configOut = fmt.Sprintf("%s\n%s", configOut, section.sectionName)
if section.sectionName != "[global]" {
if section.sectionName != "[global]" && section.sectionName != "[nmea]" {
iface := section.sectionName
iface = strings.ReplaceAll(iface, "[", "")
iface = strings.ReplaceAll(iface, "]", "")
Expand Down
127 changes: 97 additions & 30 deletions pkg/daemon/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type ptpProcess struct {
depProcess []process // this could gpsd and other process which needs to be stopped if the parent process is stopped
nodeProfile *ptpv1.PtpProfile
parentClockClass float64
pmcCheck bool
pmcCheck bool
ptpClockThreshold *ptpv1.PtpClockThreshold
}

func (p *ptpProcess) Stopped() bool {
Expand Down Expand Up @@ -136,7 +137,7 @@ func (dn *Daemon) Run() {
if p != nil {
for _, d := range p.depProcess {
if d != nil {
d.cmdStop()
d.CmdStop()
d = nil
}
}
Expand Down Expand Up @@ -174,7 +175,7 @@ func (dn *Daemon) applyNodePTPProfiles() error {
if p.depProcess != nil {
for _, d := range p.depProcess {
if d != nil {
d.cmdStop()
d.CmdStop()
d = nil
}
}
Expand Down Expand Up @@ -216,15 +217,28 @@ func (dn *Daemon) applyNodePTPProfiles() error {
for _, d := range p.depProcess {
if d != nil {
time.Sleep(3 * time.Second)
go d.cmdRun(false)
go d.CmdRun(false)
time.Sleep(3 * time.Second)
dn.pluginManager.AfterRunPTPCommand(p.nodeProfile, d.Name())
d.monitorEvent(clockType, p.configName, p.exitCh, dn.processManager.eventChannel)
//TODO: Maybe Move DPLL start and stop as part of pluign
d.MonitorProcess(config.ProcessConfig{
ClockType: clockType,
ConfigName: p.configName,
CloseCh: p.exitCh,
EventChannel: dn.processManager.eventChannel,
GMThreshold: config.Threshold{
Max: p.ptpClockThreshold.MaxOffsetThreshold,
Min: p.ptpClockThreshold.MinOffsetThreshold,
HoldOverTimeout: p.ptpClockThreshold.HoldOverTimeout,
},
InitialPTPState: event.PTP_FREERUN,
})
}
}
}
time.Sleep(1 * time.Second)
go p.cmdRun()
p.eventCh = dn.processManager.eventChannel
dn.pluginManager.AfterRunPTPCommand(p.nodeProfile, p.name)
}
}
Expand Down Expand Up @@ -366,33 +380,40 @@ func (dn *Daemon) applyNodePtpProfile(runID int, nodeProfile *ptpv1.PtpProfile)
ifacesList := strings.Split(ifaces, ",")

dprocess := ptpProcess{
name: p,
ifaces: ifacesList,
ptp4lConfigPath: configPath,
ptp4lSocketPath: socketPath,
configName: configFile,
messageTag: messageTag,
exitCh: make(chan bool),
stopped: false,
logFilterRegex: getLogFilterRegex(nodeProfile),
cmd: cmd,
depProcess: []process{},
nodeProfile: nodeProfile,
name: p,
ifaces: ifacesList,
ptp4lConfigPath: configPath,
ptp4lSocketPath: socketPath,
configName: configFile,
messageTag: messageTag,
exitCh: make(chan bool),
stopped: false,
logFilterRegex: getLogFilterRegex(nodeProfile),
cmd: cmd,
depProcess: []process{},
nodeProfile: nodeProfile,
ptpClockThreshold: getPTPThreshold(nodeProfile),
}
//TODO HARDWARE PLUGIN for e810
if pProcess == ts2phcProcessName { //& if the x plugin is enabled
//TODO: move this to plugin or call it from hwplugin or leave it here and remove Hardcoded
gmInterface := ""
if len(ifacesList) > 0 {
gmInterface = ifacesList[0]
}
if e := mkFifo(); e != nil {
glog.Errorf("Error creating named pipe, GNSS monitoring will not work as expected %s", e.Error())
}
gpsDaemon := &gpsd{
name: GPSD_PROCESSNAME,
execMutex: sync.Mutex{},
cmd: nil,
serialPort: GPSD_SERIALPORT,
exitCh: make(chan bool),
stopped: false,
name: GPSD_PROCESSNAME,
execMutex: sync.Mutex{},
cmd: nil,
serialPort: GPSD_SERIALPORT,
exitCh: make(chan bool),
gmInterface: gmInterface,
stopped: false,
}
gpsDaemon.cmdInit()
gpsDaemon.CmdInit()
dprocess.depProcess = append(dprocess.depProcess, gpsDaemon)

// init gpspipe
Expand All @@ -404,15 +425,20 @@ func (dn *Daemon) applyNodePtpProfile(runID int, nodeProfile *ptpv1.PtpProfile)
exitCh: make(chan bool),
stopped: false,
}
gpsPipeDaemon.cmdInit()
gpsPipeDaemon.CmdInit()
dprocess.depProcess = append(dprocess.depProcess, gpsPipeDaemon)
// init dpll
// TODO: Try to inject DPLL depProcess via plugin ?
dpllDaemon := dpll.NewDpll(dpll.LocalMaxHoldoverOffSet, dpll.LocalHoldoverTimeout, dpll.MaxInSpecOffset,
gmInterface, []event.EventSource{event.GNSS})
dprocess.depProcess = append(dprocess.depProcess, dpllDaemon)
dn.processManager.process = append(dn.processManager.process, &dprocess)
}
err = os.WriteFile(configPath, []byte(configOutput), 0644)
if err != nil {
printNodeProfile(nodeProfile)
return fmt.Errorf("failed to write the configuration file named %s: %v", configPath, err)
}
dn.processManager.process = append(dn.processManager.process, &dprocess)
}

return nil
Expand Down Expand Up @@ -506,7 +532,7 @@ func (p *ptpProcess) cmdRun() {
p.cmd.Stderr = os.Stderr
cmdReader, err := p.cmd.StdoutPipe()
if err != nil {
glog.Errorf("cmdRun() error creating StdoutPipe for %s: %v", p.name, err)
glog.Errorf("CmdRun() error creating StdoutPipe for %s: %v", p.name, err)
break
}
scanner := bufio.NewScanner(cmdReader)
Expand All @@ -522,21 +548,42 @@ func (p *ptpProcess) cmdRun() {
if regexErr != nil || !logFilterRegex.MatchString(output) {
fmt.Printf("%s\n", output)
}
extractMetrics(p.messageTag, p.name, p.ifaces, output)
source, ts2phcsOffset, state, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output)
var ts2phcState event.PTPState
if state == FREERUN {
ts2phcState = event.PTP_FREERUN
} else if state == LOCKED {
ts2phcState = event.PTP_LOCKED
}
if source == ts2phcProcessName {
p.eventCh <- event.EventChannel{
ProcessName: event.TS2PHC,
State: ts2phcState,
CfgName: p.configName,
IFace: iface,
Values: map[event.ValueType]int64{
event.OFFSET: int64(ts2phcsOffset),
},
ClockType: "GM", //TODO: add actual defined
Time: time.Now().Unix(),
WriteToLog: true,
Reset: false,
}
}
}
done <- struct{}{}
}()
// Don't restart after termination
if !p.Stopped() {
err = p.cmd.Start() // this is asynchronous call,
if err != nil {
glog.Errorf("cmdRun() error starting %s: %v", p.name, err)
glog.Errorf("CmdRun() error starting %s: %v", p.name, err)
}
}
<-done // goroutine is done
err = p.cmd.Wait()
if err != nil {
glog.Errorf("cmdRun() error waiting for %s: %v", p.name, err)
glog.Errorf("CmdRun() error waiting for %s: %v", p.name, err)
}
processStatus(p.name, p.messageTag, PtpProcessDown)

Expand Down Expand Up @@ -577,3 +624,23 @@ func (p *ptpProcess) cmdStop() {
<-p.exitCh
glog.Infof("Process %d terminated", p.cmd.Process.Pid)
}

func getPTPThreshold(nodeProfile *ptpv1.PtpProfile) *ptpv1.PtpClockThreshold {
if nodeProfile.PtpClockThreshold != nil {
return &ptpv1.PtpClockThreshold{
HoldOverTimeout: nodeProfile.PtpClockThreshold.HoldOverTimeout,
MaxOffsetThreshold: nodeProfile.PtpClockThreshold.MaxOffsetThreshold,
MinOffsetThreshold: nodeProfile.PtpClockThreshold.MinOffsetThreshold,
}
} else {
return &ptpv1.PtpClockThreshold{
HoldOverTimeout: 5,
MaxOffsetThreshold: 100,
MinOffsetThreshold: 100,
}
}
}

func (p *ptpProcess) MonitorEvent(offset float64, clockState string) {

}
Loading

0 comments on commit 5273de1

Please sign in to comment.