Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions packages/suite-base/src/dataSources/RosboardDataSourceFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/

import {

Check failure on line 8 in packages/suite-base/src/dataSources/RosboardDataSourceFactory.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

There is an error with the file header. Please check if the header exists or if there is a mistake in it
IDataSourceFactory,
DataSourceFactoryInitializeArgs,
} from "@lichtblick/suite-base/context/PlayerSelectionContext";
import RosboardPlayer from "@lichtblick/suite-base/players/RosboardPlayer";
import { Player } from "@lichtblick/suite-base/players/types";

class RosboardDataSourceFactory implements IDataSourceFactory {
public id = "rosboard-websocket";
public type: IDataSourceFactory["type"] = "connection";
public displayName = "Rosboard";
public iconName: IDataSourceFactory["iconName"] = "Flow";
public docsLinks = [
{
url: "https://github.com/kiwicampus/studio/tree/kiwi-main?tab=readme-ov-file#connecting-to-rosboard",
},
];
public description = "Connect to a ROS 1 or ROS 2 system using the Rosboard WebSocket protocol.";

public formConfig = {
fields: [
{
id: "url",
label: "WebSocket URL",
defaultValue: "ws://localhost:8888/rosboard/v1",
validate: (newValue: string): Error | undefined => {
try {
const url = new URL(newValue);
if (url.protocol !== "ws:" && url.protocol !== "wss:") {
return new Error(`Invalid protocol: ${url.protocol}`);
}
return undefined;
} catch (err) {

Check failure on line 40 in packages/suite-base/src/dataSources/RosboardDataSourceFactory.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

'err' is defined but never used
return new Error("Enter a valid url");
}
},
},
],
};

public initialize(args: DataSourceFactoryInitializeArgs): Player | undefined {
const url = args.params?.url;
if (!url) {
return;
}

return new RosboardPlayer({ url, metricsCollector: args.metricsCollector, sourceId: this.id });
}
}

export default RosboardDataSourceFactory;
1 change: 1 addition & 0 deletions packages/suite-base/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export { default as Ros1LocalBagDataSourceFactory } from "./dataSources/Ros1Loca
export { default as Ros1SocketDataSourceFactory } from "./dataSources/Ros1SocketDataSourceFactory";
export { default as Ros2LocalBagDataSourceFactory } from "./dataSources/Ros2LocalBagDataSourceFactory";
export { default as RosbridgeDataSourceFactory } from "./dataSources/RosbridgeDataSourceFactory";
export { default as RosboardDataSourceFactory } from "./dataSources/RosboardDataSourceFactory";
export { default as UlogLocalDataSourceFactory } from "./dataSources/UlogLocalDataSourceFactory";
export { default as RemoteDataSourceFactory } from "./dataSources/RemoteDataSourceFactory";
export { default as VelodyneDataSourceFactory } from "./dataSources/VelodyneDataSourceFactory";
Expand Down
319 changes: 319 additions & 0 deletions packages/suite-base/src/players/RosboardClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/

interface Topic {

Check failure on line 8 in packages/suite-base/src/players/RosboardClient.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

There is an error with the file header. Please check if the header exists or if there is a mistake in it
[name: string]: string;
}

interface TypeIndex {
[type: string]: string | undefined;
}

interface SubscribePayload {
topicName: string;
maxUpdateRate: number;
}

interface UnsubscribePayload {
topicName: string;
}

interface MessagePayload {
[key: string]: any;

Check failure on line 26 in packages/suite-base/src/players/RosboardClient.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Unexpected any. Specify a different type
}

type MessageCallback = (message: MessagePayload) => void;
type EventCallback = () => void;

