Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion packages/app/src/commands/makeConnectionCommand.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { type NodeConnection, type NodeId, type PortId } from '@ironclad/rivet-core';
import { useCommand } from './Command';
import { useSetAtom } from 'jotai';
import { connectionsState } from '../state/graph';
import { connectionsState, nodesState } from '../state/graph';
import { toast } from 'react-toastify';

export function useMakeConnectionCommand() {
const setConnections = useSetAtom(connectionsState);
const setNodes = useSetAtom(nodesState);

return useCommand<
{
Expand Down Expand Up @@ -37,6 +39,16 @@ export function useMakeConnectionCommand() {
outputId: params.outputId,
};

// If the output node was marked async, turn async off because it now has an outgoing connection
const outputNode = currentState.nodes.find((n) => n.id === params.outputNodeId);
if (outputNode?.isAsync) {
setNodes((prev) =>
prev.map((n) => (n.id === outputNode.id ? ({ ...n, isAsync: false } as typeof n) : n)),
);

toast.info(`Async was turned off for "${outputNode.title}" node because you added an outgoing connection.`);
}

setConnections([...connections, newConnection]);

return {
Expand Down
55 changes: 54 additions & 1 deletion packages/app/src/components/NodeEditor.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type FC, useMemo, useState, type MouseEvent } from 'react';
import { editingNodeState } from '../state/graphBuilder.js';
import { nodesByIdState } from '../state/graph.js';
import { nodesByIdState, connectionsForNodeState } from '../state/graph.js';
import styled from '@emotion/styled';
import MultiplyIcon from 'majesticons/line/multiply-line.svg?react';
import {
Expand Down Expand Up @@ -31,6 +31,27 @@ import { NodeColorPicker } from './NodeColorPicker';
import { Tooltip } from './Tooltip';
import { useAtomValue, useAtom, useSetAtom } from 'jotai';
import { useEditNodeCommand } from '../commands/editNodeCommand';
import { toast } from 'react-toastify';

// Node types that should NOT expose the "Async" option in the editor
const ASYNC_UNSUPPORTED_NODE_TYPES = new Set<string>([
'graphInput',
'graphOutput',
'context',
'raiseEvent',
'waitForEvent',
'passthrough',
'raceInputs',
'setGlobal',
'coalesce',
'compare',
'delay',
'if',
'ifElse',
'loopController',
'loopUntil',
'match',
]);

export const NodeEditorRenderer: FC = () => {
const nodesById = useAtomValue(nodesByIdState);
Expand Down Expand Up @@ -334,6 +355,12 @@ export const NodeEditor: FC<NodeEditorProps> = ({ selectedNode, onDeselect }) =>
const selectedVariantOption =
selectedVariant === undefined ? variantOptions[0] : variantOptions.find(({ value }) => value === selectedVariant);

// Check if this node currently has outgoing connections
const connectionsByNode = useAtomValue(connectionsForNodeState);
const hasOutgoingConnections = (connectionsByNode[selectedNode.id] ?? []).some(
(conn) => conn.outputNodeId === selectedNode.id,
);

function handleSaveAsVariant(id: string) {
const node = { ...selectedNode, variants: [...(selectedNode.variants ?? []), { id, data: selectedNode.data }] };
updateNode(node);
Expand Down Expand Up @@ -555,6 +582,32 @@ export const NodeEditor: FC<NodeEditorProps> = ({ selectedNode, onDeselect }) =>
</section>
)}
</Field>
{!ASYNC_UNSUPPORTED_NODE_TYPES.has(selectedNode.type) && (
<Field name="async" label="Async Node">
{({ fieldProps }) => (
<section className="split-controls">
<div className="split-controls-toggle">
<Tooltip content="The graph will not wait for this node to finish. Only works for nodes with no outgoing connections.">
<Toggle
{...fieldProps}
isChecked={!!selectedNode.isAsync}
onChange={(e) => {
const isNewStateChecked = e.target.checked;
if (isNewStateChecked && hasOutgoingConnections) {
toast.info(
'Cannot enable Async on a node with outgoing connections.',
);
return;
}
updateNode({ ...selectedNode, isAsync: isNewStateChecked });
}}
/>
</Tooltip>
</div>
</section>
)}
</Field>
)}
</div>
)}

Expand Down
3 changes: 3 additions & 0 deletions packages/app/src/components/VisualNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ export const VisualNode = memo(
isSplit: node.isSplitRun,
disabled: node.disabled,
conditional: !!node.isConditional,
async: !!node.isAsync,
},
changedClass,
)}
Expand Down Expand Up @@ -389,6 +390,7 @@ const ZoomedOutVisualNodeContent: FC<{
{!isReallyZoomedOut && (
<div className="grab-area" {...handleAttributes} onClick={handleGrabClick}>
{node.isSplitRun ? <GitForkLine /> : <></>}
{node.isAsync ? <span>ASYNC</span> : <></>}
<div className="title-text">{node.title}</div>
</div>
)}
Expand Down Expand Up @@ -682,6 +684,7 @@ const NormalVisualNodeContent: FC<{
onClick={handleGrabClick}
>
{node.isSplitRun ? <GitForkLine /> : <></>}
{node.isAsync ? <span>ASYNC</span> : <></>}
<div className="title-text">{node.title}</div>
</div>
<div className="title-controls">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export const AiAssistEditorBase = <TNodeData, TOutputs>({
model: model!,
api: api!,
},
registry,
registry: registry as unknown as NodeRegistration,
...(await fillMissingSettingsFromEnvironmentVariables(settings, plugins)),
});

Expand Down
2 changes: 1 addition & 1 deletion packages/app/src/hooks/useAiGraphBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ export function useAiGraphBuilder({ record, onFeedback }: { record: boolean; onF
onUserEvent,
nativeApi: new TauriNativeApi(),
datasetProvider: new InMemoryDatasetProvider(data),
registry,
registry: registry as unknown as NodeRegistration,
...(await fillMissingSettingsFromEnvironmentVariables(settings, plugins)),
});

Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/integrations/CodeRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Inputs, Outputs } from '../index.js';
import type { DataValue } from '../model/DataValue.js';
import type { InternalProcessContext } from '../model/ProcessContext.js';

