Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@

__pycache__
.hypothesis
*.log
*.db
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.22
toolchain go1.23.3

require (
github.com/aws/aws-sdk-go v1.55.5
github.com/google/go-containerregistry v0.20.2
github.com/mattn/go-sqlite3 v1.14.24
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -25,6 +27,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
Comment on lines +5 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should use sdk v2

github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
Expand Down Expand Up @@ -43,6 +45,10 @@ github.com/google/go-containerregistry v0.20.2 h1:B1wPJ1SN/S7pB+ZAimcciVD+r+yV/l
github.com/google/go-containerregistry v0.20.2/go.mod h1:z38EKdKh4h7IP2gSfUUqEvalZBqs6AoLeWfUy34nQC8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
Expand All @@ -51,6 +57,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
Expand Down Expand Up @@ -148,6 +156,8 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
84 changes: 84 additions & 0 deletions reefagent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package reefagent

import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)

type Agent struct {
Id string
Queue string
Job *Job
ServiceHost string
}

func (a *Agent) Start() {
// start a loop to ping the server every 1 second on a goroutine until ping returns a job
for {
success, job := a.Ping()
if success {
a.AcquireAndRunJob(job)
return
}
time.Sleep(1 * time.Second)
}
}

func (a *Agent) Ping() (bool, *Job) {
// call POST /ping endpoint with agent ID and queue
url := fmt.Sprintf("%s/ping?agentId=%s&queue=%s", a.ServiceHost, a.Id, a.Queue)
resp, err := http.Get(url)

if err != nil {
log.Fatal(err)
}
var response struct {
JobId string `json:"jobId"`
Commands []string `json:"commands"`
}
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
log.Fatal(err)
}
job := &Job{
Id: response.JobId,
Commands: response.Commands,
}
defer resp.Body.Close()
// if response is 200, return the job and true
// otherwise return nil and false
if resp.StatusCode == 200 {
return true, job
}
return false, nil
}

func (a *Agent) AcquireAndRunJob(job *Job) {
success := a.AcquireJob(job.Id)
if !success {
fmt.Println("Failed to acquire job")
return
}
a.Job = job
a.RunJob()
}

func (a *Agent) RunJob() {
jr := NewJobRunner(a.Job, a.ServiceHost)
jr.Run()
}

func (a *Agent) AcquireJob(jobId string) bool {
// Send POST request to acquire the job
url := fmt.Sprintf("%s/job/acquire?jobId=%s&agentId=%s", a.ServiceHost, jobId, a.Id)
resp, err := http.Post(url, "application/json", nil)
if err != nil {
log.Printf("Error acquiring job: %v", err)
return false
}
defer resp.Body.Close()

return resp.StatusCode == 200
}
6 changes: 6 additions & 0 deletions reefagent/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package reefagent

type Job struct {
Id string
Commands []string
}
37 changes: 37 additions & 0 deletions reefagent/job_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package reefagent

import (
"fmt"
"io"
"os"
"os/exec"
"strings"
)

type JobRunner struct {
logStreamer *LogStreamer
job *Job
}

func (jr *JobRunner) Run() {
jr.logStreamer.Start()
for _, command := range jr.job.Commands {
parts := strings.Split(command, " ")
cmd := exec.Command(parts[0], parts[1:]...)
cmd.Stdout = jr.logStreamer.logsWriter
cmd.Stderr = jr.logStreamer.logsWriter
if err := cmd.Run(); err != nil {
fmt.Printf("Error running command %s: %v\n", command, err)
}
}
jr.logStreamer.Stop()
}

func NewJobRunner(job *Job, serviceHost string) *JobRunner {
jr := &JobRunner{
job: job,
logStreamer: NewLogStreamer(job.Id, serviceHost),
}
jr.logStreamer.logsWriter = io.MultiWriter(jr.logStreamer.logs, os.Stdout)
return jr
}
149 changes: 149 additions & 0 deletions reefagent/log_streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package reefagent

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
)

type LogChunk struct {
Data []byte
Sequence int
}

type Buffer struct {
buf []byte
}

func (l *Buffer) Write(b []byte) (int, error) {
l.buf = append(l.buf, b...)
return len(b), nil
}

func (l *Buffer) ReadAndFlush() []byte {
buf := l.buf
l.buf = []byte{}
return buf
}

type LogStreamer struct {
jobId string
logsWriter io.Writer
logs *Buffer
logOrder int
queue chan LogChunk
maxSize int
active bool
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
serviceHost string
}

func NewLogStreamer(jobId string) *LogStreamer {
ctx, cancel := context.WithCancel(context.Background())
return &LogStreamer{
jobId: jobId,
logs: &Buffer{},
logOrder: 0,
maxSize: 10 * 1024 * 1024, // 10MB
active: false,
queue: make(chan LogChunk, 100),
ctx: ctx,
cancel: cancel,
}
}

func (ls *LogStreamer) Start() {
ls.active = true
ls.wg.Add(2)
go ls.StreamLogs()
go ls.RetrieveAndUploadChunk()
}

func (ls *LogStreamer) Stop() {
ls.ChunkLogs(ls.logs.ReadAndFlush())
ls.cancel()
ls.wg.Wait()
close(ls.queue)
}

// StreamLogs streams the result from the logs and chunks them into the queue
func (ls *LogStreamer) StreamLogs() {
defer ls.wg.Done()
for {
// read the logs and chunk them then push into the queue
logs := ls.logs.ReadAndFlush()
if len(logs) > 0 {
ls.ChunkLogs(logs)
}
select {
case <-time.After(1 * time.Second):
case <-ls.ctx.Done():
return
}
}
}

func (ls *LogStreamer) ChunkLogs(data []byte) {
chunkSize := ls.maxSize
for i := 0; i < len(data); i += chunkSize {
chunkData := data[i:min(i+chunkSize, len(data))]
logChunk := LogChunk{Data: chunkData, Sequence: ls.logOrder}
ls.queue <- logChunk
ls.logOrder++
}
}

// function that takes chunk from the queue and write it to file
func (ls *LogStreamer) RetrieveAndUploadChunk() {
defer ls.wg.Done()
for {
select {
case chunk, ok := <-ls.queue:
if !ok {
fmt.Println("Queue closed.. exiting")
return
}
// write the chunk to file
ls.WriteToFile(chunk)
// send the chunk to server
ls.UploadChunk(chunk)
case <-ls.ctx.Done():
return
}
}
}

func (ls *LogStreamer) WriteToFile(logChunk LogChunk) {
fileName := fmt.Sprintf("logs/%s-%d.log", ls.jobId, logChunk.Sequence)
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
fmt.Println("Error writing to file", err)
return
}
defer file.Close()
file.Write(logChunk.Data)
}

func (ls *LogStreamer) UploadChunk(logChunk LogChunk) {
// send the chunk to server
url := fmt.Sprintf("%s/job/logs?jobId=%s&sequence=%d", ls.serviceHost, ls.jobId, logChunk.Sequence)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(logChunk.Data))
if err != nil {
fmt.Println("Error creating request", err)
return
}
req.Header.Set("Content-Type", "text/plain")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error uploading chunk", err)
}
defer resp.Body.Close()
}
Loading