From 602605952a46756fbe7c0791fda5457f6b52099a Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Mon, 15 Sep 2025 15:42:00 -0700 Subject: [PATCH] merge raw history removal Signed-off-by: Shijie Sheng --- .../internal/common/InternalUtils.java | 97 ---------------- .../shadowing/ReplayWorkflowActivityImpl.java | 13 +-- .../testservice/TestWorkflowStoreImpl.java | 8 +- .../WorkflowServiceTChannel.java | 13 +-- .../internal/common/InternalUtilsTest.java | 106 ------------------ .../shadowing/ReplayWorkflowActivityTest.java | 20 ++-- 6 files changed, 20 insertions(+), 237 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java index 520d8efbb..e5b4b330b 100644 --- a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java +++ b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java @@ -18,11 +18,6 @@ package com.uber.cadence.internal.common; import com.google.common.base.Defaults; -import com.google.common.collect.Lists; -import com.uber.cadence.DataBlob; -import com.uber.cadence.History; -import com.uber.cadence.HistoryEvent; -import com.uber.cadence.HistoryEventFilterType; import com.uber.cadence.Memo; import com.uber.cadence.SearchAttributes; import com.uber.cadence.TaskList; @@ -33,15 +28,10 @@ import com.uber.cadence.workflow.WorkflowMethod; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; /** Utility functions shared by the implementation code. */ public final class InternalUtils { @@ -164,93 +154,6 @@ public static SearchAttributes convertMapToSearchAttributes( return new SearchAttributes().setIndexedFields(mapOfByteBuffer); } - // This method serializes history to blob data - public static DataBlob SerializeFromHistoryToBlobData(History history) { - - // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 - TSerializer serializer = new TSerializer(); - DataBlob blob = new DataBlob(); - try { - blob.setData(serializer.serialize(history)); - } catch (org.apache.thrift.TException err) { - throw new RuntimeException("Serialize history to blob data failed", err); - } - - return blob; - } - - // This method deserialize the DataBlob data to the History data - public static History DeserializeFromBlobDataToHistory( - List blobData, HistoryEventFilterType historyEventFilterType) throws TException { - - // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 - TDeserializer deSerializer = new TDeserializer(); - List events = Lists.newArrayList(); - for (DataBlob data : blobData) { - History history = new History(); - try { - byte[] dataByte = data.getData(); - // TODO: verify the beginning index - dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length); - deSerializer.deserialize(history, dataByte); - - if (history == null || history.getEvents() == null || history.getEvents().size() == 0) { - return null; - } - } catch (org.apache.thrift.TException err) { - throw new TException("Deserialize blob data to history failed with unknown error"); - } - - events.addAll(history.getEvents()); - } - - if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) { - events = events.subList(events.size() - 1, events.size()); - } - - return new History().setEvents(events); - } - - // This method serializes history event to blob data - public static List SerializeFromHistoryEventToBlobData(List events) { - - // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 - TSerializer serializer = new TSerializer(); - List blobs = Lists.newArrayListWithCapacity(events.size()); - for (HistoryEvent event : events) { - DataBlob blob = new DataBlob(); - try { - blob.setData(serializer.serialize(event)); - } catch (org.apache.thrift.TException err) { - throw new RuntimeException("Serialize history event to blob data failed", err); - } - blobs.add(blob); - } - return blobs; - } - - // This method serializes blob data to history event - public static List DeserializeFromBlobDataToHistoryEvents(List blobData) - throws TException { - - // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 - TDeserializer deSerializer = new TDeserializer(); - List events = Lists.newArrayList(); - for (DataBlob data : blobData) { - try { - HistoryEvent event = new HistoryEvent(); - byte[] dataByte = data.getData(); - // TODO: verify the beginning index - dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length); - deSerializer.deserialize(event, dataByte); - events.add(event); - } catch (org.apache.thrift.TException err) { - throw new TException("Deserialize blob data to history event failed with unknown error"); - } - } - return events; - } - /** Prohibit instantiation */ private InternalUtils() {} } diff --git a/src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java b/src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java index d2432889f..eae82edb0 100644 --- a/src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java +++ b/src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java @@ -19,12 +19,9 @@ import com.google.common.collect.Lists; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; -import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; -import com.uber.cadence.HistoryEventFilterType; import com.uber.cadence.activity.Activity; import com.uber.cadence.common.WorkflowExecutionHistory; -import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.metrics.MetricsType; @@ -185,14 +182,10 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti nextPageToken, this.serviceClient, domain, execution.toThrift())); pageToken = resp.getNextPageToken(); - // handle raw history + // TODO support raw history feature once server removes default Thrift encoding if (resp.getRawHistory() != null && resp.getRawHistory().size() > 0) { - History history = - InternalUtils.DeserializeFromBlobDataToHistory( - resp.getRawHistory(), HistoryEventFilterType.ALL_EVENT); - if (history != null && history.getEvents() != null) { - histories.addAll(history.getEvents()); - } + throw new UnsupportedOperationException( + "Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover"); } else { histories.addAll(resp.getHistory().getEvents()); } diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java index 5759f4562..06f18eab2 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java @@ -18,7 +18,6 @@ package com.uber.cadence.internal.testservice; import com.uber.cadence.BadRequestError; -import com.uber.cadence.DataBlob; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.EventType; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; @@ -34,7 +33,6 @@ import com.uber.cadence.StickyExecutionAttributes; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.WorkflowExecutionInfo; -import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.testservice.RequestContext.Timer; import java.time.Duration; @@ -348,12 +346,10 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( if (!getRequest.isWaitForNewEvent() && getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) { List events = history.getEventsLocked(); - List blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events); // Copy the list as it is mutable. Individual events assumed immutable. ArrayList eventsCopy = new ArrayList<>(events); return new GetWorkflowExecutionHistoryResponse() - .setHistory(new History().setEvents(eventsCopy)) - .setRawHistory(blobs); + .setHistory(new History().setEvents(eventsCopy)); } expectedNextEventId = history.getNextEventIdLocked(); } finally { @@ -361,11 +357,9 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( } List events = history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType()); - List blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events); GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse(); if (events != null) { result.setHistory(new History().setEvents(events)); - result.setRawHistory(blobs); } return result; } diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index d8d22695f..ba4b520ca 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -28,7 +28,6 @@ import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result; import com.uber.cadence.internal.Version; import com.uber.cadence.internal.common.CheckedExceptionWrapper; -import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.metrics.ServiceMethod; @@ -774,10 +773,8 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( if (response.getResponseCode() == ResponseCode.OK) { GetWorkflowExecutionHistoryResponse res = result.getSuccess(); if (res.getRawHistory() != null) { - History history = - InternalUtils.DeserializeFromBlobDataToHistory( - res.getRawHistory(), getRequest.getHistoryEventFilterType()); - res.setHistory(history); + throw new TException( + "Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover"); } return res; } @@ -2601,10 +2598,8 @@ private void getWorkflowExecutionHistory( if (r.getResponseCode() == ResponseCode.OK) { GetWorkflowExecutionHistoryResponse res = result.getSuccess(); if (res.getRawHistory() != null) { - History history = - InternalUtils.DeserializeFromBlobDataToHistory( - res.getRawHistory(), getRequest.getHistoryEventFilterType()); - res.setHistory(history); + throw new TException( + "Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover"); } resultHandler.onComplete(res); return; diff --git a/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java b/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java index 23ecb652c..eb6250adf 100644 --- a/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java +++ b/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java @@ -17,23 +17,14 @@ package com.uber.cadence.internal.common; -import static com.uber.cadence.EventType.WorkflowExecutionStarted; import static junit.framework.TestCase.assertEquals; -import static org.junit.Assert.assertNotNull; -import com.google.common.collect.Lists; -import com.googlecode.junittoolbox.MultithreadingTester; -import com.googlecode.junittoolbox.RunnableAssert; import com.uber.cadence.*; import com.uber.cadence.converter.DataConverterException; import com.uber.cadence.workflow.WorkflowUtils; import java.io.FileOutputStream; -import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.HashMap; -import java.util.List; import java.util.Map; -import junit.framework.TestCase; import org.junit.Test; public class InternalUtilsTest { @@ -56,101 +47,4 @@ public void testConvertMapToSearchAttributesException() throws Throwable { attr.put("InvalidValue", new FileOutputStream("dummy")); InternalUtils.convertMapToSearchAttributes(attr); } - - @Test - public void testSerialization_History() { - - RunnableAssert r = - new RunnableAssert("history_serialization") { - @Override - public void run() { - HistoryEvent event = - new HistoryEvent() - .setEventId(1) - .setVersion(1) - .setEventType(WorkflowExecutionStarted) - .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)) - .setWorkflowExecutionStartedEventAttributes( - new WorkflowExecutionStartedEventAttributes() - .setAttempt(1) - .setFirstExecutionRunId("test")); - - List historyEvents = Lists.newArrayList(event); - History history = new History().setEvents(historyEvents); - DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history); - assertNotNull(blob); - - try { - History result = - InternalUtils.DeserializeFromBlobDataToHistory( - Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT); - assertNotNull(result); - assertEquals(1, result.events.size()); - assertEquals(event.getEventId(), result.events.get(0).getEventId()); - assertEquals(event.getVersion(), result.events.get(0).getVersion()); - assertEquals(event.getEventType(), result.events.get(0).getEventType()); - assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp()); - assertEquals( - event.getWorkflowExecutionStartedEventAttributes(), - result.events.get(0).getWorkflowExecutionStartedEventAttributes()); - } catch (Exception e) { - TestCase.fail("Received unexpected error during deserialization"); - } - } - }; - - try { - new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run(); - } catch (Exception e) { - TestCase.fail("Received unexpected error during concurrent deserialization"); - } - } - - @Test - public void testSerialization_HistoryEvent() { - - RunnableAssert r = - new RunnableAssert("history_event_serialization") { - @Override - public void run() { - HistoryEvent event = - new HistoryEvent() - .setEventId(1) - .setVersion(1) - .setEventType(WorkflowExecutionStarted) - .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)) - .setWorkflowExecutionStartedEventAttributes( - new WorkflowExecutionStartedEventAttributes() - .setAttempt(1) - .setFirstExecutionRunId("test")); - - List historyEvents = Lists.newArrayList(event); - List blobList = - InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents); - assertEquals(1, blobList.size()); - - try { - List result = - InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList); - assertNotNull(result); - assertEquals(1, result.size()); - assertEquals(event.getEventId(), result.get(0).getEventId()); - assertEquals(event.getVersion(), result.get(0).getVersion()); - assertEquals(event.getEventType(), result.get(0).getEventType()); - assertEquals(event.getTimestamp(), result.get(0).getTimestamp()); - assertEquals( - event.getWorkflowExecutionStartedEventAttributes(), - result.get(0).getWorkflowExecutionStartedEventAttributes()); - } catch (Exception e) { - TestCase.fail("Received unexpected error during deserialization"); - } - } - }; - - try { - new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run(); - } catch (Exception e) { - TestCase.fail("Received unexpected error during concurrent deserialization"); - } - } } diff --git a/src/test/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityTest.java b/src/test/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityTest.java index 4e1df82c3..ed17dc8fd 100644 --- a/src/test/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityTest.java +++ b/src/test/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityTest.java @@ -20,9 +20,7 @@ import static com.uber.cadence.EventType.DecisionTaskStarted; import static com.uber.cadence.EventType.TimerStarted; import static com.uber.cadence.EventType.WorkflowExecutionStarted; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,7 +40,6 @@ import com.uber.cadence.WorkflowType; import com.uber.cadence.common.WorkflowExecutionHistory; import com.uber.cadence.converter.JsonDataConverter; -import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.testing.WorkflowTestingTest; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.testing.TestActivityEnvironment; @@ -144,15 +141,22 @@ public void testGetFullHistory_DecodedHistory_ExpectedSuccessResponse() throws E } @Test - public void testGetFullHistory_RawHistory_ExpectedSuccessResponse() throws Exception { + public void testGetFullHistory_RawHistory_NotSupportedError() throws Exception { History history = new History().setEvents(Lists.newArrayList(historyEvents.get(0))); - DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history); + DataBlob blob = new DataBlob().setData(new byte[] {1, 2, 3}); GetWorkflowExecutionHistoryResponse response = new GetWorkflowExecutionHistoryResponse().setRawHistory(Lists.newArrayList(blob)); when(mockServiceClient.GetWorkflowExecutionHistory(any())).thenReturn(response); - WorkflowExecutionHistory result = activity.getFullHistory(domain, execution); - assertEquals(1, result.getEvents().size()); + try { + WorkflowExecutionHistory result = activity.getFullHistory(domain, execution); + } catch (Exception e) { + assertEquals( + "Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover", + e.getMessage()); + return; + } + fail("Expected exception not thrown"); } @Test(expected = Error.class)