Skip to content

Commit 0d1c0a1

Browse files
Add runner periodic cleanup check
Adds a periodic cleanup function that cross-checks runners between GitHub, the provider, and the GARM database. If an inconsistency is found, GARM will attempt to fix it. Signed-off-by: Gabriel Adrian Samfira <[email protected]>
1 parent 808017b commit 0d1c0a1

File tree

12 files changed

+479
-43
lines changed

12 files changed

+479
-43
lines changed

database/watcher/filters.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,10 @@ func WithScaleSetInstanceFilter(scaleset params.ScaleSet) dbCommon.PayloadFilter
255255
}
256256

257257
instance, ok := payload.Payload.(params.Instance)
258-
if !ok {
258+
if !ok || instance.ScaleSetID == 0 {
259259
return false
260260
}
261+
261262
return instance.ScaleSetID == scaleset.ID
262263
}
263264
}

locking/local_locker.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,19 @@ var _ Locker = &keyMutex{}
3333

3434
func (k *keyMutex) TryLock(key, identifier string) bool {
3535
mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
36-
mux: sync.Mutex{},
37-
ident: identifier,
36+
mux: sync.Mutex{},
3837
})
3938
keyMux := mux.(*lockWithIdent)
39+
keyMux.ident = identifier
4040
return keyMux.mux.TryLock()
4141
}
4242

4343
func (k *keyMutex) Lock(key, identifier string) {
4444
mux, _ := k.muxes.LoadOrStore(key, &lockWithIdent{
45-
mux: sync.Mutex{},
46-
ident: identifier,
45+
mux: sync.Mutex{},
4746
})
4847
keyMux := mux.(*lockWithIdent)
48+
keyMux.ident = identifier
4949
keyMux.mux.Lock()
5050
}
5151

@@ -60,6 +60,7 @@ func (k *keyMutex) Unlock(key string, remove bool) {
6060
}
6161
_, filename, line, _ := runtime.Caller(1)
6262
slog.Debug("unlocking", "key", key, "identifier", keyMux.ident, "caller", fmt.Sprintf("%s:%d", filename, line))
63+
keyMux.ident = ""
6364
keyMux.mux.Unlock()
6465
}
6566

