-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream_test.go
157 lines (119 loc) · 3.57 KB
/
stream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package epee
import (
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"path"
"sync"
"testing"
)
// TestSyncsStreamPositionOnSuccessfulFlush feeds a single message (offset 1)
// through a stream backed by mock Kafka and Zookeeper clients, flushes the
// stream, and asserts that the offset read back from the mock Zookeeper client
// for client "test-client-1", topic "topic-1", partition 1 equals 1.
func TestSyncsStreamPositionOnSuccessfulFlush(t *testing.T) {
	testMessage := TestMessage{
		Name: proto.String("My Test"),
	}

	// Named payload (not bytes) to avoid shadowing the stdlib package name.
	payload, err := proto.Marshal(&testMessage)
	if err != nil {
		t.Fatalf("Expected no error marshalling test message, got %v", err)
	}

	zk := newMockZookeeperClient()
	ks, consumer := newMockKafkaStream(t, "test-client-1", zk)

	// Set up expectations for Kafka. Nothing has been stored in the mock
	// Zookeeper client, and consumption is expected to begin at the oldest
	// offset.
	pc := consumer.ExpectConsumePartition("topic-1", 1, sarama.OffsetOldest)
	pc.YieldMessage(&sarama.ConsumerMessage{
		Key:       []byte("test"),
		Value:     payload,
		Topic:     "topic-1",
		Partition: int32(1),
		Offset:    int64(1),
	})

	stream, err := newStreamWithKafkaStream("test-client-1", zk, ks)
	if err != nil {
		// Fatal, not Errorf: every step below uses stream, so continuing after
		// a construction failure would only produce misleading follow-on
		// failures (or a nil dereference).
		t.Fatalf("Failed to instantiate stream. Got %v", err)
	}

	var wg sync.WaitGroup
	proc := testProcessor{&testMessage, t, &wg}

	// Exactly one message is yielded above, so exactly one Process call is
	// awaited.
	wg.Add(1)

	stream.Register("topic-1", TestMessage{})

	// Stream should have started successfully.
	if err := stream.Stream("topic-1", 1, &proc); err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	// Wait for everything to finish up.
	wg.Wait()

	// Now let's try to flush. Should be dirty.
	//
	// NOTE(review): the original re-checked a stale err here that could never
	// be non-nil, because flushAll's result (if it has one) was never
	// assigned. The flush outcome is verified via the stored offset below;
	// confirm whether flushAll returns an error that should be asserted too.
	stream.flushAll()

	var offset int64
	err = zk.Get(path.Join(DefaultZookeeperPrefix, "test-client-1", "topic-1", "1"), &offset)
	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	if offset != 1 {
		t.Fatalf("Expected saved offset to be 1, got %d", offset)
	}
}
// TestPicksUpFromLastFlush seeds the mock Zookeeper client with a stored
// offset (123456) and verifies, through the expectations set on the Sarama
// mock consumer, that the stream starts consuming the partition at the offset
// immediately after it (123457).
func TestPicksUpFromLastFlush(t *testing.T) {
	// This is the path that we'll expect data to come from.
	fullPath := path.Join(DefaultZookeeperPrefix, "test-client-1", "topic-1", "1")

	// Our example test message.
	testMessage := TestMessage{
		Name: proto.String("My Test"),
	}

	// Named payload (not bytes) to avoid shadowing the stdlib package name.
	payload, err := proto.Marshal(&testMessage)
	if err != nil {
		t.Fatalf("Expected no error marshalling test message, got %v", err)
	}

	zk := newMockZookeeperClient()
	zk.Set(fullPath, int64(123456))

	ks, consumer := newMockKafkaStream(t, "test-client-1", zk)

	// Set up expectations in Kafka. The big thing here is that the requested
	// offset should be 1 after the offset stored in ZK.
	pc := consumer.ExpectConsumePartition("topic-1", 1, 123457)
	pc.YieldMessage(&sarama.ConsumerMessage{
		Key:       []byte("test"),
		Value:     payload,
		Topic:     "topic-1",
		Partition: int32(1),
		Offset:    int64(123456),
	})

	// Okay, now we can try all this stuff.
	stream, err := newStreamWithKafkaStream("test-client-1", zk, ks)
	if err != nil {
		// Fatal, not Errorf: every step below uses stream, so continuing after
		// a construction failure would only produce misleading follow-on
		// failures (or a nil dereference).
		t.Fatalf("Failed to instantiate stream. Got %v", err)
	}

	var wg sync.WaitGroup
	proc := testProcessor{&testMessage, t, &wg}

	// Exactly one message is yielded above, so exactly one Process call is
	// awaited.
	wg.Add(1)

	stream.Register("topic-1", TestMessage{})

	// Stream should have started successfully.
	if err := stream.Stream("topic-1", 1, &proc); err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	// Wait for everything to finish up.
	wg.Wait()

	// NOTE: This test will fail due to expectations set on the Sarama mock if
	// the wrong offset is used or something like that.
}
// testProcessor is the processor handed to the stream by the tests in this
// file. It compares each processed message against an expected one and
// signals a wait group once per processed message.
//
// The tests construct it with a positional literal, so the field order below
// must not change.
type testProcessor struct {
	// original is the message whose name every processed message must match.
	original *TestMessage
	// t is the running test, used to report mismatches.
	t *testing.T
	// wg has Done called on it once per Process call.
	wg *sync.WaitGroup
}
// Process checks that message is a *TestMessage whose name equals that of the
// expected original, and records a test failure if it is not. It always
// returns nil, and it always calls Done on the wait group before returning so
// that a test blocked in wg.Wait can continue. The offset argument is unused.
//
// Failures are reported with Errorf rather than Fatalf: the testing package
// requires FailNow (and therefore Fatalf) to be called from the goroutine
// running the test, and Process is presumably invoked from a goroutine owned
// by the stream — TODO confirm against the stream implementation.
func (t *testProcessor) Process(offset int64, message proto.Message) error {
	defer t.wg.Done()

	m, ok := message.(*TestMessage)
	if !ok {
		t.t.Errorf("Expected message type TestMessage, got %v", message)
		// m is nil here; stop before the name comparison.
		return nil
	}

	if t.original.GetName() != m.GetName() {
		t.t.Errorf("Expected message name '%s', got '%s'", t.original.GetName(), m.GetName())
	}

	return nil
}
// Flush does nothing and always reports success; the test processor keeps no
// buffered state of its own.
func (t *testProcessor) Flush() error {
	return nil
}