Skip to content

EH: qstat -g t parsing. Test example. #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/testexample/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
testexample
13 changes: 13 additions & 0 deletions examples/testexample/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/hpc-gridware/go-clusterscheduler/examples/testexample

go 1.23.1

replace github.com/hpc-gridware/go-clusterscheduler => ../..

require (
github.com/hpc-gridware/go-clusterscheduler v0.0.0-20241027163340-55dac298d370
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.35.1
)

require go.uber.org/multierr v1.10.0 // indirect
38 changes: 38 additions & 0 deletions examples/testexample/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
256 changes: 256 additions & 0 deletions examples/testexample/testexample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package main

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

qacct "github.com/hpc-gridware/go-clusterscheduler/pkg/qacct/v9.0"
qstat "github.com/hpc-gridware/go-clusterscheduler/pkg/qstat/v9.0"
"google.golang.org/protobuf/types/known/timestamppb"

"go.uber.org/zap"
)

// Package-level dependencies shared by the functions in this example.
// All three are assigned in init.
var (
	// qacctClient runs qacct queries; it is used to look up finished jobs.
	qacctClient qacct.QAcct
	// qstatClient runs qstat queries; it is used to look up jobs that have
	// not finished yet.
	qstatClient qstat.QStat

	// log is the structured logger used throughout the program.
	log *zap.Logger
)

// init builds the package-level logger and the qacct/qstat command line
// clients before main runs. Every failure here is fatal because the rest of
// the program cannot work without these dependencies.
func init() {
	var err error

	log, err = zap.NewProduction()
	if err != nil {
		// The logger is what reports the errors below; if it cannot be
		// built there is nothing to log with, so abort with the cause
		// instead of silently discarding it.
		panic(fmt.Errorf("initializing logger: %w", err))
	}

	qacctClient, err = qacct.NewCommandLineQAcct(qacct.CommandLineQAcctConfig{})
	if err != nil {
		log.Fatal("Failed to initialize qacct client", zap.Error(err))
	}

	qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})
	if err != nil {
		log.Fatal("Failed to initialize qstat client", zap.Error(err))
	}
}

// main starts the polling loop. The background context it passes is never
// cancelled, so run keeps going until the process is terminated.
func main() {
	ctx := context.Background()
	run(ctx)
}

// run polls the scheduler until ctx is cancelled. Each cycle it collects the
// finished and the not-yet-finished jobs, hands every job that has not been
// delivered before to SendJobs, and then waits 10 seconds.
//
// Errors from the collectors are logged and the cycle continues with whatever
// could be gathered. Finished jobs are remembered as delivered only after
// SendJobs succeeded, so a failed batch is retried on the next cycle. Jobs
// that are not finished are never remembered and are therefore re-sent every
// cycle.
func run(ctx context.Context) {
	defer log.Sync()

	// alreadySent holds the IDs of finished jobs that were delivered.
	// NOTE(review): entries are never removed, so the set grows for as long
	// as the process runs.
	alreadySent := map[string]struct{}{}

	for {
		select {
		case <-ctx.Done():
			log.Info("Context cancelled, stopping ClusterScheduler")
			return
		default:
		}

		finishedJobs, err := GetFinishedJobs()
		if err != nil {
			log.Error("Error getting finished jobs", zap.Error(err))
		}

		runningJobs, err := GetRunningJobs()
		if err != nil {
			log.Error("Error getting running jobs", zap.Error(err))
		}

		// Build the batch in a fresh slice instead of appending to
		// finishedJobs, so the collector's backing array is never written to.
		protoJobs := make([]*SimpleJob, 0, len(finishedJobs)+len(runningJobs))
		for _, group := range [][]*SimpleJob{finishedJobs, runningJobs} {
			for _, job := range group {
				if _, sent := alreadySent[job.JobId]; sent {
					continue
				}
				protoJobs = append(protoJobs, job)
			}
		}

		if _, err := SendJobs(ctx, protoJobs); err != nil {
			log.Warn("Error ingesting jobs", zap.Error(err))
		} else {
			// Only a successful delivery may suppress future sends.
			for _, job := range finishedJobs {
				alreadySent[job.JobId] = struct{}{}
			}
		}

		select {
		case <-ctx.Done():
			log.Info("Context cancelled, stopping")
			return
		case <-time.After(10 * time.Second):
		}
	}
}

