Skip to content
This repository was archived by the owner on Mar 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[default.extend-words]
# This spelling error is caused in the mbaigo systems
Celcius = "Celcius"

110 changes: 110 additions & 0 deletions collector/collect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/sdoque/mbaigo/components"
"github.com/sdoque/mbaigo/forms"
"github.com/sdoque/mbaigo/usecases"
)

type mockTransport struct {
respCode int
respBody io.ReadCloser

// hits map[string]int
// returnError bool
// resp *http.Response
// err error
}

func newMockTransport() mockTransport {
t := mockTransport{
respCode: 200,
respBody: io.NopCloser(strings.NewReader("")),

// hits: make(map[string]int),
// err: err,
// returnError: retErr,
// resp: resp,
}
// Hijack the default http client so no actual http requests are sent over the network
http.DefaultClient.Transport = t
return t
}

func (t mockTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
// log.Println("HIJACK:", req.URL.String())
// t.hits[req.URL.Hostname()] += 1
// if t.err != nil {
// return nil, t.err
// }
// if t.returnError != false {
// req.GetBody = func() (io.ReadCloser, error) {
// return nil, errHTTP
// }
// }
// t.resp.Request = req
// return t.resp, nil

// b, err := io.ReadAll(req.Body)
// if err != nil {
// return
// }
// fmt.Println(string(b))

return &http.Response{
Request: req,
StatusCode: t.respCode,
Body: t.respBody,
}, nil
}

const mockBodyType string = "application/json"

var mockStates = map[string]string{
"temperature": `{ "value": 0, "unit": "Celcius", "timestamp": "%s", "version": "SignalA_v1.0" }`,
"SEKPrice": `{ "value": 0.10403, "unit": "SEK", "timestamp": "%s", "version": "SignalA_v1.0" }`,
"DesiredTemp": `{ "value": 25, "unit": "Celsius", "timestamp": "%s", "version": "SignalA_v1.0" }`,
"setpoint": `{ "value": 20, "unit": "Celsius", "timestamp": "%s", "version": "SignalA_v1.0" }`,
}

func mockGetState(c *components.Cervice, s *components.System) (f forms.Form, err error) {
if c == nil {
err = fmt.Errorf("got empty *Cervice instance")
return
}
b := mockStates[c.Name]
if len(b) < 1 {
err = fmt.Errorf("found no mock body for service: %s", c.Name)
return
}
body := fmt.Sprintf(b, time.Now().Format(time.RFC3339))
f, err = usecases.Unpack([]byte(body), mockBodyType)
if err != nil {
err = fmt.Errorf("failed to unpack mock body: %s", err)
}
return
}

func TestCollectService(t *testing.T) {
newMockTransport()
ua := newUnitAsset(*initTemplate(), newSystem(), nil)
ua.apiGetState = mockGetState

// for _, service := range consumeServices {
// err := ua.collectService(service)
// if err != nil {
// t.Fatalf("Expected nil error while pulling %s, got: %s", service, err)
// }
// }
err := ua.collectAllServices()
if err != nil {
t.Fatalf("Expected nil error, got: %s", err)
}
}
125 changes: 125 additions & 0 deletions collector/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"context"
"encoding/json"
"log"
"sync"

"github.com/sdoque/mbaigo/components"
"github.com/sdoque/mbaigo/usecases"
)

func main() {
sys := newSystem()
sys.loadConfiguration()

// Generate PKI keys and CSR to obtain a authentication certificate from the CA
usecases.RequestCertificate(&sys.System)

// Register the system and its services
// WARN: this func runs a goroutine of it's own, which makes it hard to count
// using the waitgroup (and I can't be arsed to do it properly...)
usecases.RegisterServices(&sys.System)

// Run forever
sys.listenAndServe()
}

////////////////////////////////////////////////////////////////////////////////

// There's no interface to use, so have to encapsulate the base struct instead
type system struct {
components.System

cancel func()
startups []func() error
}

func newSystem() (sys *system) {
// Handle graceful shutdowns using this context. It should always be canceled,
// no matter the final execution path so all computer resources are freed up.
ctx, cancel := context.WithCancel(context.Background())

// Create a new Eclipse Arrowhead application system and then wrap it with a
// "husk" (aka a wrapper or shell), which then sets up various properties and
// operations that's required of an Arrowhead system.
// var sys system
sys = &system{
System: components.NewSystem("Collector", ctx),
cancel: cancel,
}
sys.Husk = &components.Husk{
Description: "pulls data from other Arrorhead systems and sends it to a InfluxDB server.",
Details: map[string][]string{"Developer": {"Alex"}},
ProtoPort: map[string]int{"https": 6666, "http": 6666, "coap": 0},
InfoLink: "https://github.com/lmas/d0020e_code/tree/master/collector",
}
return
}

func (sys *system) loadConfiguration() {
// Try loading the config file (in JSON format) for this deployment,
// by using a unit asset with default values.
uat := components.UnitAsset(initTemplate())
sys.UAssets[uat.GetName()] = &uat
rawUAs, servsTemp, err := usecases.Configure(&sys.System)
// If the file is missing, a new config will be created and an error is returned here.
if err != nil {
// TODO: it would had been nice to catch the exact error for "created config.."
// and not display it as an actual error, per se.
log.Fatalf("Error while reading configuration: %v\n", err)
}

// Load the proper unit asset(s) using the user-defined settings from the config file.
clear(sys.UAssets)
for _, raw := range rawUAs {
var uac unitAsset
if err := json.Unmarshal(raw, &uac); err != nil {
log.Fatalf("Error while unmarshalling configuration: %+v\n", err)
}
// ua, startup := newUnitAsset(uac, &sys.System, servsTemp)
// ua := newUnitAsset(uac, &sys.System, servsTemp)
ua := newUnitAsset(uac, sys, servsTemp)
sys.startups = append(sys.startups, ua.startup)
intf := components.UnitAsset(ua)
sys.UAssets[ua.GetName()] = &intf
}
}

func (sys *system) listenAndServe() {
var wg sync.WaitGroup // Used for counting all started goroutines

// start a web server that serves basic documentation of the system
wg.Add(1)
go func() {
if err := usecases.SetoutServers(&sys.System); err != nil {
log.Println("Error while running web server:", err)
sys.cancel()
}
wg.Done()
}()

// Run all the startups in separate goroutines and keep track of them
for _, f := range sys.startups {
wg.Add(1)
go func(start func() error) {
if err := start(); err != nil {
log.Printf("Error while running collector: %s\n", err)
sys.cancel()
}
wg.Done()
}(f)
}

// Block and wait for either a...
select {
case <-sys.Sigs: // user initiated shutdown signal (ctrl+c) or a...
case <-sys.Ctx.Done(): // shutdown request from a worker
}

// Gracefully terminate any leftover goroutines and wait for them to shutdown properly
log.Println("Initiated shutdown, waiting for workers to terminate")
sys.cancel()
wg.Wait()
}
Loading