diff --git a/src/channel.ts b/src/channel.ts index 538c6c42aa..653a41005c 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -61,6 +61,7 @@ import { PartialUpdateMemberAPIResponse, AIState, MessageOptions, + Attachment, } from './types'; import { Role } from './permissions'; import { DEFAULT_QUERY_CHANNEL_MESSAGE_LIST_PAGE_SIZE } from './constants'; @@ -471,6 +472,111 @@ export class Channel, + ) { + const { latitude, longitude } = attachmentMetadata; + + const message: Message = { + attachments: [ + { + ...attachmentMetadata, + type: 'static_location', + latitude, + longitude, + }, + ], + }; + + return await this.sendMessage(message); + } + + public async startLiveLocationSharing( + attachmentMetadata: { end_time: string; latitude: number; longitude: number } & Attachment, + ) { + const client = this.getClient(); + if (!client.userID) return; + + const { latitude, longitude, end_time } = attachmentMetadata; + + const message: Message = { + attachments: [ + { + ...attachmentMetadata, + type: 'live_location', + latitude, + longitude, + end_time, + }, + ], + }; + + // FIXME: this is wrong and could easily be walked around by integrators + const existing = await this.getClient().search( + // @ts-ignore + { + cid: this.cid, + }, + { + $and: [ + { 'attachments.type': { $eq: 'live_location' } }, + // has not been manually stopped + { + 'attachments.stopped_sharing': { + $nin: [true], + }, + }, + // has not ended + { + 'attachments.end_time': { + $gt: new Date().toISOString(), + }, + }, + ], + }, + ); + + const promises: Promise[] = []; + + for (const result of existing.results) { + const [attachment] = result.message.attachments ?? []; + + promises.push( + client.partialUpdateMessage(result.message.id, { + // @ts-expect-error + set: { + attachments: [ + { + ...attachment, + stopped_sharing: true, + }, + ], + }, + }), + ); + } + + // FIXME: sending message if the previous part failed/did not happen + // should result in BE error + promises.unshift(this.sendMessage(message)); + + const [response] = await Promise.allSettled(promises); + + if (response.status === 'fulfilled') { + this.getClient().dispatchEvent({ message: response.value.message, type: 'live_location_sharing.started' }); + } + } + + public async stopLiveLocationSharing(message: MessageResponse) { + const [attachment] = message.attachments ?? []; + const response = await this.getClient().partialUpdateMessage(message.id, { + // @ts-expect-error this is a valid update + set: { attachments: [{ ...attachment, stopped_sharing: true }] }, + }); + + this.getClient().dispatchEvent({ message: response.message, type: 'live_location_sharing.stopped' }); + } + /** * delete - Delete the channel. Messages are permanently removed. * diff --git a/src/client.ts b/src/client.ts index 4f8ee4dec2..562f39f8ed 100644 --- a/src/client.ts +++ b/src/client.ts @@ -2552,6 +2552,24 @@ export class StreamChat, + { latitude, longitude }: { latitude: number; longitude: number }, + ) { + const [attachment] = message.attachments ?? []; + + if (!attachment || attachment.type !== 'live_location') { + throw new Error( + 'Supplied message either has no attachments to update or attachment is not of type "live_location"', + ); + } + + return this.partialUpdateMessage(message.id, { + // @ts-expect-error valid update + set: { attachments: [{ ...attachment, latitude, longitude }] }, + }); + } + /** * pinMessage - pins the message * @param {string | { id: string }} messageOrMessageId message object or message id diff --git a/src/concurrency.ts b/src/concurrency.ts new file mode 100644 index 0000000000..9c3c82689f --- /dev/null +++ b/src/concurrency.ts @@ -0,0 +1,117 @@ +interface PendingPromise { + onContinued: () => void; + promise: Promise; +} + +type AsyncWrapper

= ( + tag: string | symbol, + cb: (...args: P) => Promise, +) => { + cb: () => Promise; + onContinued: () => void; +}; + +/** + * Runs async functions serially. Useful for wrapping async actions that + * should never run simultaneously: if marked with the same tag, functions + * will run one after another. + * + * @param tag Async functions with the same tag will run serially. Async functions + * with different tags can run in parallel. + * @param cb Async function to run. + * @returns Promise that resolves when async functions returns. + */ +export const withoutConcurrency = createRunner(wrapWithContinuationTracking); + +/** + * Runs async functions serially, and cancels all other actions with the same tag + * when a new action is scheduled. Useful for wrapping async actions that override + * each other (e.g. enabling and disabling camera). + * + * If an async function hasn't started yet and was canceled, it will never run. + * If an async function is already running and was canceled, it will be notified + * via an abort signal passed as an argument. + * + * @param tag Async functions with the same tag will run serially and are canceled + * when a new action with the same tag is scheduled. + * @param cb Async function to run. Receives AbortSignal as the only argument. + * @returns Promise that resolves when async functions returns. If the function didn't + * start and was canceled, will resolve with 'canceled'. If the function started to run, + * it's up to the function to decide how to react to cancelation. + */ +export const withCancellation = createRunner(wrapWithCancellation); + +const pendingPromises = new Map(); + +export function hasPending(tag: string | symbol) { + return pendingPromises.has(tag); +} + +export async function settled(tag: string | symbol) { + await pendingPromises.get(tag)?.promise; +} + +/** + * Implements common functionality of running async functions serially, by chaining + * their promises one after another. + * + * Before running, async function is "wrapped" using the provided wrapper. This wrapper + * can add additional steps to run before or after the function. + * + * When async function is scheduled to run, the previous function is notified + * by calling the associated onContinued callback. This behavior of this callback + * is defined by the wrapper. + */ +function createRunner

(wrapper: AsyncWrapper) { + return function run(tag: string | symbol, cb: (...args: P) => Promise) { + const { cb: wrapped, onContinued } = wrapper(tag, cb); + const pending = pendingPromises.get(tag); + pending?.onContinued(); + const promise = pending ? pending.promise.then(wrapped, wrapped) : wrapped(); + pendingPromises.set(tag, { promise, onContinued }); + return promise; + }; +} + +/** + * Wraps an async function with an additional step run after the function: + * if the function is the last in the queue, it cleans up the whole chain + * of promises after finishing. + */ +function wrapWithContinuationTracking(tag: string | symbol, cb: () => Promise) { + let hasContinuation = false; + const wrapped = () => + cb().finally(() => { + if (!hasContinuation) { + pendingPromises.delete(tag); + } + }); + const onContinued = () => (hasContinuation = true); + return { cb: wrapped, onContinued }; +} + +/** + * Wraps an async function with additional functionalilty: + * 1. Associates an abort signal with every function, that is passed to it + * as an argument. When a new function is scheduled to run after the current + * one, current signal is aborted. + * 2. If current function didn't start and was aborted, in will never start. + * 3. If the function is the last in the queue, it cleans up the whole chain + * of promises after finishing. + */ +function wrapWithCancellation(tag: string | symbol, cb: (signal: AbortSignal) => Promise) { + const ac = new AbortController(); + const wrapped = () => { + if (ac.signal.aborted) { + return Promise.resolve('canceled' as const); + } + + return cb(ac.signal).finally(() => { + if (!ac.signal.aborted) { + pendingPromises.delete(tag); + } + }); + }; + const onContinued = () => ac.abort(); + return { cb: wrapped, onContinued }; +} diff --git a/src/events.ts b/src/events.ts index e145074a3d..f4e74194a7 100644 --- a/src/events.ts +++ b/src/events.ts @@ -59,4 +59,6 @@ export const EVENT_MAP = { 'connection.recovered': true, 'transport.changed': true, 'capabilities.changed': true, + 'live_location_sharing.started': true, + 'live_location_sharing.stopped': true, }; diff --git a/src/index.ts b/src/index.ts index c0d0901f6d..4bbf28d8c2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,4 +18,5 @@ export * from './thread'; export * from './thread_manager'; export * from './token_manager'; export * from './types'; +export * from './live_location_manager'; export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils'; diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts new file mode 100644 index 0000000000..7aaa947f2c --- /dev/null +++ b/src/live_location_manager.ts @@ -0,0 +1,410 @@ +/** + * RULES: + * + * 1. one loc-sharing message per channel per user + * 2. mandatory geolocation_eol (maxnow + 24h max), which should be unchangeable by anyone (set once) + * 3. serialized object must be stored + * 4. live location is per-device, no other device which did not store the message locally, should be updating the live location attachment + */ + +import { withCancellation } from './concurrency'; +import { StateStore } from './store'; +import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics, UpdateMessageAPIResponse } from './types'; +import type { StreamChat } from './client'; +import type { Unsubscribe } from './store'; + +// type Unsubscribe = () => void; +type WatchLocation = (handler: (value: { latitude: number; longitude: number }) => void) => Unsubscribe; +type SerializeAndStore = (state: MessageResponse[], userId: string) => void; +type RetrieveAndDeserialize = (userId: string) => MessageResponse[]; + +export type LiveLocationManagerState = { + ready: boolean; + targetMessages: MessageResponse[]; +}; + +// if (message.cid && this.messagesByChannelConfId[message.cid]) { +// const [m] = this.messagesByChannelConfId[message.cid]; +// throw new Error( +// `[LocationUpdater.registerMessage]: one live location sharing message per channel limit has been reached, unregister message "${m.id}" first`, +// ); +// } + +// if (!attachment || attachment.type !== 'geolocation' || !attachment.geolocation_eol) { +// throw new Error( +// '[LocationUpdater.registerMessage]: Message has either no attachment, the attachment is not of type "geolocation" or the attachment is missing `geolocation_eol` property', +// ); +// } + +// if (typeof attachment.geolocation_eol !== 'string') { +// throw new Error( +// '[LocationUpdater.registerMessage]: `geolocation_eol` property is of incorrect type, should be date and time ISO 8601 string', +// ); +// } + +// const nowTimestamp = Date.now(); +// const eolTimestamp = new Date(attachment.geolocation_eol).getTime(); + +// if (Number.isNaN(eolTimestamp) || eolTimestamp < nowTimestamp) { +// throw new Error( +// '[LocationUpdater.registerMessage]: `geolocation_eol` has either improper format or has not been set to some time in the future (is lesser than now)', +// ); +// } + +// private async getCompleteMessage(messageId: string) { +// const [cachedMessage, cachedMessageIndex] = this.messagesById[messageId] ?? []; + +// const [cachedMessageAttachment] = cachedMessage?.attachments ?? []; + +// if (isAttachmentValidLLSEntity(cachedMessageAttachment)) { +// return cachedMessage; +// } + +// const queriedMessage = (await this.client.getMessage(messageId)).message; + +// const [queriedMessageAttachment] = queriedMessage.attachments ?? []; + +// if (isAttachmentValidLLSEntity(queriedMessageAttachment)) { +// this.state.next((currentValue) => { +// const newTargetMessages = [...currentValue.targetMessages]; + +// if (typeof cachedMessageIndex === 'number') { +// newTargetMessages[cachedMessageIndex] = queriedMessage; +// } else { +// newTargetMessages.push(queriedMessage); +// } + +// return { +// ...currentValue, +// targetMessages: newTargetMessages, +// }; +// }); + +// return queriedMessage; +// } + +// return null; +// } + +function isValidLiveLocationAttachment(attachment?: Attachment) { + if (!attachment || attachment.type !== 'live_location' || attachment.stopped_sharing) { + return false; + } + + // If end_time has been defined, consider it + if (typeof attachment.end_time === 'string') { + const endTimeTimestamp = new Date(attachment.end_time).getTime(); + + if (Number.isNaN(endTimeTimestamp)) return false; + + const nowTimestamp = Date.now(); + + return nowTimestamp < endTimeTimestamp; + } + + return true; +} + +function isValidLiveLocationMessage(message?: MessageResponse) { + if (!message || message.type === 'deleted') return false; + + const [attachment] = message.attachments ?? []; + + return isValidLiveLocationAttachment(attachment); +} + +export type LiveLocationManagerConstructorParameters = { + client: StreamChat; + watchLocation: WatchLocation; + retrieveAndDeserialize?: RetrieveAndDeserialize; + serializeAndStore?: SerializeAndStore; +}; + +// Hard-coded minimal throttle timeout +const MIN_THROTTLE_TIMEOUT = 3000; + +export class LiveLocationManager { + public state: StateStore>; + private client: StreamChat; + private unsubscribeFunctions: Set<() => void> = new Set(); + private serializeAndStore: SerializeAndStore; + private watchLocation: WatchLocation; + private messagesByChannelConfIdGetterCache: { + calculated: { [key: string]: [MessageResponse, number] }; + targetMessages: LiveLocationManagerState['targetMessages']; + }; + private messagesByIdGetterCache: { + calculated: { [key: string]: [MessageResponse, number] }; + targetMessages: LiveLocationManagerState['targetMessages']; + }; + + static symbol = Symbol(LiveLocationManager.name); + + constructor({ + client, + watchLocation, + retrieveAndDeserialize = (userId) => { + const targetMessagesString = localStorage.getItem(`${userId}-${LiveLocationManager.name}`); + if (!targetMessagesString) return []; + return JSON.parse(targetMessagesString); + }, + serializeAndStore = (messages, userId) => { + localStorage.setItem( + `${userId}-${LiveLocationManager.name}`, + // Strip sensitive data (these will be recovered at on first location watch call) + JSON.stringify(messages.map((message) => ({ id: message.id }))), + ); + }, + }: LiveLocationManagerConstructorParameters) { + this.client = client; + + const retreivedTargetMessages = retrieveAndDeserialize(client.userID!); + + this.state = new StateStore>({ + targetMessages: retreivedTargetMessages, + // If there are no messages to validate, the manager is considered "ready" + ready: retreivedTargetMessages.length === 0, + }); + this.watchLocation = watchLocation; + this.serializeAndStore = serializeAndStore; + + this.messagesByIdGetterCache = { + targetMessages: retreivedTargetMessages, + calculated: {}, + }; + + this.messagesByChannelConfIdGetterCache = { + targetMessages: retreivedTargetMessages, + calculated: {}, + }; + } + + public get messagesById() { + const { targetMessages } = this.state.getLatestValue(); + + if (this.messagesByIdGetterCache.targetMessages !== targetMessages) { + this.messagesByIdGetterCache.targetMessages = targetMessages; + + this.messagesByIdGetterCache.calculated = targetMessages.reduce<{ [key: string]: [MessageResponse, number] }>( + (messagesById, message, index) => { + messagesById[message.id] = [message, index]; + return messagesById; + }, + {}, + ); + } + + return this.messagesByIdGetterCache.calculated; + } + + public get messagesByChannelConfId() { + const { targetMessages } = this.state.getLatestValue(); + + if (this.messagesByChannelConfIdGetterCache.targetMessages !== targetMessages) { + this.messagesByChannelConfIdGetterCache.targetMessages = targetMessages; + + this.messagesByChannelConfIdGetterCache.calculated = targetMessages.reduce<{ + [key: string]: [MessageResponse, number]; + }>((messagesByChannelConfIds, message, index) => { + if (!message.cid) return messagesByChannelConfIds; + + messagesByChannelConfIds[message.cid] = [message, index]; + return messagesByChannelConfIds; + }, {}); + } + + return this.messagesByChannelConfIdGetterCache.calculated; + } + + private subscribeTargetMessagesChange() { + let unsubscribeWatchLocation: null | (() => void) = null; + + // Subscribe to location updates only if there are relevant messages to + // update, no need for the location watcher to be active/instantiated otherwise + const unsubscribe = this.state.subscribeWithSelector( + ({ targetMessages }) => ({ targetMessages }), + ({ targetMessages }) => { + if (!targetMessages.length) { + unsubscribeWatchLocation?.(); + unsubscribeWatchLocation = null; + } else if (targetMessages.length && !unsubscribeWatchLocation) { + unsubscribeWatchLocation = this.subscribeWatchLocation(); + } + + if (this.client.userID) { + this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); + } + }, + ); + + return () => { + unsubscribe(); + unsubscribeWatchLocation?.(); + }; + } + + private subscribeWatchLocation() { + let nextWatcherCallTimestamp = Date.now(); + + // eslint-disable-next-line sonarjs/prefer-immediate-return + const unsubscribe = this.watchLocation(({ latitude, longitude }) => { + // Integrators can adjust the update interval by supplying custom watchLocation subscription, + // but the minimal timeout still has to be set as a failsafe (to prevent rate-limitting) + if (Date.now() < nextWatcherCallTimestamp) return; + + nextWatcherCallTimestamp = Date.now() + MIN_THROTTLE_TIMEOUT; + + withCancellation(LiveLocationManager.symbol, async () => { + const promises: Promise>[] = []; + const { ready } = this.state.getLatestValue(); + + if (!ready) { + await this.recoverAndValidateMessages(); + } + + const { targetMessages } = this.state.getLatestValue(); + // If validator removes messages, we need to check + if (!targetMessages.length) return; + + for (const message of targetMessages) { + if (!isValidLiveLocationMessage(message)) { + this.unregisterMessage(message); + continue; + } + + const promise = this.client.updateLiveLocation(message, { latitude, longitude }); + + promises.push(promise); + } + + await Promise.allSettled(promises); + // TODO: handle values (remove failed - based on specific error code), keep re-trying others + }); + }); + + return unsubscribe; + } + + /** + * Messages stored locally might've been updated while the device which registered message for updates has been offline. + */ + private async recoverAndValidateMessages() { + const { targetMessages } = this.state.getLatestValue(); + + if (!this.client.userID || !targetMessages.length) return; + + const response = await this.client.search( + // @ts-expect-error valid filter + { members: { $in: [this.client.userID] } }, + { id: { $in: targetMessages.map(({ id }) => id) } }, + ); + + const newTargetMessages = []; + + for (const result of response.results) { + const { message } = result; + + if (isValidLiveLocationMessage(message)) { + newTargetMessages.push(message); + } + } + + this.state.partialNext({ ready: true, targetMessages: newTargetMessages }); + } + + private registerMessage(message: MessageResponse) { + if (!this.client.userID || message?.user?.id !== this.client.userID) return; + + if (!isValidLiveLocationMessage(message)) { + return; + } + + this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); + } + + private updateRegisteredMessage(message: MessageResponse) { + if (!this.client.userID || message?.user?.id !== this.client.userID) return; + + const [, targetMessageIndex] = this.messagesById[message.id]; + + this.state.next((currentValue) => { + const newTargetMessages = [...currentValue.targetMessages]; + + newTargetMessages[targetMessageIndex] = message; + + return { + ...currentValue, + targetMessages: newTargetMessages, + }; + }); + } + + private unregisterMessage(message: MessageResponse) { + const [, messageIndex] = this.messagesById[message.id] ?? []; + + if (typeof messageIndex !== 'number') return; + + this.state.next((currentValue) => { + const newTargetMessages = [...currentValue.targetMessages]; + + newTargetMessages.splice(messageIndex, 1); + + return { + ...currentValue, + targetMessages: newTargetMessages, + }; + }); + } + + public unregisterSubscriptions = () => { + this.unsubscribeFunctions.forEach((cleanupFunction) => cleanupFunction()); + this.unsubscribeFunctions.clear(); + }; + + private subscribeLiveLocationSharingUpdates() { + const subscriptions = ([ + 'live_location_sharing.started', + /** + * Both message.updated & live_location_sharing.stopped get emitted when message attachment gets an + * update, live_location_sharing.stopped gets emitted only locally and only if the update goes + * through, it's a failsafe for when channel is no longer being watched for whatever reason + */ + 'message.updated', + 'live_location_sharing.stopped', + 'message.deleted', + ] as EventTypes[]).map((eventType) => + this.client.on(eventType, (event) => { + if (!event.message) return; + + if (event.type === 'live_location_sharing.started') { + this.registerMessage(event.message); + } else if (event.type === 'message.updated') { + const localMessage = this.messagesById[event.message.id]; + + if (!localMessage) return; + + if (!isValidLiveLocationMessage(event.message)) { + this.unregisterMessage(event.message); + } else { + this.updateRegisteredMessage(event.message); + } + } else { + this.unregisterMessage(event.message); + } + }), + ); + + return () => subscriptions.forEach((subscription) => subscription.unsubscribe()); + } + + public registerSubscriptions = () => { + if (this.unsubscribeFunctions.size) { + // LocationUpdater is already listening for events and changes + return; + } + + this.unsubscribeFunctions.add(this.subscribeLiveLocationSharingUpdates()); + this.unsubscribeFunctions.add(this.subscribeTargetMessagesChange()); + // TODO? - handle message registration during message updates too, message updated eol added (I hope not) + }; +} diff --git a/src/types.ts b/src/types.ts index 0a7b7e3841..95ad267b0c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2145,6 +2145,10 @@ export type Attachment< author_name?: string; color?: string; duration?: number; + /** + * Location-related, should be an ISO timestamp if type of the attachment is `live_location` + */ + end_time?: string; fallback?: string; fields?: Field[]; file_size?: number | string; @@ -2159,6 +2163,10 @@ export type Attachment< original_height?: number; original_width?: number; pretext?: string; + /** + * Location-related, true when user forcibly stops live location sharing + */ + stopped_sharing?: boolean; text?: string; thumb_url?: string; title?: string;