// SimpleJob is a flat, JSON-serializable summary of one job. It is filled
// either from qacct records (GetFinishedJobs) or from qstat records
// (GetRunningJobs); fields a source cannot provide are left at their zero
// value.
type SimpleJob struct {
// JobId is the scheduler's job number formatted as a decimal string.
JobId string `json:"job_id"`
// Cluster represents the queue name
Cluster string `json:"cluster"`
// JobName is the job's name as reported by the scheduler.
JobName string `json:"job_name"`
// Partition represents the parallel environment
Partition string `json:"partition"`
// Account is copied from the scheduler record's account field.
Account string `json:"account"`
// User is the job owner.
User string `json:"user"`
// State is "done" or "failed" for finished jobs; for all other jobs it is
// the state reported by qstat, or "running" when qstat reported none.
State string `json:"state"`
// ExitCode is the exit status as a decimal string; it is empty for jobs
// that have not finished.
ExitCode string `json:"exit_code"`
// Submit, Start and End are nil when the corresponding time is unknown or
// could not be parsed.
Submit *timestamppb.Timestamp `json:"submit"`
Start *timestamppb.Timestamp `json:"start"`
End *timestamppb.Timestamp `json:"end"`
// MasterNode is the host name taken from the accounting record, or the host
// part of the queue instance on which qstat reports the job's MASTER task.
MasterNode string `json:"master_node"`
}

// GetFinishedJobs runs `qacct -j *`, parses the accounting output and converts
// every record into a SimpleJob.
//
// A record with exit status 0 gets the state "done", every other exit status
// gets "failed". Time fields that cannot be parsed are left nil.
//
// It returns a wrapped error when the qacct command or the parsing of its
// output fails; the underlying cause stays reachable through errors.Is and
// errors.As.
func GetFinishedJobs() ([]*SimpleJob, error) {
	// Use qacct NativeSpecification to get finished jobs
	qacctOutput, err := qacctClient.NativeSpecification([]string{"-j", "*"})
	if err != nil {
		return nil, fmt.Errorf("error running qacct command: %w", err)
	}

	jobs, err := qacct.ParseQAcctJobOutput(qacctOutput)
	if err != nil {
		return nil, fmt.Errorf("error parsing qacct output: %w", err)
	}

	// convert to SimpleJob format
	simpleJobs := make([]*SimpleJob, len(jobs))
	for i, job := range jobs {
		// Format the exit status once; it is both reported verbatim and
		// used to derive the coarse done/failed state.
		exitCode := fmt.Sprintf("%d", job.ExitStatus)
		state := "failed"
		if exitCode == "0" {
			state = "done"
		}

		simpleJobs[i] = &SimpleJob{
			// ignore job arrays for now
			JobId:      fmt.Sprintf("%d", job.JobNumber),
			Cluster:    job.QName,
			JobName:    job.JobName,
			Partition:  job.GrantedPE,
			Account:    job.Account,
			User:       job.Owner,
			State:      state,
			ExitCode:   exitCode,
			Submit:     parseTimestamp(job.QSubTime),
			Start:      parseTimestamp(job.StartTime),
			End:        parseTimestamp(job.EndTime),
			MasterNode: job.HostName,
		}
	}
	return simpleJobs, nil
}