// Replaces any key named "nsec" with "nanosec" in the object
function renameNsecToNanosec(obj: any): any {

Check failure on line 33 in packages/suite-base/src/players/RosboardClient.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Unexpected any. Specify a different type

Check failure on line 33 in packages/suite-base/src/players/RosboardClient.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Unexpected any. Specify a different type
if (Array.isArray(obj)) {
return obj.map((item) => renameNsecToNanosec(item));
} else if (obj !== null && typeof obj === "object") {

Check failure on line 36 in packages/suite-base/src/players/RosboardClient.ts

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Prefer 'x != null' to catch both null and undefined
const newObj: any = {};
for (const key in obj) {
if (obj.hasOwnProperty(key)) {
const newKey = key === "nsec" ? "nanosec" : key;
newObj[newKey] = renameNsecToNanosec(obj[key]);
}
}
return newObj;
}
return obj;
}

export class PubTopic {
rosClient: RosboardClient;
name: string;
messageType: string;
queueSize: number;

constructor(rosClient: RosboardClient, name: string, messageType: string, queueSize: number) {
this.rosClient = rosClient;
this.name = name;
this.messageType = messageType;
this.queueSize = queueSize;
}

unadvertise(): void {
// Send message to destroy publisher in server. Message is expected to be: ["n", {topicName: xxxx}]
const message = JSON.stringify(["n", { topicName: this.name }]);
if (message !== undefined) {
this.rosClient.send(message);
} else {
console.error("Message is undefined. Cannot send.");
}
}

// Just for compatibility with the roslib version
// In rosboard we don't need to advertise the topic, it is done automatically when
// we send a message for the first time in the rosboard server
advertise(): void {}

publish(msg: MessagePayload): void {
// Rosboard expects headers to have nsec named nanosec, but foxglove uses nsec
msg = renameNsecToNanosec(msg);
// rosboard expects a message like this: ["m", {message dictionary}]
// and message dictionary is in the form of {_topic_name: topic, _topic_type: type, ...payload}
const payload = {
_topic_name: this.name,
_topic_type: this.messageType,
...msg,
};

const jsonString = JSON.stringify(["m", payload]);

if (jsonString !== undefined) {
this.rosClient.send(jsonString);
} else {
console.error("Message is undefined. Cannot send.");
}
}
}

