Skip to content

Commit f20744a

Browse files
authored
EH: qstat -g t parsing. Test example. (#21)
1 parent b7466b6 commit f20744a

File tree

9 files changed

+488
-28
lines changed

9 files changed

+488
-28
lines changed

examples/testexample/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
testexample

examples/testexample/go.mod

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module github.com/hpc-gridware/go-clusterscheduler/examples/testexample
2+
3+
go 1.23.1
4+
5+
replace github.com/hpc-gridware/go-clusterscheduler => ../..
6+
7+
require (
8+
github.com/hpc-gridware/go-clusterscheduler v0.0.0-20241027163340-55dac298d370
9+
go.uber.org/zap v1.27.0
10+
google.golang.org/protobuf v1.35.1
11+
)
12+
13+
require go.uber.org/multierr v1.10.0 // indirect

examples/testexample/go.sum

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
4+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
5+
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
6+
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
7+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
8+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
9+
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
10+
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
11+
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
12+
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
13+
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
14+
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
15+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
16+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
17+
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
18+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
19+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
20+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
21+
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
22+
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
23+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
24+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
25+
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
26+
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
27+
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
28+
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
29+
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
30+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
31+
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
32+
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
33+
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
34+
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
35+
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
36+
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
37+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
38+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

examples/testexample/testexample.go

+256
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
"time"
9+
10+
qacct "github.com/hpc-gridware/go-clusterscheduler/pkg/qacct/v9.0"
11+
qstat "github.com/hpc-gridware/go-clusterscheduler/pkg/qstat/v9.0"
12+
"google.golang.org/protobuf/types/known/timestamppb"
13+
14+
"go.uber.org/zap"
15+
)
16+
17+
var qacctClient qacct.QAcct
18+
var qstatClient qstat.QStat
19+
20+
var log *zap.Logger
21+
22+
func init() {
23+
var err error
24+
log, _ = zap.NewProduction()
25+
qacctClient, err = qacct.NewCommandLineQAcct(qacct.CommandLineQAcctConfig{})
26+
if err != nil {
27+
log.Fatal("Failed to initialize qacct client", zap.String("error",
28+
err.Error()))
29+
}
30+
qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})
31+
if err != nil {
32+
log.Fatal("Failed to initialize qstat client", zap.String("error",
33+
err.Error()))
34+
}
35+
}
36+
37+
func main() {
38+
run(context.Background())
39+
}
40+
41+
func run(ctx context.Context) {
42+
defer log.Sync()
43+
alreadySent := map[string]struct{}{}
44+
45+
for {
46+
select {
47+
case <-ctx.Done():
48+
log.Info("Context cancelled, stopping ClusterScheduler")
49+
return
50+
default:
51+
finishedJobs, err := GetFinishedJobs()
52+
if err != nil {
53+
log.Error("Error getting finished jobs", zap.String("error",
54+
err.Error()))
55+
}
56+
57+
runningJobs, err := GetRunningJobs()
58+
if err != nil {
59+
log.Error("Error getting running jobs", zap.String("error",
60+
err.Error()))
61+
}
62+
63+
allJobs := append(finishedJobs, runningJobs...)
64+
65+
var protoJobs []*SimpleJob
66+
for _, job := range allJobs {
67+
if _, ok := alreadySent[job.JobId]; ok {
68+
continue
69+
}
70+
protoJobs = append(protoJobs, job)
71+
}
72+
73+
_, err = SendJobs(ctx, protoJobs)
74+
if err != nil {
75+
log.Warn("Error ingesting jobs", zap.String("error",
76+
err.Error()))
77+
}
78+
79+
for _, job := range finishedJobs {
80+
alreadySent[job.JobId] = struct{}{}
81+
}
82+
83+
select {
84+
case <-ctx.Done():
85+
log.Info("Context cancelled, stopping")
86+
return
87+
case <-time.After(10 * time.Second):
88+
}
89+
}
90+
}
91+
}
92+
93+
type SimpleJob struct {
94+
JobId string `json:"job_id"`
95+
// Cluster represents the queue name
96+
Cluster string `json:"cluster"`
97+
JobName string `json:"job_name"`
98+
// Partition represents the parallel environment
99+
Partition string `json:"partition"`
100+
Account string `json:"account"`
101+
User string `json:"user"`
102+
State string `json:"state"`
103+
ExitCode string `json:"exit_code"`
104+
Submit *timestamppb.Timestamp `json:"submit"`
105+
Start *timestamppb.Timestamp `json:"start"`
106+
End *timestamppb.Timestamp `json:"end"`
107+
MasterNode string `json:"master_node"`
108+
}
109+
110+
func GetFinishedJobs() ([]*SimpleJob, error) {
111+
// Use qacct NativeSpecification to get finished jobs
112+
qacctOutput, err := qacctClient.NativeSpecification([]string{"-j", "*"})
113+
if err != nil {
114+
return nil, fmt.Errorf("error running qacct command: %v", err)
115+
}
116+
117+
jobs, err := qacct.ParseQAcctJobOutput(qacctOutput)
118+
if err != nil {
119+
return nil, fmt.Errorf("error parsing qacct output: %v", err)
120+
}
121+
// convert to SimpleJob format
122+
simpleJobs := make([]*SimpleJob, len(jobs))
123+
for i, job := range jobs {
124+
state := fmt.Sprintf("%d", job.ExitStatus)
125+
if state == "0" {
126+
state = "done"
127+
} else {
128+
state = "failed"
129+
}
130+
simpleJobs[i] = &SimpleJob{
131+
// ignore job arrays for now
132+
JobId: fmt.Sprintf("%d", job.JobNumber),
133+
Cluster: job.QName,
134+
JobName: job.JobName,
135+
Partition: job.GrantedPE,
136+
Account: job.Account,
137+
User: job.Owner,
138+
State: state,
139+
ExitCode: fmt.Sprintf("%d", job.ExitStatus),
140+
Submit: parseTimestamp(job.QSubTime),
141+
Start: parseTimestamp(job.StartTime),
142+
End: parseTimestamp(job.EndTime),
143+
MasterNode: job.HostName,
144+
}
145+
}
146+
return simpleJobs, nil
147+
}
148+
149+
func GetRunningJobs() ([]*SimpleJob, error) {
150+
151+
qstatOverview, err := qstatClient.NativeSpecification([]string{"-g", "t"})
152+
if err != nil {
153+
return nil, fmt.Errorf("error running qstat command: %v", err)
154+
}
155+
jobsByTask, err := qstat.ParseGroupByTask(qstatOverview)
156+
if err != nil {
157+
return nil, fmt.Errorf("error parsing qstat output: %v", err)
158+
}
159+
160+
type State struct {
161+
QueueName string
162+
State string
163+
MasterNode string
164+
}
165+
166+
stateMap := map[int]State{}
167+
168+
for _, job := range jobsByTask {
169+
// we are only interested in serial and parallel jobs (no arrays)
170+
jq := strings.Split(job.Queue, "@")
171+
if len(jq) == 2 {
172+
js, exists := stateMap[job.JobID]
173+
if !exists {
174+
js = State{
175+
QueueName: jq[0],
176+
State: job.State,
177+
MasterNode: jq[1],
178+
}
179+
}
180+
if job.Master == "MASTER" {
181+
// this is the master task of a parallel job
182+
js.MasterNode = jq[1]
183+
stateMap[job.JobID] = js
184+
}
185+
continue
186+
}
187+
stateMap[job.JobID] = State{
188+
QueueName: job.Queue,
189+
State: job.State,
190+
}
191+
}
192+
193+
// get running jobs
194+
qstatOutput, err := qstatClient.NativeSpecification([]string{"-j", "*"})
195+
if err != nil {
196+
return nil, fmt.Errorf("error running qstat command: %v", err)
197+
}
198+
199+
jobs, err := qstat.ParseSchedulerJobInfo(qstatOutput)
200+
if err != nil {
201+
return nil, fmt.Errorf("error parsing qstat output: %v", err)
202+
}
203+
204+
// convert to SimpleJob format
205+
simpleJobs := make([]*SimpleJob, len(jobs))
206+
for i, job := range jobs {
207+
state := stateMap[job.JobNumber].State
208+
if state == "" {
209+
state = "running"
210+
}
211+
simpleJobs[i] = &SimpleJob{
212+
JobId: fmt.Sprintf("%d", job.JobNumber),
213+
Cluster: stateMap[job.JobNumber].QueueName,
214+
JobName: job.JobName,
215+
Partition: strings.Split(job.ParallelEnvironment, " ")[0], // PE
216+
Account: job.Account,
217+
User: job.Owner,
218+
State: state,
219+
ExitCode: "",
220+
MasterNode: stateMap[job.JobNumber].MasterNode,
221+
}
222+
if strings.Contains(stateMap[job.JobNumber].State, "q") {
223+
simpleJobs[i].Submit = parseTimestamp(job.SubmissionTime)
224+
} else {
225+
simpleJobs[i].Start = parseTimestamp(job.SubmissionTime)
226+
}
227+
}
228+
return simpleJobs, nil
229+
}
230+
231+
func SendJobs(ctx context.Context, jobs []*SimpleJob) (int, error) {
232+
log.Info("Sending jobs", zap.Int("jobs", len(jobs)))
233+
// Print the jobs
234+
for _, job := range jobs {
235+
// pretty print JSON
236+
json, err := json.MarshalIndent(job, "", " ")
237+
if err != nil {
238+
return 0, fmt.Errorf("error marshalling job: %v", err)
239+
}
240+
fmt.Println(string(json))
241+
}
242+
return len(jobs), nil
243+
}
244+
245+
// 2024-10-24 09:49:59.911136
246+
func parseTimestamp(s string) *timestamppb.Timestamp {
247+
loc, err := time.LoadLocation("Local")
248+
if err != nil {
249+
return nil
250+
}
251+
t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", s, loc)
252+
if err != nil {
253+
return nil
254+
}
255+
return timestamppb.New(t)
256+
}

0 commit comments

Comments
 (0)