// GetRunningJobs reports every job returned by `qstat -j *` as a SimpleJob,
// enriched with the queue, state and master node found in `qstat -g t`.
//
// Jobs for which `qstat -g t` yielded no state are reported as "running".
// qstat's job details only provide the submission time here: it is stored in
// Submit when the state contains "q", otherwise it is stored in Start as a
// stand-in. ExitCode and End are never set.
//
// It returns a wrapped error when either qstat call or the parsing of its
// output fails; the underlying cause stays reachable through errors.Is and
// errors.As.
func GetRunningJobs() ([]*SimpleJob, error) {
	qstatOverview, err := qstatClient.NativeSpecification([]string{"-g", "t"})
	if err != nil {
		return nil, fmt.Errorf("error running qstat command: %w", err)
	}
	jobsByTask, err := qstat.ParseGroupByTask(qstatOverview)
	if err != nil {
		return nil, fmt.Errorf("error parsing qstat output: %w", err)
	}

	// State is what the per-task overview contributes to a job.
	type State struct {
		QueueName  string
		State      string
		MasterNode string
	}

	stateMap := map[int]State{}

	for _, job := range jobsByTask {
		// we are only interested in serial and parallel jobs (no arrays)
		jq := strings.Split(job.Queue, "@")
		if len(jq) != 2 {
			// Not of the form queue@host: keep the raw queue value and
			// leave the master node empty.
			stateMap[job.JobID] = State{
				QueueName: job.Queue,
				State:     job.State,
			}
			continue
		}
		if job.Master != "MASTER" {
			// Only the master task contributes an entry.
			continue
		}
		// this is the master task of a parallel job
		js, exists := stateMap[job.JobID]
		if !exists {
			js = State{
				QueueName: jq[0],
				State:     job.State,
			}
		}
		js.MasterNode = jq[1]
		stateMap[job.JobID] = js
	}

	// get running jobs
	qstatOutput, err := qstatClient.NativeSpecification([]string{"-j", "*"})
	if err != nil {
		return nil, fmt.Errorf("error running qstat command: %w", err)
	}

	jobs, err := qstat.ParseSchedulerJobInfo(qstatOutput)
	if err != nil {
		return nil, fmt.Errorf("error parsing qstat output: %w", err)
	}

	// convert to SimpleJob format
	simpleJobs := make([]*SimpleJob, len(jobs))
	for i, job := range jobs {
		// A missing entry yields the zero State, i.e. all fields empty.
		js := stateMap[job.JobNumber]

		state := js.State
		if state == "" {
			state = "running"
		}

		// Only the text before the first space of the parallel environment
		// value is used as the PE name.
		pe, _, _ := strings.Cut(job.ParallelEnvironment, " ")

		simpleJob := &SimpleJob{
			JobId:      fmt.Sprintf("%d", job.JobNumber),
			Cluster:    js.QueueName,
			JobName:    job.JobName,
			Partition:  pe,
			Account:    job.Account,
			User:       job.Owner,
			State:      state,
			ExitCode:   "",
			MasterNode: js.MasterNode,
		}
		if strings.Contains(js.State, "q") {
			simpleJob.Submit = parseTimestamp(job.SubmissionTime)
		} else {
			simpleJob.Start = parseTimestamp(job.SubmissionTime)
		}
		simpleJobs[i] = simpleJob
	}
	return simpleJobs, nil
}

// SendJobs logs the batch size and prints every job as indented JSON to
// standard output. ctx is accepted for API symmetry but is currently unused.
//
// It returns the number of jobs printed. If a job cannot be marshalled it
// stops and returns 0 together with a wrapped error; jobs printed before the
// failing one have already been written.
func SendJobs(ctx context.Context, jobs []*SimpleJob) (int, error) {
	log.Info("Sending jobs", zap.Int("jobs", len(jobs)))
	// Print the jobs
	for _, job := range jobs {
		// pretty print JSON; the result is deliberately not named "json" so
		// the encoding/json package is not shadowed.
		encoded, err := json.MarshalIndent(job, "", " ")
		if err != nil {
			return 0, fmt.Errorf("error marshalling job: %w", err)
		}
		fmt.Println(string(encoded))
	}
	return len(jobs), nil
}

// parseTimestamp converts a time string such as "2024-10-24 09:49:59.911136"
// into a protobuf timestamp, interpreting it in the local time zone.
// It returns nil when s does not match that layout.
func parseTimestamp(s string) *timestamppb.Timestamp {
	const layout = "2006-01-02 15:04:05.999999"

	parsed, err := time.ParseInLocation(layout, s, time.Local)
	if err != nil {
		return nil
	}
	return timestamppb.New(parsed)
}
Loading
Loading