export default class RosboardClient {
ws?: WebSocket;
hostname: string = "";
version: string = "";
closed: boolean = false;
url: string;
private _availableTopics: Topic = {};
private _topicsFull: TypeIndex = {};
private _topicsFullRequested: boolean = false;
sequenceNumber: number | null = null;
connectionCallbacks: EventCallback[] = [];
errorCallback?: (error: Error) => void;
closeCallback?: () => void;
subscribedTopics: string[] = [];
topicCallbacks: { [topicName: string]: MessageCallback } = {};

public constructor({ url }: { url: string }) {
this.url = url;
this.openConnection();
}

openConnection = (): void => {
if (this.ws != undefined) {
throw new Error(`Attempted to open a second WebSocket Connection`);
}

const ws = new WebSocket(this.url);

ws.addEventListener("open", () => {
this.ws = ws;
this.connectionCallbacks.forEach((callback) => {
callback();
});
});

ws.addEventListener("error", (event) => {
console.error("WebSocket error:", event);
const error = event instanceof ErrorEvent ? event.error : new Error("WebSocket error");
if (this.errorCallback) {
this.errorCallback(error);
}
});

ws.addEventListener("close", () => {
this.ws = undefined;
this.closed = true;
if (this.closeCallback) {
this.closeCallback();
}
});

ws.addEventListener("message", async (event) => {
try {
let data: [string, any];
if (typeof event.data === "string") {
data = JSON.parse(event.data);
} else if (event.data instanceof Blob) {
const result = await new Promise<string>((resolve, reject) => {
const reader = new FileReader();

reader.onload = () => {
if (reader.result) {
resolve(reader.result as string);
} else {
reject(new Error("Reader result is empty"));
}
};

reader.onerror = () => {
reject(reader.error);
};

reader.readAsText(event.data as Blob);
});

data = JSON.parse(result) as [string, any];
} else {
data = ["z", "invalid"];
//console.log("Invalid input");
//console.log(typeof(data));
}
const [type, payload] = data;

if (type === "y" && typeof payload === "object") {
this.hostname = payload.hostname;
this.version = payload.version;
} else if (type === "t" && typeof payload === "object") {
// Update availableTopics directly with the new payload
this._availableTopics = payload;
//console.log('Updated Available Topics:', this._availableTopics);
} else if (type === "f" && typeof payload === "object") {
// Update availableTopics directly with the new payload
const typedefs: TypeIndex = {};
Object.keys(payload).forEach((k) => {
typedefs[payload[k].type] = payload[k].typedef;
});
this._topicsFull = typedefs;
this._topicsFullRequested = false;
} else if (
type === "m" &&
typeof payload === "object" &&
payload._topic_name &&
this.subscribedTopics.includes(payload._topic_name)
) {
// Message received for a subscribed topic
const topicName = payload._topic_name;
if (this.topicCallbacks[topicName]) {
// Execute the callback function for the topic
const callback = this.topicCallbacks[topicName];
if (typeof callback == "function") {
callback(payload);
}
}
} else if (type === "p" && typeof payload === "object" && typeof payload.s === "number") {
// Respond with a message of type 'q' containing the current timestamp and matching sequence number
const sequenceNumber = payload.s;
const timestamp = Date.now();
const response = JSON.stringify(["q", { s: sequenceNumber, t: timestamp }]);
if (this.ws && response) {
this.ws.send(response);
//console.log('Sent response:', response);
}
}
} catch (error) {
console.error("Error parsing message:", error);
}
});
};

on(
event: "connection" | "error" | "close",
callback: EventCallback | ((error: Error) => void) | (() => void),
) {
if (event === "connection") {
this.connectionCallbacks.push(callback as EventCallback);
} else if (event === "error") {
this.errorCallback = callback as (error: Error) => void;
} else if (event === "close") {
this.closeCallback = callback as () => void;
}
}

get availableTopics(): Topic {
return this._availableTopics;
}

get topicsFull(): TypeIndex {
return this._topicsFull;
}

requestTopicsFull(): void {
if (this._topicsFullRequested) {
return;
}
const message = JSON.stringify(["f"]);
if (message !== undefined) {
this.send(message);
this._topicsFullRequested = true;
} else {
console.error("Message is undefined. Cannot send.");
}
}

subscribe(topicName: string, maxUpdateRate: number): void {
const payload: SubscribePayload = {
topicName,
maxUpdateRate,
};
const message = JSON.stringify(["s", payload]);
//console.log(message);
//console.log ("Subscribing to ", topicName);
if (message !== undefined) {
this.send(message);
} else {
console.error("Message is undefined. Cannot send.");
}
this.subscribedTopics.push(topicName);
}

unsubscribe(topicName: string): void {
const payload: UnsubscribePayload = {
topicName,
};
const message = JSON.stringify(["u", payload]);
//console.log ("Un-Subscribing to ", topicName);
if (message !== undefined) {
this.send(message);
} else {
console.error("Message is undefined. Cannot send.");
}
const index = this.subscribedTopics.indexOf(topicName);
if (index !== -1) {
this.subscribedTopics.splice(index, 1);
}
}

addTopicCallback(topicName: string, callback: MessageCallback): void {
this.topicCallbacks[topicName] = callback;
// Subscribe to the topic when adding the callback
this.subscribe(topicName, 24);
}

send(message: string): void {
if (this.ws) {
this.ws.send(message);
//console.log('Sent message:', message);
} else {
console.error("WebSocket connection is not established");
}
}

close(): void {
if (this.ws) {
this.ws.close();
this.ws = undefined;
this.closed = true;
//console.log('WebSocket connection closed');
} else {
console.warn("WebSocket connection is already closed");
}
}
}
Loading
Loading