// eslint-disable-next-line import/no-cycle -- There has to be a cycle if we're to import the entirety of Rivet here.
import * as Rivet from '../exports.js';
Expand All @@ -8,6 +9,7 @@ export interface CodeRunnerOptions {
includeRequire: boolean;
includeFetch: boolean;
includeRivet: boolean;
includeInternalProcessContext: boolean;
includeProcess: boolean;
includeConsole: boolean;
}
Expand All @@ -18,6 +20,7 @@ export interface CodeRunner {
code: string,
inputs: Inputs,
options: CodeRunnerOptions,
context: InternalProcessContext,
graphInputs?: Record<string, DataValue>,
contextValues?: Record<string, DataValue>
) => Promise<Outputs>;
Expand All @@ -28,6 +31,7 @@ export class IsomorphicCodeRunner implements CodeRunner {
code: string,
inputs: Inputs,
options: CodeRunnerOptions,
context: InternalProcessContext,
graphInputs?: Record<string, DataValue>,
contextValues?: Record<string, DataValue>
): Promise<Outputs> {
Expand Down Expand Up @@ -57,6 +61,11 @@ export class IsomorphicCodeRunner implements CodeRunner {
args.push(Rivet);
}

if (options.includeInternalProcessContext) {
argNames.push('internalProcessContext');
args.push(context);
}

if (graphInputs) {
argNames.push('graphInputs');
args.push(graphInputs);
Expand All @@ -82,6 +91,7 @@ export class NotAllowedCodeRunner implements CodeRunner {
_code: string,
_inputs: Inputs,
_options: CodeRunnerOptions,
_context: InternalProcessContext,
_graphInputs?: Record<string, DataValue>,
_contextValues?: Record<string, DataValue>
): Promise<Outputs> {
Expand Down
60 changes: 50 additions & 10 deletions packages/core/src/model/GraphProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,11 @@ export class GraphProcessor {
await this.#processNodeIfAllInputsAvailable(node);
}

/** If all inputs are present, all conditions met, processes the node. */
async #processNodeIfAllInputsAvailable(node: ChartNode): Promise<void> {
/** If all inputs are present, all conditions met, processes the node. For "Async" mode, set detached to true. */
async #processNodeIfAllInputsAvailable(node: ChartNode, detached: boolean = false): Promise<void> {
this.#emitTraceEvent(
`Entering #processNodeIfAllInputsAvailable for ${node.title} (${node.id}), detached=${detached}`,
);
const builtInNode = node as BuiltInNodes;

if (this.#ignoreNodes.has(node.id)) {
Expand Down Expand Up @@ -1112,6 +1115,41 @@ export class GraphProcessor {
return;
}

// If this node is marked as async, run it as a fire-and-forget task and do not
// block the processing queue or propagate to downstream nodes. Only applicable
// to terminal nodes (no output connections) that are not graphOutput nodes.
// Do not re-trigger if already running in a detached mode.
if (node.isAsync && !detached) {
const hasOutgoing = this.#outputNodesFrom(node).nodes.length > 0;
const isGraphOutput = node.type === ('graphOutput' as BuiltInNodeType);
this.#emitTraceEvent(
`Async candidate ${node.title} (${node.id}): hasOutgoing=${hasOutgoing}, isGraphOutput=${isGraphOutput}`,
);

if (!hasOutgoing && !isGraphOutput) {
this.#emitTraceEvent(
`Starting async (fire-and-forget) node ${node.title} (${node.id}). Downstream nodes will not be scheduled from this node.`,
);

// Kick off processing without awaiting and without adding to the processing queue.
// Use the detached pathway to process the node and avoid scheduling downstream nodes.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
(async () => {
try {
await this.#processNodeIfAllInputsAvailable(node, true);
} catch {
// Errors are handled in #processNode via #nodeErrored
}
})();

return;
} else {
this.#emitTraceEvent(
`Node ${node.title} (${node.id}) is marked async but has downstream connections or is a graph output. Running normally.`,
);
}
}

