diff --git a/frontend/src/apis/client.ts b/frontend/src/apis/client.ts index a12c44a86..24115c25c 100644 --- a/frontend/src/apis/client.ts +++ b/frontend/src/apis/client.ts @@ -120,8 +120,8 @@ class APIClient { } } - async get(endpoint: string): Promise { - return this.request(endpoint, { method: 'GET' }) + async get(endpoint: string, options?: RequestInit): Promise { + return this.request(endpoint, { method: 'GET', ...options }) } async post(endpoint: string, data?: unknown): Promise { diff --git a/frontend/src/apis/tasks.ts b/frontend/src/apis/tasks.ts index 8461dd255..7c61e3bc0 100644 --- a/frontend/src/apis/tasks.ts +++ b/frontend/src/apis/tasks.ts @@ -286,8 +286,8 @@ export const taskApis = { return apiClient.put(`/tasks/${id}`, data) }, - getTaskDetail: async (id: number): Promise => { - return apiClient.get(`/tasks/${id}`) + getTaskDetail: async (id: number, signal?: AbortSignal): Promise => { + return apiClient.get(`/tasks/${id}`, signal ? { signal } : undefined) }, // Send a message. If task_id not provided, create task first, then send. diff --git a/frontend/src/app/(tasks)/chat/error.tsx b/frontend/src/app/(tasks)/chat/error.tsx new file mode 100644 index 000000000..18a952521 --- /dev/null +++ b/frontend/src/app/(tasks)/chat/error.tsx @@ -0,0 +1,39 @@ +// SPDX-FileCopyrightText: 2025 Weibo, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +'use client' + +import { useEffect } from 'react' +import { useTranslation } from '@/hooks/useTranslation' +import { Button } from '@/components/ui/button' + +/** + * Next.js error boundary for the chat route. + * Catches rendering errors in ChatPageDesktop/ChatPageMobile and shows + * a recovery UI instead of a blank white screen. + */ +export default function ChatError({ + error, + reset, +}: { + error: Error & { digest?: string } + reset: () => void +}) { + const { t } = useTranslation('common') + + useEffect(() => { + console.error('[ChatError] Caught rendering error:', error) + }, [error]) + + return ( +
+
+

{t('errors.request_failed')}

+ +
+
+ ) +} diff --git a/frontend/src/app/(tasks)/chat/page.tsx b/frontend/src/app/(tasks)/chat/page.tsx index 96ae72f83..a97547422 100644 --- a/frontend/src/app/(tasks)/chat/page.tsx +++ b/frontend/src/app/(tasks)/chat/page.tsx @@ -19,11 +19,21 @@ import { useIsMobile } from '@/features/layout/hooks/useMediaQuery' import { useUser } from '@/features/common/UserContext' import { useTaskContext } from '@/features/tasks/contexts/taskContext' +// Loading skeleton for dynamic chat page imports +function ChatPageSkeleton() { + return ( +
+
+
+ ) +} + // Dynamic imports for mobile and desktop page components with code splitting const ChatPageDesktop = dynamic( () => import('./ChatPageDesktop').then(mod => ({ default: mod.ChatPageDesktop })), { ssr: false, + loading: () => , } ) @@ -31,6 +41,7 @@ const ChatPageMobile = dynamic( () => import('./ChatPageMobile').then(mod => ({ default: mod.ChatPageMobile })), { ssr: false, + loading: () => , } ) diff --git a/frontend/src/features/tasks/components/chat/ChatArea.tsx b/frontend/src/features/tasks/components/chat/ChatArea.tsx index 0498b3522..7e2946c13 100644 --- a/frontend/src/features/tasks/components/chat/ChatArea.tsx +++ b/frontend/src/features/tasks/components/chat/ChatArea.tsx @@ -25,6 +25,7 @@ import { useTranslation } from '@/hooks/useTranslation' import { useRouter } from 'next/navigation' import { useTaskContext } from '../../contexts/taskContext' import { useTaskStateMachine } from '../../hooks/useTaskStateMachine' +import { useHasMessages } from '../../hooks/useHasMessages' import { Button } from '@/components/ui/button' import { useScrollManagement } from '../hooks/useScrollManagement' import { useFloatingInput } from '../hooks/useFloatingInput' @@ -97,7 +98,8 @@ function ChatAreaContent({ const { quote, clearQuote, formatQuoteForMessage } = useQuote() // Task context - const { selectedTaskDetail, setSelectedTask, accessDenied } = useTaskContext() + const { selectedTask, selectedTaskDetail, setSelectedTask, accessDenied, clearAccessDenied } = + useTaskContext() // Use useTaskStateMachine hook for reactive state updates (SINGLE SOURCE OF TRUTH per AGENTS.md) const { state: taskState } = useTaskStateMachine(selectedTaskDetail?.id) @@ -499,37 +501,13 @@ function ChatAreaContent({ teams: [...filteredTeams, ...teams], }) - // Determine if there are messages to display (full computation) - // Note: Now using taskState.messages from state machine instead of selectedTaskDetail.subtasks - const hasMessages = useMemo(() => { - const hasSelectedTask = selectedTaskDetail && selectedTaskDetail.id - const hasNewTaskStream = - !selectedTaskDetail?.id && streamHandlers.pendingTaskId && streamHandlers.isStreaming - const hasLocalPending = streamHandlers.localPendingMessage !== null - // Use taskState from state machine (single source of truth) - const hasUnifiedMessages = taskState?.messages && taskState.messages.size > 0 - - // If we have a selected task with messages in state machine, show messages - if (hasSelectedTask && hasUnifiedMessages) { - return true - } - - return Boolean( - hasSelectedTask || - streamHandlers.hasPendingUserMessage || - streamHandlers.isStreaming || - hasNewTaskStream || - hasLocalPending || - hasUnifiedMessages - ) - }, [ + // Determine if there are messages to display + const hasMessages = useHasMessages({ + selectedTask, selectedTaskDetail, - streamHandlers.hasPendingUserMessage, - streamHandlers.isStreaming, - streamHandlers.pendingTaskId, - streamHandlers.localPendingMessage, - taskState?.messages, - ]) + taskState, + streamHandlers, + }) // Note: Team selection is now handled by useTeamSelection hook in TeamSelector component // Model selection is handled by useModelSelection hook in ModelSelector component diff --git a/frontend/src/features/tasks/components/message/MessageBubble.tsx b/frontend/src/features/tasks/components/message/MessageBubble.tsx index 4d326be34..7e3936b20 100644 --- a/frontend/src/features/tasks/components/message/MessageBubble.tsx +++ b/frontend/src/features/tasks/components/message/MessageBubble.tsx @@ -45,6 +45,7 @@ import { GeminiAnnotations } from '../chat/GeminiAnnotations' import CollapsibleMessage from './CollapsibleMessage' import { processCitePatterns } from '../../utils/processCitePatterns' import RegenerateModelPopover from './RegenerateModelPopover' +import { useCopyCleanup } from '../../hooks/useCopyCleanup' import VideoConfigBadge from './VideoConfigBadge' import type { ClarificationData, FinalPromptData, ClarificationAnswer } from '@/types/api' import type { SourceReference, GeminiAnnotation } from '@/types/socket' @@ -1327,6 +1328,9 @@ const MessageBubble = memo( } } + // Handle copy event to clean up extra newlines from DOM structure + const handleCopy = useCopyCleanup() + // When editing, expand to full width for better editing experience const containerWidthClass = isEditing ? 'w-full' @@ -1348,6 +1352,7 @@ const MessageBubble = memo(
{/* Show header for AI messages */} diff --git a/frontend/src/features/tasks/contexts/taskContext.tsx b/frontend/src/features/tasks/contexts/taskContext.tsx index a7c9a24f5..295c9d0eb 100644 --- a/frontend/src/features/tasks/contexts/taskContext.tsx +++ b/frontend/src/features/tasks/contexts/taskContext.tsx @@ -94,6 +94,9 @@ export const TaskContextProvider = ({ children }: { children: ReactNode }) => { // Track task status for notification const taskStatusMapRef = useRef>(new Map()) + // AbortController for cancelling stale getTaskDetail requests during rapid task switching + const abortControllerRef = useRef(null) + // WebSocket connection for real-time task updates const { registerTaskHandlers, isConnected, leaveTask, joinTask, onReconnect } = useSocket() @@ -688,18 +691,27 @@ export const TaskContextProvider = ({ children }: { children: ReactNode }) => { return } + // Abort previous in-flight request to prevent stale responses overwriting current data + if (abortControllerRef.current) { + abortControllerRef.current.abort() + } + const controller = new AbortController() + abortControllerRef.current = controller + try { // Clear access denied state before fetching setAccessDenied(false) // Fetch task metadata only (subtasks are now obtained via WebSocket task:join) - const updatedTaskDetail = await taskApis.getTaskDetail(selectedTask.id) + const updatedTaskDetail = await taskApis.getTaskDetail(selectedTask.id, controller.signal) - // Note: Workbench data extraction from subtasks is no longer needed here - // Subtasks are now managed by TaskStateMachine via WebSocket join response - // Workbench data should be obtained from the state machine or WebSocket events + // Verify the selected task hasn't changed while the request was in flight + if (controller.signal.aborted) return setSelectedTaskDetail(updatedTaskDetail) } catch (error) { + // Ignore abort errors - they are expected when switching tasks rapidly + if (error instanceof DOMException && error.name === 'AbortError') return + // Check if it's a 403 Forbidden or 404 Not Found error (access denied or task not found) // Both cases should show the access denied UI to prevent information leakage if (error instanceof ApiError && (error.status === 403 || error.status === 404)) { @@ -722,6 +734,8 @@ export const TaskContextProvider = ({ children }: { children: ReactNode }) => { // Leave previous task room if switching to a different task if (previousTaskId !== null && previousTaskId !== currentTaskId) { leaveTask(previousTaskId) + // Clear stale detail immediately so downstream hooks don't operate on wrong task data + setSelectedTaskDetail(null) } // Update the ref to track current task diff --git a/frontend/src/features/tasks/hooks/useCopyCleanup.ts b/frontend/src/features/tasks/hooks/useCopyCleanup.ts new file mode 100644 index 000000000..2ebfe1372 --- /dev/null +++ b/frontend/src/features/tasks/hooks/useCopyCleanup.ts @@ -0,0 +1,31 @@ +// SPDX-FileCopyrightText: 2025 Weibo, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import { useCallback } from 'react' + +/** + * Cleans up selected text by: + * - Replacing 2+ consecutive newlines with a single newline + * - Trimming leading/trailing whitespace + */ +export function cleanCopyText(text: string): string { + return text.replace(/\n{2,}/g, '\n').trim() +} + +/** + * Hook to handle copy events with text cleanup. + * Fixes the issue where block-level elements create extra newlines when copying. + */ +export function useCopyCleanup() { + const handleCopy = useCallback((e: React.ClipboardEvent) => { + const selection = window.getSelection() + if (!selection || selection.rangeCount === 0) return + + const text = cleanCopyText(selection.toString()) + e.clipboardData.setData('text/plain', text) + e.preventDefault() + }, []) + + return handleCopy +} diff --git a/frontend/src/features/tasks/hooks/useHasMessages.ts b/frontend/src/features/tasks/hooks/useHasMessages.ts new file mode 100644 index 000000000..9db842ddb --- /dev/null +++ b/frontend/src/features/tasks/hooks/useHasMessages.ts @@ -0,0 +1,71 @@ +// SPDX-FileCopyrightText: 2025 Weibo, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import { useMemo } from 'react' +import type { TaskDetail } from '../../../types/api' +import type { TaskStateData } from '../state' + +interface StreamHandlersState { + hasPendingUserMessage: boolean + isStreaming: boolean + pendingTaskId: number | null + localPendingMessage: unknown | null +} + +interface UseHasMessagesParams { + selectedTask: { id: number } | null + selectedTaskDetail: TaskDetail | null + taskState: TaskStateData | null + streamHandlers: StreamHandlersState +} + +/** + * Determines if there are messages to display in the chat area. + * + * Logic breakdown: + * - hasSelectedTask: Has loaded task detail with messages + * - isLoadingTask: Task is selected but details are still loading (prevents flash) + * - hasNewTaskStream: New task being created with streaming + * - hasLocalPending: Local pending message waiting to be sent + * - hasUnifiedMessages: Messages exist in state machine + */ +export function useHasMessages({ + selectedTask, + selectedTaskDetail, + taskState, + streamHandlers, +}: UseHasMessagesParams): boolean { + return useMemo(() => { + const hasSelectedTask = selectedTaskDetail?.id != null + const isLoadingTask = selectedTask != null && selectedTaskDetail == null + const hasNewTaskStream = + !selectedTaskDetail?.id && streamHandlers.pendingTaskId != null && streamHandlers.isStreaming + const hasLocalPending = streamHandlers.localPendingMessage != null + const hasUnifiedMessages = taskState?.messages != null && taskState.messages.size > 0 + + // Fast path: task with messages loaded + if (hasSelectedTask && hasUnifiedMessages) { + return true + } + + // Check any condition that indicates chat should be shown + return ( + hasSelectedTask || + isLoadingTask || + streamHandlers.hasPendingUserMessage || + streamHandlers.isStreaming || + hasNewTaskStream || + hasLocalPending || + hasUnifiedMessages + ) + }, [ + selectedTask, + selectedTaskDetail, + taskState?.messages, + streamHandlers.hasPendingUserMessage, + streamHandlers.isStreaming, + streamHandlers.pendingTaskId, + streamHandlers.localPendingMessage, + ]) +} diff --git a/frontend/src/features/tasks/hooks/useTaskStateMachine.ts b/frontend/src/features/tasks/hooks/useTaskStateMachine.ts index afce308a4..4b190fc49 100644 --- a/frontend/src/features/tasks/hooks/useTaskStateMachine.ts +++ b/frontend/src/features/tasks/hooks/useTaskStateMachine.ts @@ -9,7 +9,7 @@ * Handles subscription to state changes and provides convenient accessors. */ -import { useState, useEffect, useCallback, useMemo } from 'react' +import { useState, useEffect, useCallback, useMemo, useSyncExternalStore } from 'react' import { taskStateManager, TaskStateData, UnifiedMessage, SyncOptions } from '../state' export interface UseTaskStateMachineResult { @@ -36,8 +36,13 @@ export function useTaskStateMachine( ): UseTaskStateMachineResult { const [state, setState] = useState(null) - // Check if manager is initialized - const isInitialized = taskStateManager.isInitialized() + // Reactively track manager initialization via useSyncExternalStore + const isInitialized = useSyncExternalStore( + taskStateManager.subscribeInit, + taskStateManager.getInitialized, + // SSR snapshot: always false on server + () => false + ) // Subscribe to state changes // Subscribe to state changes diff --git a/frontend/src/features/tasks/hooks/useUnifiedMessages.ts b/frontend/src/features/tasks/hooks/useUnifiedMessages.ts index 28d3ed2ab..a6b675c2b 100644 --- a/frontend/src/features/tasks/hooks/useUnifiedMessages.ts +++ b/frontend/src/features/tasks/hooks/useUnifiedMessages.ts @@ -22,7 +22,7 @@ * 5. chat:done -> Update AI message status to 'completed' */ -import { useMemo, useEffect } from 'react' +import { useMemo, useEffect, useRef, useCallback } from 'react' import { useTaskStateMachine } from './useTaskStateMachine' import { useUser } from '@/features/common/UserContext' import { useTaskContext } from '../contexts/taskContext' @@ -218,16 +218,37 @@ export function useUnifiedMessages({ isInitialized, } = useTaskStateMachine(effectiveTaskId, syncOptions) + // Track which tasks have had their initial recovery to avoid skipping + // recovery when chat:start races ahead and sets isStreaming=true before recover() runs + const recoveredTasksRef = useRef>(new Set()) + + // Cleanup old recovered task IDs to prevent unbounded growth + // Keep only the most recent MAX_RECOVERED_TASKS entries + const cleanupRecoveredTasks = useCallback(() => { + const MAX_RECOVERED_TASKS = 50 + const tasks = Array.from(recoveredTasksRef.current) + if (tasks.length > MAX_RECOVERED_TASKS) { + const toRemove = tasks.slice(0, tasks.length - MAX_RECOVERED_TASKS) + toRemove.forEach(id => recoveredTasksRef.current.delete(id)) + } + }, []) + // Trigger recovery when task changes // Subtasks are now fetched from joinTask response, not passed as parameter - // IMPORTANT: Do NOT recover if already streaming - this would interrupt the stream + // IMPORTANT: Always perform initial recovery per task (even if already streaming), + // because after page refresh chat:start can set isStreaming before cached_content is loaded. + // After initial recovery, skip re-recovery when streaming to avoid interruption. useEffect(() => { if (!effectiveTaskId || !isInitialized) return // Only recover for positive task IDs (real tasks, not pending) - // Skip recovery if already streaming to avoid interrupting active streams - if (effectiveTaskId > 0 && !isStreaming) { - recover() + if (effectiveTaskId > 0) { + const needsInitialRecovery = !recoveredTasksRef.current.has(effectiveTaskId) + if (needsInitialRecovery || !isStreaming) { + recoveredTasksRef.current.add(effectiveTaskId) + cleanupRecoveredTasks() + recover() + } } }, [effectiveTaskId, isInitialized, recover, isStreaming]) diff --git a/frontend/src/features/tasks/state/TaskStateMachine.ts b/frontend/src/features/tasks/state/TaskStateMachine.ts index 2a3dd3aca..1c1a1cdb6 100644 --- a/frontend/src/features/tasks/state/TaskStateMachine.ts +++ b/frontend/src/features/tasks/state/TaskStateMachine.ts @@ -219,6 +219,12 @@ export class TaskStateMachine { // Queue for chunk events received during syncing state // These will be applied after sync completes private pendingChunks: PendingChunkEvent[] = [] + // Queue for chat:start events received before recovery completes + // Prevents premature transition to streaming before cached_content is loaded + private pendingStarts: Array<{ subtaskId: number; shellType?: string; messageId?: number }> = [] + + // States where events should be queued instead of processed immediately + private static readonly BUFFERING_STATES: readonly TaskStatus[] = ['idle', 'joining', 'syncing'] constructor(taskId: number, deps: TaskStateMachineDeps) { this.state = { @@ -240,6 +246,35 @@ export class TaskStateMachine { return { ...this.state, messages: new Map(this.state.messages) } } + /** + * Check if current state is buffering (events should be queued) + */ + private isBufferingState(): boolean { + return TaskStateMachine.BUFFERING_STATES.includes(this.state.status) + } + + /** + * Factory method to create a new AI streaming message + */ + private createAiMessage(params: { + subtaskId: number + messageId?: number + shellType?: string + content?: string + }): UnifiedMessage { + const { subtaskId, messageId, shellType, content = '' } = params + return { + id: generateMessageId('ai', subtaskId), + type: 'ai', + status: 'streaming', + content, + timestamp: Date.now(), + subtaskId, + messageId, + result: shellType ? { shell_type: shellType } : undefined, + } + } + /** * Subscribe to state changes */ @@ -481,6 +516,7 @@ export class TaskStateMachine { } this.pendingRecovery = false this.pendingChunks = [] // Clear pending chunks queue on leave + this.pendingStarts = [] // Clear pending start events on leave break } @@ -619,6 +655,45 @@ export class TaskStateMachine { } } + // Sync result.blocks with recovered content for streaming messages. + // After recovery, message.content has the cached content (from Redis) + // but result.blocks may be empty or stale (from DB). + // The UI (MixedContentView) renders from result.blocks when available, + // so blocks must include the recovered content. Without this, new chunks + // create text blocks with only new content and the cached content is lost. + if (streamingInfo && streamingInfo.subtask_id) { + const recoveredMsg = this.state.messages.get( + generateMessageId('ai', streamingInfo.subtask_id) + ) + if (recoveredMsg && recoveredMsg.status === 'streaming' && recoveredMsg.content) { + const existingBlocks = recoveredMsg.result?.blocks || [] + const totalTextInBlocks = existingBlocks + .filter(b => b.type === 'text') + .reduce((total, b) => total + (b.content?.length || 0), 0) + + if (totalTextInBlocks < recoveredMsg.content.length) { + const nonTextBlocks = existingBlocks.filter(b => b.type !== 'text') + const recoveredBlocks: MessageBlock[] = [ + ...nonTextBlocks, + { + id: `text-recovered-${streamingInfo.subtask_id}`, + type: 'text', + content: recoveredMsg.content, + status: 'streaming', + timestamp: Date.now(), + }, + ] + + const updatedMessages = new Map(this.state.messages) + updatedMessages.set(generateMessageId('ai', streamingInfo.subtask_id), { + ...recoveredMsg, + result: { ...recoveredMsg.result, blocks: recoveredBlocks }, + }) + this.state = { ...this.state, messages: updatedMessages } + } + } + } + // Check if any message is streaming let streamingSubtaskId: number | null = null for (const msg of this.state.messages.values()) { @@ -634,8 +709,9 @@ export class TaskStateMachine { await this.dispatch({ type: 'SYNC_DONE' }) } - // Apply pending chunks that were queued during sync - // This ensures chunks received during joining/syncing are not lost + // Apply pending start events and chunks that were queued during sync + // This ensures events received during joining/syncing are not lost + this.applyPendingStarts() this.applyPendingChunks() } catch (error) { const errorMsg = error instanceof Error ? error.message : 'Sync failed' @@ -643,6 +719,27 @@ export class TaskStateMachine { } } + /** + * Apply pending chat:start events that were queued during idle/joining/syncing. + * Creates AI messages for subtasks that started while recovery was in progress. + * Only creates a message if one doesn't already exist (buildMessages may have created it). + */ + private applyPendingStarts(): void { + if (this.pendingStarts.length === 0) return + + const newMessages = new Map(this.state.messages) + for (const start of this.pendingStarts) { + const aiMessageId = generateMessageId('ai', start.subtaskId) + if (newMessages.has(aiMessageId)) { + // Message already exists from buildMessages/cached_content, skip + continue + } + newMessages.set(aiMessageId, this.createAiMessage(start)) + } + this.state = { ...this.state, messages: newMessages } + this.pendingStarts = [] + } + /** * Apply pending chunk events that were queued during sync */ @@ -665,8 +762,8 @@ export class TaskStateMachine { } // Apply the chunk to the message - // CRITICAL FIX: Always append content to message.content, regardless of block_id - // This ensures that cached_content + new chunks are all in message.content + // Pending chunks received during joining/syncing are NEW content generated + // AFTER cached_content was captured, so they should always be appended const newMessages = new Map(this.state.messages) const updatedMessage: UnifiedMessage = { ...existingMessage, @@ -681,7 +778,6 @@ export class TaskStateMachine { updatedMessage.reasoningContent = chunk.result.reasoning_content } - // Handle blocks // Handle blocks // CRITICAL: Always call mergeBlocksFromPendingChunk and update result.blocks // This ensures text blocks are created for ClaudeCode executor @@ -701,7 +797,6 @@ export class TaskStateMachine { } else { // CRITICAL FIX: Always update blocks even when no chunk.result // This handles ClaudeCode executor which sends text content without block_id - // The mergeBlocksFromPendingChunk method will create text blocks to maintain chronological order updatedMessage.result = { ...existingMessage.result, blocks: mergedBlocks, @@ -991,20 +1086,28 @@ export class TaskStateMachine { * Handle CHAT_START event */ private handleChatStartEvent(event: Extract): void { - const aiMessageId = generateMessageId('ai', event.subtaskId) - const initialResult = event.shellType ? { shell_type: event.shellType } : undefined + // Queue chat:start in buffering states to prevent premature streaming transition + // before recover() fetches cached_content. Without this guard, cached_content is lost + // because isStreaming becomes true and blocks the recovery useEffect. + if (this.isBufferingState()) { + this.pendingStarts.push({ + subtaskId: event.subtaskId, + shellType: event.shellType, + messageId: event.messageId, + }) + return + } + const aiMessageId = generateMessageId('ai', event.subtaskId) const newMessages = new Map(this.state.messages) - newMessages.set(aiMessageId, { - id: aiMessageId, - type: 'ai', - status: 'streaming', - content: '', - timestamp: Date.now(), - subtaskId: event.subtaskId, - messageId: event.messageId, // Set messageId from chat:start event for proper ordering - result: initialResult, - }) + newMessages.set( + aiMessageId, + this.createAiMessage({ + subtaskId: event.subtaskId, + messageId: event.messageId, + shellType: event.shellType, + }) + ) this.state = { ...this.state, @@ -1034,15 +1137,11 @@ export class TaskStateMachine { private handleChatChunkEvent(event: Extract): void { const aiMessageId = generateMessageId('ai', event.subtaskId) - // If in idle/joining/syncing state, queue the chunk for later processing + // If in buffering state, queue the chunk for later processing // This handles the race condition where chunks arrive before sync completes // CRITICAL FIX: Also queue chunks when in 'idle' state, because after page refresh, // WebSocket reconnects and starts receiving chunks before recover() is called - if ( - this.state.status === 'idle' || - this.state.status === 'joining' || - this.state.status === 'syncing' - ) { + if (this.isBufferingState()) { this.pendingChunks.push({ subtaskId: event.subtaskId, content: event.content, diff --git a/frontend/src/features/tasks/state/TaskStateManager.ts b/frontend/src/features/tasks/state/TaskStateManager.ts index 237ae53ba..5ac7a17cd 100644 --- a/frontend/src/features/tasks/state/TaskStateManager.ts +++ b/frontend/src/features/tasks/state/TaskStateManager.ts @@ -24,6 +24,7 @@ class TaskStateManagerImpl { private machines: Map = new Map() private deps: TaskStateMachineDeps | null = null private globalListeners: Set<(taskId: number, state: TaskStateData) => void> = new Set() + private initListeners: Set<() => void> = new Set() /** * Initialize the manager with dependencies from SocketContext @@ -31,6 +32,8 @@ class TaskStateManagerImpl { */ initialize(deps: TaskStateMachineDeps): void { this.deps = deps + // Notify init listeners so useSyncExternalStore picks up the change + this.initListeners.forEach(listener => listener()) } /** @@ -40,6 +43,23 @@ class TaskStateManagerImpl { return this.deps !== null } + /** + * Subscribe to initialization state changes (for useSyncExternalStore) + */ + subscribeInit(listener: () => void): () => void { + this.initListeners.add(listener) + return () => { + this.initListeners.delete(listener) + } + } + + /** + * Get initialization state snapshot (for useSyncExternalStore) + */ + getInitialized(): boolean { + return this.deps !== null + } + /** * Get or create a TaskStateMachine for a task */