Skip to content

Commit 5e79ca0

Browse files
committed
common: introduce ServiceState
1 parent 7915834 commit 5e79ca0

File tree

18 files changed

+157
-104
lines changed

18 files changed

+157
-104
lines changed

common/service.go

+35-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net"
2323
"strings"
24+
"sync/atomic"
2425
)
2526

2627
// ServiceType describes the service type (analyzer or agent)
@@ -35,9 +36,42 @@ const (
3536
AgentService ServiceType = "agent"
3637
)
3738

39+
// ServiceState describes the state of a service.
40+
type ServiceState int64
41+
42+
// MarshalJSON marshal the connection state to JSON
43+
func (s *ServiceState) MarshalJSON() ([]byte, error) {
44+
switch *s {
45+
case StartingState:
46+
return []byte("\"starting\""), nil
47+
case RunningState:
48+
return []byte("\"running\""), nil
49+
case StoppingState:
50+
return []byte("\"stopping\""), nil
51+
case StoppedState:
52+
return []byte("\"stopped\""), nil
53+
}
54+
return nil, fmt.Errorf("Invalid state: %d", s)
55+
}
56+
57+
// Store atomatically stores the state
58+
func (s *ServiceState) Store(state ServiceState) {
59+
atomic.StoreInt64((*int64)(s), int64(state))
60+
}
61+
62+
// Load atomatically loads and returns the state
63+
func (s *ServiceState) Load() ServiceState {
64+
return ServiceState(atomic.LoadInt64((*int64)(s)))
65+
}
66+
67+
// CompareAndSwap executes the compare-and-swap operation for a state
68+
func (s *ServiceState) CompareAndSwap(old, new ServiceState) bool {
69+
return atomic.CompareAndSwapInt64((*int64)(s), int64(old), int64(new))
70+
}
71+
3872
const (
3973
// StoppedState service stopped
40-
StoppedState = iota + 1
74+
StoppedState ServiceState = iota + 1
4175
// StartingState service starting
4276
StartingState
4377
// RunningState service running

etcd/election.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package etcd
1919

2020
import (
2121
"sync"
22-
"sync/atomic"
2322
"time"
2423

2524
etcd "github.com/coreos/etcd/client"
@@ -42,7 +41,7 @@ type MasterElector struct {
4241
listeners []common.MasterElectionListener
4342
cancel context.CancelFunc
4443
master bool
45-
state int64
44+
state common.ServiceState
4645
wg sync.WaitGroup
4746
}
4847

@@ -131,8 +130,8 @@ func (le *MasterElector) start(first chan struct{}) {
131130
le.wg.Add(1)
132131
defer le.wg.Done()
133132

134-
atomic.StoreInt64(&le.state, common.RunningState)
135-
for atomic.LoadInt64(&le.state) == common.RunningState {
133+
le.state.Store(common.RunningState)
134+
for le.state.Load() == common.RunningState {
136135
resp, err := watcher.Next(ctx)
137136
if err != nil {
138137
logging.GetLogger().Errorf("Error while watching etcd: %s", err.Error())
@@ -191,7 +190,7 @@ func (le *MasterElector) StartAndWait() {
191190

192191
// Stop the election mechanism
193192
func (le *MasterElector) Stop() {
194-
if atomic.CompareAndSwapInt64(&le.state, common.RunningState, common.StoppingState) {
193+
if le.state.CompareAndSwap(common.RunningState, common.StoppingState) {
195194
le.cancel()
196195
le.wg.Wait()
197196
}

flow/pcap.go

+8-9
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package flow
2020
import (
2121
"io"
2222
"sync"
23-
"sync/atomic"
2423
"time"
2524

2625
"github.com/google/gopacket"
@@ -39,7 +38,7 @@ type PcapWriter struct {
3938
// PcapTableFeeder replaies a pcap file
4039
type PcapTableFeeder struct {
4140
sync.WaitGroup
42-
state int64
41+
state common.ServiceState
4342
replay bool
4443
r io.ReadCloser
4544
handleRead *pcapgo.Reader
@@ -49,19 +48,19 @@ type PcapTableFeeder struct {
4948

5049
// Start a pcap injector
5150
func (p *PcapTableFeeder) Start() {
52-
if atomic.CompareAndSwapInt64(&p.state, common.StoppedState, common.RunningState) {
51+
if p.state.CompareAndSwap(common.StoppedState, common.RunningState) {
5352
p.Add(1)
5453
go p.feedFlowTable()
5554
}
5655
}
5756

5857
// Stop a pcap injector
5958
func (p *PcapTableFeeder) Stop() {
60-
if atomic.CompareAndSwapInt64(&p.state, common.RunningState, common.StoppingState) {
61-
atomic.StoreInt64(&p.state, common.StoppingState)
59+
if p.state.CompareAndSwap(common.RunningState, common.StoppingState) {
60+
p.state.Store(common.StoppingState)
6261
p.r.Close()
6362
p.Wait()
64-
atomic.StoreInt64(&p.state, common.StoppedState)
63+
p.state.Store(common.StoppedState)
6564
}
6665
}
6766

@@ -81,12 +80,12 @@ func (p *PcapTableFeeder) feedFlowTable() {
8180
logging.GetLogger().Error(err.Error())
8281
}
8382

84-
atomic.StoreInt64(&p.state, common.RunningState)
85-
for atomic.LoadInt64(&p.state) == common.RunningState {
83+
p.state.Store(common.RunningState)
84+
for p.state.Load() == common.RunningState {
8685
logging.GetLogger().Debugf("Reading one pcap packet")
8786
data, ci, err := p.handleRead.ReadPacketData()
8887
if err != nil {
89-
if atomic.LoadInt64(&p.state) == common.RunningState && err != io.EOF {
88+
if p.state.Load() == common.RunningState && err != io.EOF {
9089
logging.GetLogger().Warningf("Failed to read packet: %s\n", err)
9190
}
9291
p.r.Close()

flow/probes/gopacket.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"fmt"
2424
"io"
2525
"sync"
26-
"sync/atomic"
2726
"time"
2827

2928
"github.com/google/gopacket"
@@ -59,7 +58,7 @@ type GoPacketProbe struct {
5958
Ctx Context
6059
n *graph.Node
6160
packetProbe PacketProbe
62-
state int64
61+
state common.ServiceState
6362
ifName string
6463
bpfFilter string
6564
nsPath string
@@ -88,7 +87,7 @@ func (p *GoPacketProbe) updateStats(g *graph.Graph, n *graph.Node, captureStats
8887
case <-ticker.C:
8988
if stats, err := p.packetProbe.Stats(); err != nil {
9089
p.Ctx.Logger.Error(err)
91-
} else if atomic.LoadInt64(&p.state) == common.RunningState {
90+
} else if p.state.Load() == common.RunningState {
9291
g.Lock()
9392
g.UpdateMetadata(n, "Captures", func(obj interface{}) bool {
9493
captureStats.PacketsDropped = stats.PacketsDropped
@@ -108,7 +107,7 @@ func (p *GoPacketProbe) listen(packetCallback func(gopacket.Packet)) error {
108107
packetSource := p.packetProbe.PacketSource()
109108

110109
var errs int
111-
for atomic.LoadInt64(&p.state) == common.RunningState {
110+
for p.state.Load() == common.RunningState {
112111
packet, err := packetSource.NextPacket()
113112
switch err {
114113
case nil:
@@ -139,7 +138,7 @@ func (p *GoPacketProbe) listen(packetCallback func(gopacket.Packet)) error {
139138
// Run starts capturing packet, calling the passed callback for every packet
140139
// and notifying the flow probe handler when the capture has started
141140
func (p *GoPacketProbe) Run(packetCallback func(gopacket.Packet), e ProbeEventHandler) error {
142-
atomic.StoreInt64(&p.state, common.RunningState)
141+
p.state.Store(common.RunningState)
143142

144143
var nsContext *common.NetNSContext
145144
var err error
@@ -199,14 +198,14 @@ func (p *GoPacketProbe) Run(packetCallback func(gopacket.Packet), e ProbeEventHa
199198
statsTicker.Stop()
200199

201200
p.packetProbe.Close()
202-
atomic.StoreInt64(&p.state, common.StoppedState)
201+
p.state.Store(common.StoppedState)
203202

204203
return err
205204
}
206205

207206
// Stop capturing packets
208207
func (p *GoPacketProbe) Stop() {
209-
atomic.StoreInt64(&p.state, common.StoppingState)
208+
p.state.Store(common.StoppingState)
210209
}
211210

212211
// NewGoPacketProbe returns a new Gopacket flow probe. It can use either `pcap` or `afpacket`

flow/probes/pcapsocket.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"fmt"
2222
"net"
2323
"sync"
24-
"sync/atomic"
2524

2625
"github.com/skydive-project/skydive/api/types"
2726
"github.com/skydive-project/skydive/common"
@@ -33,7 +32,7 @@ import (
3332
// PcapSocketProbe describes a TCP packet listener that inject packets in a flowtable
3433
type PcapSocketProbe struct {
3534
Ctx Context
36-
state int64
35+
state common.ServiceState
3736
flowTable *flow.Table
3837
listener *net.TCPListener
3938
port int
@@ -49,15 +48,15 @@ type PcapSocketProbeHandler struct {
4948
}
5049

5150
func (p *PcapSocketProbe) run() {
52-
atomic.StoreInt64(&p.state, common.RunningState)
51+
p.state.Store(common.RunningState)
5352

5453
packetSeqChan, _, _ := p.flowTable.Start()
5554
defer p.flowTable.Stop()
5655

57-
for atomic.LoadInt64(&p.state) == common.RunningState {
56+
for p.state.Load() == common.RunningState {
5857
conn, err := p.listener.Accept()
5958
if err != nil {
60-
if atomic.LoadInt64(&p.state) == common.RunningState {
59+
if p.state.Load() == common.RunningState {
6160
p.Ctx.Logger.Errorf("Error while accepting connection: %s", err)
6261
}
6362
break
@@ -132,7 +131,7 @@ func (p *PcapSocketProbeHandler) UnregisterProbe(n *graph.Node, e ProbeEventHand
132131

133132
p.Ctx.FTA.Release(probe.flowTable)
134133

135-
atomic.StoreInt64(&probe.state, common.StoppingState)
134+
probe.state.Store(common.StoppingState)
136135
probe.listener.Close()
137136

138137
p.portAllocator.Release(probe.port)

flow/server/flow_server.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"strconv"
2525
"strings"
2626
"sync"
27-
"sync/atomic"
2827
"time"
2928

3029
"github.com/skydive-project/skydive/common"
@@ -84,7 +83,7 @@ type FlowServerWebSocketConn struct {
8483
type FlowServer struct {
8584
storage storage.Storage
8685
conn FlowServerConn
87-
state int64
86+
state common.ServiceState
8887
wgServer sync.WaitGroup
8988
bulkInsert int
9089
bulkInsertDeadline time.Duration
@@ -224,7 +223,7 @@ func (s *FlowServer) storeFlows(flows []*flow.Flow) {
224223

225224
// Start the flow server
226225
func (s *FlowServer) Start() {
227-
atomic.StoreInt64(&s.state, common.RunningState)
226+
s.state.Store(common.RunningState)
228227
s.wgServer.Add(1)
229228

230229
s.conn.Serve(s.ch, s.quit, &s.wgServer)
@@ -257,7 +256,7 @@ func (s *FlowServer) Start() {
257256

258257
// Stop the server
259258
func (s *FlowServer) Stop() {
260-
if atomic.CompareAndSwapInt64(&s.state, common.RunningState, common.StoppingState) {
259+
if s.state.CompareAndSwap(common.RunningState, common.StoppingState) {
261260
s.quit <- struct{}{}
262261
s.quit <- struct{}{}
263262
s.wgServer.Wait()

flow/table.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package flow
2020
import (
2121
"strings"
2222
"sync"
23-
"sync/atomic"
2423
"time"
2524

2625
"github.com/google/gopacket"
@@ -63,7 +62,7 @@ type Table struct {
6362
flushDone chan bool
6463
query chan *TableQuery
6564
reply chan []byte
66-
state int64
65+
state common.ServiceState
6766
lockState common.RWMutex
6867
wg sync.WaitGroup
6968
quit chan bool
@@ -348,7 +347,7 @@ func (ft *Table) Query(query *TableQuery) []byte {
348347
ft.lockState.Lock()
349348
defer ft.lockState.Unlock()
350349

351-
if atomic.LoadInt64(&ft.state) == common.RunningState {
350+
if ft.state.Load() == common.RunningState {
352351
ft.query <- query
353352

354353
timer := time.NewTicker(1 * time.Second)
@@ -359,7 +358,7 @@ func (ft *Table) Query(query *TableQuery) []byte {
359358
case r := <-ft.reply:
360359
return r
361360
case <-timer.C:
362-
if atomic.LoadInt64(&ft.state) != common.RunningState {
361+
if ft.state.Load() != common.RunningState {
363362
return nil
364363
}
365364
}
@@ -478,8 +477,8 @@ func (ft *Table) processEBPFFlow(ebpfFlow *EBPFFlow) {
478477
}
479478

480479
// State returns the state of the flow table, stopped, running...
481-
func (ft *Table) State() int64 {
482-
return atomic.LoadInt64(&ft.state)
480+
func (ft *Table) State() common.ServiceState {
481+
return ft.state.Load()
483482
}
484483

485484
// Run background jobs, like update/expire entries event
@@ -508,7 +507,7 @@ func (ft *Table) Run() {
508507
ft.query = make(chan *TableQuery, 100)
509508
ft.reply = make(chan []byte, 100)
510509

511-
atomic.StoreInt64(&ft.state, common.RunningState)
510+
ft.state.Store(common.RunningState)
512511
for {
513512
select {
514513
case <-ft.quit:
@@ -584,7 +583,7 @@ func (ft *Table) Stop() {
584583
ft.lockState.Lock()
585584
defer ft.lockState.Unlock()
586585

587-
if atomic.CompareAndSwapInt64(&ft.state, common.RunningState, common.StoppingState) {
586+
if ft.state.CompareAndSwap(common.RunningState, common.StoppingState) {
588587
ft.quit <- true
589588
ft.wg.Wait()
590589

probe/bundle.go

+10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ type Handler interface {
3232
Stop()
3333
}
3434

35+
// ServiceStatus describes the status returned by GetStatus
36+
type ServiceStatus struct {
37+
Status common.ServiceState
38+
}
39+
40+
// StatusReporter can be implemented by probes to report their status
41+
type StatusReporter interface {
42+
GetStatus() interface{}
43+
}
44+
3545
// Bundle describes a bundle of probes (topology of flow)
3646
type Bundle struct {
3747
common.RWMutex

tests/scale_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func TestScaleHA(t *testing.T) {
312312
gh := gclient.NewGremlinQueryHelper(authOptions)
313313

314314
// expected 1 either Incomer or Outgoer
315-
if err = common.Retry(func() error { return checkPeers(client, 1, common.RunningState) }, 5, time.Second); err != nil {
315+
if err = common.Retry(func() error { return checkPeers(client, 1, websocket.ConnState(common.RunningState)) }, 5, time.Second); err != nil {
316316
execCmds(t, tearDownCmds...)
317317
t.Fatal(err)
318318
}
@@ -405,7 +405,7 @@ func TestScaleHA(t *testing.T) {
405405
}
406406
execCmds(t, setupCmds...)
407407

408-
if err = checkPeers(client, 0, common.RunningState); err != nil {
408+
if err = checkPeers(client, 0, websocket.ConnState(common.RunningState)); err != nil {
409409
execCmds(t, tearDownCmds...)
410410
t.Fatal(err)
411411
}
@@ -453,7 +453,7 @@ func TestScaleHA(t *testing.T) {
453453
execCmds(t, setupCmds...)
454454

455455
if err = common.Retry(func() error {
456-
return checkPeers(client, 1, common.RunningState)
456+
return checkPeers(client, 1, websocket.ConnState(common.RunningState))
457457
}, 15, time.Second); err != nil {
458458
execCmds(t, tearDownCmds...)
459459
t.Fatal(err)

0 commit comments

Comments
 (0)