Skip to content

Add support for history streaming #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,37 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
return new PurgeResult(response.getDeletedInstanceCount());
}

/**
* Streams the history events for an orchestration instance.
*
* @param instanceId The ID of the orchestration instance.
* @param executionId Optional execution ID of the orchestration instance.
* @param forWorkItemProcessing Whether the history is being streamed for work item processing.
* @return An iterator of HistoryChunk objects containing the orchestration history.
*/
public Iterator<HistoryChunk> streamInstanceHistory(String instanceId, String executionId, boolean forWorkItemProcessing) {
Helpers.throwIfArgumentNull(instanceId, "instanceId");

StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder()
.setInstanceId(instanceId)
.setForWorkItemProcessing(forWorkItemProcessing);

if (executionId != null && !executionId.isEmpty()) {
requestBuilder.setExecutionId(StringValue.of(executionId));
}

return this.sidecarClient.streamInstanceHistory(requestBuilder.build());
}

/**
* Streams the history events for an orchestration instance.
*
* @param instanceId The ID of the orchestration instance.
* @param forWorkItemProcessing Whether the history is being streamed for work item processing.
* @return An iterator of HistoryChunk objects containing the orchestration history.
*/
public Iterator<HistoryChunk> streamInstanceHistory(String instanceId, boolean forWorkItemProcessing) {
return streamInstanceHistory(instanceId, null, forWorkItemProcessing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkerCapability;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;

Expand Down Expand Up @@ -122,7 +123,9 @@ public void startAndBlock() {
// TODO: How do we interrupt manually?
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder()
.addCapabilities(WorkerCapability.WORKER_CAPABILITY_HISTORY_STREAMING)
.build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
Expand All @@ -132,9 +135,19 @@ public void startAndBlock() {

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
TaskOrchestratorResult taskOrchestratorResult;

if (orchestratorRequest.getRequiresHistoryStreaming()) {
// Stream the history events when requested by the orchestrator service
taskOrchestratorResult = processOrchestrationWithStreamingHistory(
taskOrchestrationExecutor,
orchestratorRequest);
} else {
// Standard non-streaming execution path
taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
}

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
Expand Down Expand Up @@ -210,4 +223,73 @@ else if (requestType == RequestCase.HEALTHPING)
public void stop() {
this.close();
}

/**
* Process an orchestration request using streaming history instead of receiving the full history in the work item.
* This is used when the history is too large to fit in a single gRPC message.
*
* @param taskOrchestrationExecutor the executor to use for processing the orchestration
* @param orchestratorRequest the request containing orchestration details
* @return the result of executing the orchestration
*/
private TaskOrchestratorResult processOrchestrationWithStreamingHistory(
TaskOrchestrationExecutor taskOrchestrationExecutor,
OrchestratorRequest orchestratorRequest) {

logger.fine(() -> String.format(
"Streaming history for instance '%s' as it requires history streaming",
orchestratorRequest.getInstanceId()));

// Create a request to stream the instance history
StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.setForWorkItemProcessing(true);

// Include execution ID if present
if (orchestratorRequest.hasExecutionId()) {
requestBuilder.setExecutionId(orchestratorRequest.getExecutionId());
}

StreamInstanceHistoryRequest request = requestBuilder.build();

// Stream history from the service
List<HistoryEvent> pastEvents = new ArrayList<>();
List<HistoryEvent> newEvents = new ArrayList<>();

try {
// Get a stream of history chunks
Iterator<HistoryChunk> historyStream = this.sidecarClient.streamInstanceHistory(request);

// Process each chunk of history events
while (historyStream.hasNext()) {
HistoryChunk chunk = historyStream.next();

// The first chunk is considered the "past events", and the rest are "new events"
if (pastEvents.isEmpty()) {
pastEvents.addAll(chunk.getEventsList());
} else {
newEvents.addAll(chunk.getEventsList());
}
}

logger.fine(() -> String.format(
"Successfully streamed history for instance '%s': %d past events, %d new events",
orchestratorRequest.getInstanceId(), pastEvents.size(), newEvents.size()));

// Execute the orchestration with the collected history events
return taskOrchestrationExecutor.execute(pastEvents, newEvents);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING, "The sidecar service is unavailable while streaming history for instance " +
orchestratorRequest.getInstanceId());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING, "History streaming was canceled for instance " +
orchestratorRequest.getInstanceId());
} else {
logger.log(Level.WARNING, "Error streaming history for instance " +
orchestratorRequest.getInstanceId(), e);
}
throw e;
}
}
}