Skip to content

Commit 38a0945

Browse files
committed
started adding a rosbag reader
1 parent 91bd718 commit 38a0945

File tree

4 files changed

+130
-25
lines changed

4 files changed

+130
-25
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ _testmain.go
2222
*.exe
2323
*.test
2424
*.prof
25+
26+
*.bag

ros.go

+57-23
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"fmt"
66
"io"
77
"log"
8-
"regexp"
9-
"strings"
108
"sync"
119

1210
"golang.org/x/net/websocket"
@@ -16,6 +14,11 @@ var (
1614
messageCount = 0
1715
)
1816

17+
type Base struct {
18+
Op string `json:"op"`
19+
Id string `json:"id"`
20+
}
21+
1922
type Ros struct {
2023
origin string
2124
url string
@@ -55,6 +58,21 @@ func (ros *Ros) getServiceResponse(service *ServiceCall) *ServiceResponse {
5558
return serviceResponse.(*ServiceResponse)
5659
}
5760

61+
func (ros *Ros) getTopicResponse(topic *Topic) *interface{} {
62+
response := make(chan interface{})
63+
ros.receivedMapMutex.Lock()
64+
ros.receivedMap[topic.Id] = response
65+
ros.receivedMapMutex.Unlock()
66+
err := websocket.JSON.Send(ros.ws, topic)
67+
if err != nil {
68+
fmt.Println("Couldn't send msg")
69+
}
70+
log.Println(ros.receivedMap)
71+
72+
topicResponse := <-response
73+
return &topicResponse
74+
}
75+
5876
func (ros *Ros) returnToAppropriateChannel(id string, data interface{}) {
5977
ros.receivedMapMutex.Lock()
6078
ros.receivedMap[id] <- data
@@ -73,34 +91,34 @@ func (ros *Ros) handleIncoming() {
7391
break
7492
}
7593

76-
opRegex, err := regexp.Compile(`"op"\s*:\s*"[[:alpha:],_]*`)
77-
if err != nil {
78-
log.Println(err)
79-
}
80-
opString := opRegex.FindString(string(msg))
81-
splitOpString := strings.Split(opString, "\"")
82-
operation := splitOpString[len(splitOpString)-1]
83-
84-
//log.Println(operation)
85-
8694
/*
87-
var data map[string]interface{}
88-
jsonErr := json.Unmarshal(msg, &data)
89-
//fmt.Printf("Received from server: %s\n", data)
90-
if jsonErr != nil {
91-
panic(jsonErr)
95+
opRegex, err := regexp.Compile(`"op"\s*:\s*"[[:alpha:],_]*`)
96+
if err != nil {
97+
log.Println(err)
9298
}
93-
94-
ros.receivedMapMutex.Lock()
95-
ros.receivedMap[data["id"].(string)] <- data
96-
ros.receivedMapMutex.Unlock()
99+
opString := opRegex.FindString(string(msg))
100+
splitOpString := strings.Split(opString, "\"")
101+
operation := splitOpString[len(splitOpString)-1]
97102
*/
98-
if operation == "service_response" {
103+
104+
var base Base
105+
json.Unmarshal(msg, &base)
106+
107+
log.Println(base)
108+
109+
if base.Op == "service_response" {
99110
var serviceResponse ServiceResponse
100111
json.Unmarshal(msg, &serviceResponse)
101112
ros.receivedMapMutex.Lock()
102113
ros.receivedMap[serviceResponse.Id] <- &serviceResponse
103114
ros.receivedMapMutex.Unlock()
115+
} else if base.Op == "publish" {
116+
log.Println(base)
117+
var topic Topic
118+
json.Unmarshal(msg, &topic)
119+
ros.receivedMapMutex.Lock()
120+
ros.receivedMap[topic.Topic] <- &topic
121+
ros.receivedMapMutex.Unlock()
104122
}
105123
}
106124
}
@@ -112,6 +130,22 @@ func (ros *Ros) GetTopics() []string {
112130
return topics
113131
}
114132

115-
func (ros *Ros) Subscribe(topic *Topic) {
133+
func (ros *Ros) Subscribe(topicName string, callback TopicCallback) {
134+
//topicResponse := ros.getTopicResponse(topic)
135+
topic := NewTopic(topicName)
116136

137+
response := make(chan interface{})
138+
ros.receivedMapMutex.Lock()
139+
ros.receivedMap[topic.Topic] = response
140+
ros.receivedMapMutex.Unlock()
141+
err := websocket.JSON.Send(ros.ws, topic)
142+
if err != nil {
143+
fmt.Println("Couldn't send msg")
144+
}
145+
146+
go func() {
147+
for {
148+
callback(&(<-response).(*Topic).Msg)
149+
}
150+
}()
117151
}

rosbag.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"flag"
7+
"log"
8+
"os"
9+
)
10+
11+
func parseRecordHeader(reader *bufio.Reader) {
12+
var headerLength int32
13+
binary.Read(reader, binary.LittleEndian, &headerLength)
14+
//oneByte, err := reader.ReadByte()
15+
log.Println(headerLength)
16+
17+
var fieldLength int32
18+
19+
binary.Read(reader, binary.LittleEndian, &fieldLength)
20+
fieldName, _ := reader.ReadString('=')
21+
log.Println(fieldName)
22+
23+
}
24+
25+
func parseRosbag(path string) {
26+
log.Println(path)
27+
file, err := os.Open(path) // For read access.
28+
if err != nil {
29+
log.Fatal(err)
30+
}
31+
defer file.Close()
32+
33+
reader := bufio.NewReader(file)
34+
35+
line, _ := reader.ReadString('\n')
36+
log.Print(line)
37+
38+
file.Seek(4096, 0) //offsets by the size of the chunk header
39+
40+
reader = bufio.NewReader(file)
41+
42+
parseRecordHeader(reader)
43+
/*chunkHeader := make([]byte, 4096)
44+
_, _ = file.Read(chunkHeader)
45+
log.Println(string(bytes))
46+
*/
47+
}
48+
49+
func main() {
50+
flag.Parse()
51+
parseRosbag(flag.Arg(0))
52+
}

topic.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,23 @@
11
package goros
22

3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
38
type Topic struct {
4-
Op string
5-
Id string
9+
Op string `json:"op"`
10+
Id string `json:"id"`
11+
Topic string `json:"topic"`
12+
Msg json.RawMessage `json:"msg,omitempty"`
13+
}
14+
15+
type TopicCallback func(*json.RawMessage)
16+
17+
func NewTopic(topicName string) *Topic {
18+
topic := &Topic{Op: "subscribe", Topic: topicName}
19+
topic.Id = fmt.Sprintf("%s:%s:%d", topic.Op, topic.Topic, messageCount)
20+
messageCount++
21+
22+
return topic
623
}

0 commit comments

Comments
 (0)