Skip to content

Commit 779ea65

Browse files
Fix: monitor
1 parent 99d792c commit 779ea65

File tree

2 files changed

+88
-48
lines changed

2 files changed

+88
-48
lines changed

node/data.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ func (a *Applied) UnmarshalJSON(data []byte) error {
4242

4343
// Failed -
4444
type Failed struct {
45-
Hash string
45+
Hash string `json:"hash,omitempty"`
4646
Protocol string `json:"protocol"`
4747
Branch string `json:"branch"`
4848
Contents []Content `json:"contents"`
49-
Signature string `json:"signature"`
49+
Signature string `json:"signature,omitempty"`
5050
Error stdJSON.RawMessage `json:"error,omitempty"`
5151
Raw stdJSON.RawMessage `json:"raw"`
5252
}
@@ -71,6 +71,19 @@ func (f *Failed) UnmarshalJSON(data []byte) error {
7171
return nil
7272
}
7373

74+
// FailedMonitor -
75+
type FailedMonitor Failed
76+
77+
// UnmarshalJSON -
78+
func (f *FailedMonitor) UnmarshalJSON(data []byte) error {
79+
type buf FailedMonitor
80+
if err := json.Unmarshal(data, (*buf)(f)); err != nil {
81+
return err
82+
}
83+
f.Raw = data
84+
return nil
85+
}
86+
7487
// Contents -
7588
type Content struct {
7689
Kind string `json:"kind"`

node/monitor.go

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8-
"reflect"
98
"strings"
109
"sync"
1110
"time"
@@ -26,9 +25,9 @@ type Monitor struct {
2625
url string
2726

2827
applied chan []*Applied
29-
refused chan []*Applied
30-
branchDelayed chan []*Applied
31-
branchRefused chan []*Applied
28+
refused chan []*FailedMonitor
29+
branchDelayed chan []*FailedMonitor
30+
branchRefused chan []*FailedMonitor
3231

3332
subscribedOnApplied bool
3433
subscribedOnRefused bool
@@ -43,9 +42,9 @@ func NewMonitor(url string) *Monitor {
4342
return &Monitor{
4443
url: strings.TrimSuffix(url, "/"),
4544
applied: make(chan []*Applied, 4096),
46-
refused: make(chan []*Applied, 4096),
47-
branchDelayed: make(chan []*Applied, 4096),
48-
branchRefused: make(chan []*Applied, 4096),
45+
refused: make(chan []*FailedMonitor, 4096),
46+
branchDelayed: make(chan []*FailedMonitor, 4096),
47+
branchRefused: make(chan []*FailedMonitor, 4096),
4948
}
5049
}
5150

@@ -109,17 +108,17 @@ func (monitor *Monitor) Applied() <-chan []*Applied {
109108
}
110109

111110
// BranchRefused -
112-
func (monitor *Monitor) BranchRefused() <-chan []*Applied {
111+
func (monitor *Monitor) BranchRefused() <-chan []*FailedMonitor {
113112
return monitor.branchRefused
114113
}
115114

116115
// BranchDelayed -
117-
func (monitor *Monitor) BranchDelayed() <-chan []*Applied {
116+
func (monitor *Monitor) BranchDelayed() <-chan []*FailedMonitor {
118117
return monitor.branchDelayed
119118
}
120119

121120
// Refused -
122-
func (monitor *Monitor) Refused() <-chan []*Applied {
121+
func (monitor *Monitor) Refused() <-chan []*FailedMonitor {
123122
return monitor.refused
124123
}
125124

@@ -137,35 +136,31 @@ func (monitor *Monitor) pollingMempool(ctx context.Context, filter string) {
137136
case <-ctx.Done():
138137
return
139138
default:
140-
ch, err := monitor.selectChannel(filter)
141-
if err != nil {
139+
if err := monitor.process(ctx, filter, url); err != nil {
142140
log.Error(err)
143141
continue
144142
}
145143

146-
if err := monitor.longPolling(ctx, url, ch); err != nil {
147-
log.Error(err)
148-
}
149144
}
150145
}
151146
}
152147

153-
func (monitor *Monitor) selectChannel(filter string) (interface{}, error) {
148+
func (monitor *Monitor) process(ctx context.Context, filter, url string) error {
154149
switch filter {
155150
case filterApplied:
156-
return monitor.applied, nil
151+
return monitor.longPollingApplied(ctx, url, monitor.applied)
157152
case filterBranchDelayed:
158-
return monitor.branchDelayed, nil
153+
return monitor.longPollingFailed(ctx, url, monitor.branchDelayed)
159154
case filterBranchRefused:
160-
return monitor.branchRefused, nil
155+
return monitor.longPollingFailed(ctx, url, monitor.branchRefused)
161156
case filterRefused:
162-
return monitor.refused, nil
157+
return monitor.longPollingFailed(ctx, url, monitor.refused)
163158
default:
164-
return nil, errors.Errorf("unknown filter: %s", filter)
159+
return errors.Errorf("unknown filter: %s", filter)
165160
}
166161
}
167162

168-
func (monitor *Monitor) longPolling(ctx context.Context, url string, ch interface{}) error {
163+
func (monitor *Monitor) longPollingApplied(ctx context.Context, url string, ch chan []*Applied) error {
169164
link := fmt.Sprintf("%s/%s", monitor.url, url)
170165
req, err := http.NewRequest(http.MethodGet, link, nil)
171166
if err != nil {
@@ -179,50 +174,82 @@ func (monitor *Monitor) longPolling(ctx context.Context, url string, ch interfac
179174
if err != nil {
180175
return err
181176
}
182-
return monitor.parseLongPollingResponse(ctx, resp, ch)
177+
return monitor.parseLongPollingAppliedResponse(ctx, resp, ch)
183178
}
184179

185-
func (monitor *Monitor) parseLongPollingResponse(ctx context.Context, resp *http.Response, ch interface{}) error {
180+
func (monitor *Monitor) parseLongPollingAppliedResponse(ctx context.Context, resp *http.Response, ch chan []*Applied) error {
186181
if resp == nil {
187182
return errors.New("nil response on mempool long polling request")
188183
}
189184
if ch == nil {
190185
return errors.New("nil output channel during mempool long polling request")
191186
}
192187

193-
typ := reflect.TypeOf(ch)
194-
if typ.Kind() != reflect.Chan {
195-
return errors.Errorf("invalid channel type: %T", ch)
188+
decoder := json.NewDecoder(resp.Body)
189+
190+
for {
191+
select {
192+
case <-ctx.Done():
193+
return ctx.Err()
194+
default:
195+
for decoder.More() {
196+
value := make([]*Applied, 0)
197+
if err := decoder.Decode(&value); err != nil {
198+
if err == io.EOF || err == io.ErrUnexpectedEOF {
199+
return nil
200+
}
201+
return err
202+
}
203+
ch <- value
204+
}
205+
time.Sleep(time.Millisecond) // sleeping for CPU usage decreasing
206+
}
196207
}
208+
}
197209

198-
decoder := json.NewDecoder(resp.Body)
199-
cases := []reflect.SelectCase{
200-
{
201-
Dir: reflect.SelectSend,
202-
Chan: reflect.ValueOf(ch),
203-
},
204-
{
205-
Dir: reflect.SelectRecv,
206-
Chan: reflect.ValueOf(ctx.Done()),
207-
},
210+
func (monitor *Monitor) longPollingFailed(ctx context.Context, url string, ch chan []*FailedMonitor) error {
211+
link := fmt.Sprintf("%s/%s", monitor.url, url)
212+
req, err := http.NewRequest(http.MethodGet, link, nil)
213+
if err != nil {
214+
return err
208215
}
216+
client := http.Client{
217+
Timeout: time.Minute,
218+
}
219+
220+
resp, err := client.Do(req)
221+
if err != nil {
222+
return err
223+
}
224+
return monitor.parseLongPollingFailedResponse(ctx, resp, ch)
225+
}
226+
227+
func (monitor *Monitor) parseLongPollingFailedResponse(ctx context.Context, resp *http.Response, ch chan []*FailedMonitor) error {
228+
if resp == nil {
229+
return errors.New("nil response on mempool long polling request")
230+
}
231+
if ch == nil {
232+
return errors.New("nil output channel during mempool long polling request")
233+
}
234+
235+
decoder := json.NewDecoder(resp.Body)
209236

210237
for {
211238
select {
212239
case <-ctx.Done():
213240
return ctx.Err()
214241
default:
215-
value := reflect.New(typ.Elem())
216-
if err := decoder.Decode(value.Interface()); err != nil {
217-
if err == io.EOF || err == io.ErrUnexpectedEOF {
218-
return nil
242+
for decoder.More() {
243+
value := make([]*FailedMonitor, 0)
244+
if err := decoder.Decode(&value); err != nil {
245+
if err == io.EOF || err == io.ErrUnexpectedEOF {
246+
return nil
247+
}
248+
return err
219249
}
220-
return err
221-
}
222-
cases[0].Send = value.Elem()
223-
if chosen, _, _ := reflect.Select(cases); chosen == 1 {
224-
return ctx.Err()
250+
ch <- value
225251
}
252+
time.Sleep(time.Millisecond) // sleeping for CPU usage decreasing
226253
}
227254
}
228255
}

0 commit comments

Comments
 (0)