params/github.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
421421
type RunnerReference struct {
422422
ID int64 `json:"id"`
423423
Name string `json:"name"`
424+
OS string `json:"os"`
424425
RunnerScaleSetID int `json:"runnerScaleSetId"`
425426
CreatedOn interface{} `json:"createdOn"`
426427
RunnerGroupID uint64 `json:"runnerGroupId"`
@@ -431,9 +432,29 @@ type RunnerReference struct {
431432
Status interface{} `json:"status"`
432433
DisableUpdate bool `json:"disableUpdate"`
433434
ProvisioningState string `json:"provisioningState"`
435+
Busy bool `json:"busy"`
434436
Labels []Label `json:"labels,omitempty"`
435437
}
436438

439+
func (r RunnerReference) GetStatus() RunnerStatus {
440+
status, ok := r.Status.(string)
441+
if !ok {
442+
return RunnerUnknown
443+
}
444+
runnerStatus := RunnerStatus(status)
445+
if !runnerStatus.IsValid() {
446+
return RunnerUnknown
447+
}
448+
449+
if runnerStatus == RunnerOnline {
450+
if r.Busy {
451+
return RunnerActive
452+
}
453+
return RunnerIdle
454+
}
455+
return runnerStatus
456+
}
457+
437458
type RunnerScaleSetJitRunnerConfig struct {
438459
Runner *RunnerReference `json:"runner"`
439460
EncodedJITConfig string `json:"encodedJITConfig"`

params/params.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,18 @@ type (
4949
ScaleSetMessageType string
5050
)
5151

52+
func (s RunnerStatus) IsValid() bool {
53+
switch s {
54+
case RunnerIdle, RunnerPending, RunnerTerminated,
55+
RunnerInstalling, RunnerFailed,
56+
RunnerActive, RunnerOffline,
57+
RunnerUnknown, RunnerOnline:
58+
59+
return true
60+
}
61+
return false
62+
}
63+
5264
const (
5365
// PoolBalancerTypeRoundRobin will try to cycle through the pools of an entity
5466
// in a round robin fashion. For example, if a repository has multiple pools that
@@ -117,6 +129,9 @@ const (
117129
RunnerInstalling RunnerStatus = "installing"
118130
RunnerFailed RunnerStatus = "failed"
119131
RunnerActive RunnerStatus = "active"
132+
RunnerOffline RunnerStatus = "offline"
133+
RunnerOnline RunnerStatus = "online"
134+
RunnerUnknown RunnerStatus = "unknown"
120135
)
121136

122137
const (

runner/common/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ type GithubEntityOperations interface {
2828
GithubBaseURL() *url.URL
2929
}
3030

31+
// RateLimitClient is a single-method interface for fetching rate limit
// information.
type RateLimitClient interface {
32+
// RateLimit returns the rate limits as a *github.RateLimits, or an error.
// NOTE(review): presumably this queries the GitHub rate limit API for the
// credentials the client was built with — confirm against the implementation.
RateLimit(ctx context.Context) (*github.RateLimits, error)
33+
}
34+
3135
// GithubClient that describes the minimum list of functions we need to interact with github.
3236
// Allows for easier testing.
3337
//

runner/pool/pool.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,10 +439,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
439439
// github so we let them be for now.
440440
continue
441441
}
442+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
443+
if err != nil {
444+
return errors.Wrap(err, "fetching instance pool info")
445+
}
442446

443447
switch instance.RunnerStatus {
444448
case params.RunnerPending, params.RunnerInstalling:
445-
if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
449+
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
446450
// runner is still installing. We give it a chance to finish.
447451
slog.DebugContext(
448452
r.ctx, "runner is still installing, give it a chance to finish",
@@ -510,7 +514,11 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
510514
}
511515
defer locking.Unlock(instance.Name, false)
512516

513-
if time.Since(instance.UpdatedAt).Minutes() < float64(instance.RunnerTimeout()) {
517+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
518+
if err != nil {
519+
return errors.Wrap(err, "fetching instance pool info")
520+
}
521+
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
514522
continue
515523
}
516524

util/github/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,30 @@ func (g *githubClient) GithubBaseURL() *url.URL {
490490
return g.cli.BaseURL
491491
}
492492

493+
func NewRateLimitClient(ctx context.Context, credentials params.GithubCredentials) (common.RateLimitClient, error) {
494+
httpClient, err := credentials.GetHTTPClient(ctx)
495+
if err != nil {
496+
return nil, errors.Wrap(err, "fetching http client")
497+
}
498+
499+
slog.DebugContext(
500+
ctx, "creating rate limit client",
501+
"base_url", credentials.APIBaseURL,
502+
"upload_url", credentials.UploadBaseURL)
503+
504+
ghClient, err := github.NewClient(httpClient).WithEnterpriseURLs(
505+
credentials.APIBaseURL, credentials.UploadBaseURL)
506+
if err != nil {
507+
return nil, errors.Wrap(err, "fetching github client")
508+
}
509+
cli := &githubClient{
510+
rateLimit: ghClient.RateLimit,
511+
cli: ghClient,
512+
}
513+
514+
return cli, nil
515+
}
516+
493517
func Client(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
494518
// func GithubClient(ctx context.Context, entity params.GithubEntity) (common.GithubClient, error) {
495519
httpClient, err := entity.Credentials.GetHTTPClient(ctx)

util/github/scalesets/message_sessions.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,17 @@ func (m *MessageSession) maybeRefreshToken(ctx context.Context) error {
141141
if m.session == nil {
142142
return fmt.Errorf("session is nil")
143143
}
144-
// add some jitter
145-
randInt, err := rand.Int(rand.Reader, big.NewInt(5000))
146-
if err != nil {
147-
return fmt.Errorf("failed to get a random number")
148-
}
144+
149145
expiresAt, err := m.session.ExiresAt()
150146
if err != nil {
151147
return fmt.Errorf("failed to get expires at: %w", err)
152148
}
153-
expiresIn := time.Duration(randInt.Int64())*time.Millisecond + 10*time.Minute
149+
// add some jitter (30 second interval)
150+
randInt, err := rand.Int(rand.Reader, big.NewInt(30))
151+
if err != nil {
152+
return fmt.Errorf("failed to get a random number")
153+
}
154+
expiresIn := time.Duration(randInt.Int64())*time.Second + 10*time.Minute
154155
slog.DebugContext(ctx, "checking if message session token needs refresh", "expires_at", expiresAt)
155156
if m.session.ExpiresIn(expiresIn) {
156157
if err := m.Refresh(ctx); err != nil {

workers/entity/worker_watcher.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,28 @@ func (w *Worker) handleWorkerWatcherEvent(event dbCommon.ChangePayload) {
1313
entityType := dbCommon.DatabaseEntityType(w.Entity.EntityType)
1414
switch event.EntityType {
1515
case entityType:
16-
entityGetter, ok := event.Payload.(params.EntityGetter)
17-
if !ok {
18-
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
19-
return
20-
}
21-
entity, err := entityGetter.GetEntity()
22-
if err != nil {
23-
slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
24-
return
25-
}
26-
w.handleEntityEventPayload(entity, event)
16+
w.handleEntityEventPayload(event)
2717
return
2818
case dbCommon.GithubCredentialsEntityType:
2919
slog.DebugContext(w.ctx, "got github credentials payload event")
30-
credentials, ok := event.Payload.(params.GithubCredentials)
31-
if !ok {
32-
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
33-
return
34-
}
35-
w.handleEntityCredentialsEventPayload(credentials, event)
20+
w.handleEntityCredentialsEventPayload(event)
3621
default:
3722
slog.DebugContext(w.ctx, "invalid entity type; ignoring", "entity_type", event.EntityType)
3823
}
3924
}
4025

41-
func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCommon.ChangePayload) {
26+
func (w *Worker) handleEntityEventPayload(event dbCommon.ChangePayload) {
27+
entityGetter, ok := event.Payload.(params.EntityGetter)
28+
if !ok {
29+
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
30+
return
31+
}
32+
entity, err := entityGetter.GetEntity()
33+
if err != nil {
34+
slog.ErrorContext(w.ctx, "getting entity from repository", "entity_type", event.EntityType, "payload", event.Payload, "error", err)
35+
return
36+
}
37+
4238
switch event.Operation {
4339
case dbCommon.UpdateOperation:
4440
slog.DebugContext(w.ctx, "got update operation")
@@ -57,7 +53,13 @@ func (w *Worker) handleEntityEventPayload(entity params.GithubEntity, event dbCo
5753
}
5854
}
5955

60-
func (w *Worker) handleEntityCredentialsEventPayload(credentials params.GithubCredentials, event dbCommon.ChangePayload) {
56+
func (w *Worker) handleEntityCredentialsEventPayload(event dbCommon.ChangePayload) {
57+
credentials, ok := event.Payload.(params.GithubCredentials)
58+
if !ok {
59+
slog.ErrorContext(w.ctx, "invalid payload for entity type", "entity_type", event.EntityType, "payload", event.Payload)
60+
return
61+
}
62+
6163
switch event.Operation {
6264
case dbCommon.UpdateOperation:
6365
slog.DebugContext(w.ctx, "got delete operation")

0 commit comments

Comments
 (0)