this.#currentlyProcessing.add(node.id);

if (node.type === 'loopController') {
Expand Down Expand Up @@ -1239,14 +1277,16 @@ export class GraphProcessor {
}

// Node is finished, check if we can run any more nodes that depend on this one
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.#processingQueue.addAll(
outputNodes.nodes.map((outputNode) => async () => {
this.#emitTraceEvent(`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`);
if (!detached) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.#processingQueue.addAll(
outputNodes.nodes.map((outputNode) => async () => {
this.#emitTraceEvent(`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`);

await this.#processNodeIfAllInputsAvailable(outputNode);
}),
);
await this.#processNodeIfAllInputsAvailable(outputNode);
}),
);
}
}

#getAttachedDataTo(node: ChartNode | NodeId): AttachedNodeData {
Expand Down Expand Up @@ -1530,7 +1570,7 @@ export class GraphProcessor {
this.getRootProcessor().raiseEvent(event, data as DataValue);
},
contextValues: this.#contextValues,
externalFunctions: { ...this.#externalFunctions },
externalFunctions: this.#externalFunctions,
onPartialOutputs: (partialOutputs) => {
partialOutput?.(node, partialOutputs, index);

Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/model/NodeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ export interface NodeBase {

/** If true, the node exposes an `if` port that lets it run conditionally. */
isConditional?: boolean;

/**
* If true, the node will be executed asynchronously.
* The graph runner will not wait for this node to complete before finishing the graph
* Only works for terminal (no outgoing connections) nodes.
*/
isAsync?: boolean;
}

/** Base type for a typed node. */
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/model/nodes/CodeNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type CodeNodeData = {
allowFetch?: boolean;
allowRequire?: boolean;
allowRivet?: boolean;
allowInternalProcessContext?: boolean;
allowProcess?: boolean;
allowConsole?: boolean;
};
Expand Down Expand Up @@ -58,6 +59,7 @@ export class CodeNodeImpl extends NodeImpl<CodeNode> {
allowFetch: false,
allowRequire: false,
allowRivet: false,
allowInternalProcessContext: false,
allowProcess: false,
allowConsole: false,
},
Expand Down Expand Up @@ -139,6 +141,11 @@ export class CodeNodeImpl extends NodeImpl<CodeNode> {
label: 'Allow using `Rivet`',
dataKey: 'allowRivet',
},
{
type: 'toggle',
label: 'Allow using `internalProcessContext`',
dataKey: 'allowInternalProcessContext',
},
{
type: 'toggle',
label: 'Allow using `process`',
Expand Down Expand Up @@ -189,9 +196,11 @@ export class CodeNodeImpl extends NodeImpl<CodeNode> {
includeFetch: this.data.allowFetch ?? false,
includeRequire: this.data.allowRequire ?? false,
includeRivet: this.data.allowRivet ?? false,
includeInternalProcessContext: this.data.allowInternalProcessContext ?? false,
includeProcess: this.data.allowProcess ?? false,
includeConsole: this.data.allowConsole ?? false,
},
context,
context.graphInputNodeValues,
context.contextValues
);
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/utils/serialization/serialization_v4.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type SerializedNode = {
variants?: ChartNodeVariant<unknown>[];
disabled?: boolean;
isConditional?: boolean;
isAsync?: boolean;
};

/** x/y/width/zIndex */
Expand Down Expand Up @@ -246,6 +247,7 @@ function toSerializedNode(node: ChartNode, allNodes: ChartNode[], allConnections
variants: (node.variants?.length ?? 0) > 0 ? node.variants : undefined,
disabled: node.disabled ? true : undefined,
isConditional: node.isConditional,
isAsync: node.isAsync ? true : undefined,
};
}

Expand Down Expand Up @@ -284,6 +286,7 @@ function fromSerializedNode(
variants: serializedNode.variants ?? [],
disabled: serializedNode.disabled,
isConditional: serializedNode.isConditional,
isAsync: serializedNode.isAsync,
},
connections,
];
Expand Down
Loading
Loading