From 7c9fb33baaeee3291754e400d2edb2421a147632 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Thu, 16 Jan 2025 20:06:10 +1100 Subject: [PATCH] feat: caches watermark for Spanner change streams (#33566) --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 17 ++- .../changestreams/ChangeStreamsConstants.java | 8 ++ .../changestreams/action/ActionFactory.java | 8 +- .../action/DetectNewPartitionsAction.java | 6 +- .../cache/AsyncWatermarkCache.java | 64 ++++++++++ .../changestreams/cache/CacheFactory.java | 58 +++++++++ .../cache/NoOpWatermarkCache.java | 40 ++++++ .../changestreams/cache/WatermarkCache.java | 35 ++++++ .../changestreams/cache/package-info.java | 20 +++ .../dofn/DetectNewPartitionsDoFn.java | 8 +- .../action/DetectNewPartitionsActionTest.java | 11 +- .../changestreams/cache/CacheFactoryTest.java | 114 ++++++++++++++++++ 12 files changed, 381 insertions(+), 8 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/package-info.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactoryTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 
a6cf7ebb12a5..17cfdce079cf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; @@ -76,6 +77,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; @@ -1594,6 +1596,8 @@ public abstract static class ReadChangeStream @Deprecated abstract @Nullable Double getTraceSampleProbability(); + abstract @Nullable Duration getWatermarkRefreshRate(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1617,6 +1621,8 @@ abstract static class Builder { abstract Builder setTraceSampleProbability(Double probability); + 
abstract Builder setWatermarkRefreshRate(Duration refreshRate); + abstract ReadChangeStream build(); } @@ -1703,6 +1709,10 @@ public ReadChangeStream withTraceSampleProbability(Double probability) { return toBuilder().setTraceSampleProbability(probability).build(); } + public ReadChangeStream withWatermarkRefreshRate(Duration refreshRate) { + return toBuilder().setWatermarkRefreshRate(refreshRate).build(); + } + @Override public PCollection expand(PBegin input) { checkArgument( @@ -1803,10 +1813,15 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta metadataDatabaseDialect); final ActionFactory actionFactory = new ActionFactory(); + final Duration watermarkRefreshRate = + MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE); + final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate); + final InitializeDoFn initializeDoFn = new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp); final DetectNewPartitionsDoFn detectNewPartitionsDoFn = - new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics); + new DetectNewPartitionsDoFn( + daoFactory, mapperFactory, actionFactory, cacheFactory, metrics); final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); final PostProcessingMetricsDoFn postProcessingMetricsDoFn = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java index ba8dfc8a1727..db09adb0f27e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java @@ -20,9 +20,11 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.RpcPriority; import java.util.Collections; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.joda.time.Duration; /** * Single place for defining the constants used in the {@code Spanner.readChangeStreams()} @@ -75,4 +77,10 @@ public class ChangeStreamsConstants { .setWatermark(Timestamp.now()) .setCreatedAt(Timestamp.now()) .build(); + + /** + * The default period for which we will re-compute the watermark of the {@link + * DetectNewPartitionsDoFn} stage. + */ + public static final Duration DEFAULT_WATERMARK_REFRESH_RATE = Duration.standardSeconds(1); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index 135d4fa4e154..010620ec7ea9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -19,6 +19,7 @@ import java.io.Serializable; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao; import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator; @@ -151,12 +152,17 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( public synchronized DetectNewPartitionsAction detectNewPartitionsAction( PartitionMetadataDao partitionMetadataDao, PartitionMetadataMapper partitionMetadataMapper, + WatermarkCache watermarkCache, ChangeStreamMetrics metrics, Duration resumeDuration) { if (detectNewPartitionsActionInstance == null) { detectNewPartitionsActionInstance = new DetectNewPartitionsAction( - partitionMetadataDao, partitionMetadataMapper, metrics, resumeDuration); + partitionMetadataDao, + partitionMetadataMapper, + watermarkCache, + metrics, + resumeDuration); } return detectNewPartitionsActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java index 73967d2a2a75..40160de7b958 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java @@ -25,6 +25,7 @@ import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; @@ -50,6 +51,7 @@ public class DetectNewPartitionsAction { 
private final PartitionMetadataDao dao; private final PartitionMetadataMapper mapper; + private final WatermarkCache cache; private final ChangeStreamMetrics metrics; private final Duration resumeDuration; @@ -57,10 +59,12 @@ public class DetectNewPartitionsAction { public DetectNewPartitionsAction( PartitionMetadataDao dao, PartitionMetadataMapper mapper, + WatermarkCache cache, ChangeStreamMetrics metrics, Duration resumeDuration) { this.dao = dao; this.mapper = mapper; + this.cache = cache; this.metrics = metrics; this.resumeDuration = resumeDuration; } @@ -98,7 +102,7 @@ public ProcessContinuation run( final Timestamp readTimestamp = tracker.currentRestriction().getFrom(); // Updates the current watermark as the min of the watermarks from all existing partitions - final Timestamp minWatermark = dao.getUnfinishedMinWatermark(); + final Timestamp minWatermark = cache.getUnfinishedMinWatermark(); if (minWatermark != null) { return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java new file mode 100644 index 000000000000..827397fe6fc8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; + +import com.google.cloud.Timestamp; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Duration; + +/** + * Asynchronously computes the earliest partition watermark and stores it in memory. The value will + * be recomputed periodically, as configured by the refresh rate. + * + *
<p>
On every period, we will call {@link PartitionMetadataDao#getUnfinishedMinWatermark()} to + * refresh the value. + */ +public class AsyncWatermarkCache implements WatermarkCache { + + private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d"; + private static final Object MIN_WATERMARK_KEY = new Object(); + private final LoadingCache<Object, Optional<Timestamp>> cache; + + public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) { + this.cache = + CacheBuilder.newBuilder() + .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis())) + .build( + CacheLoader.asyncReloading( + CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())), + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build()))); + } + + @Override + public @Nullable Timestamp getUnfinishedMinWatermark() { + try { + return cache.get(MIN_WATERMARK_KEY).orElse(null); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java new file mode 100644 index 000000000000..de5ddbbf3a97 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Duration; + +public class CacheFactory implements Serializable { + + private static final long serialVersionUID = -8722905670370252723L; + private static final Map<Long, WatermarkCache> WATERMARK_CACHE = new ConcurrentHashMap<>(); + private static final AtomicLong CACHE_ID = new AtomicLong(); + + // The unique id for the cache of this CacheFactory. This guarantees that if the CacheFactory is + // serialized / deserialized it will get the same instance of the cache. + private final long cacheId = CACHE_ID.getAndIncrement(); + private final DaoFactory daoFactory; + private final Duration refreshRate; + + public CacheFactory(DaoFactory daoFactory, Duration watermarkRefreshRate) { + this.daoFactory = daoFactory; + this.refreshRate = watermarkRefreshRate; + } + + public WatermarkCache getWatermarkCache() { + return WATERMARK_CACHE.computeIfAbsent( + cacheId, + key -> + refreshRate.getMillis() == 0 + ?
new NoOpWatermarkCache(daoFactory.getPartitionMetadataDao()) + : new AsyncWatermarkCache(daoFactory.getPartitionMetadataDao(), refreshRate)); + } + + @VisibleForTesting + long getCacheId() { + return cacheId; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java new file mode 100644 index 000000000000..17275ae89834 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; + +import com.google.cloud.Timestamp; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; + +/** + * Synchronously compute the earliest partition watermark, by delegating the call to {@link + * PartitionMetadataDao#getUnfinishedMinWatermark()}. 
+ */ +public class NoOpWatermarkCache implements WatermarkCache { + + private final PartitionMetadataDao dao; + + public NoOpWatermarkCache(PartitionMetadataDao dao) { + this.dao = dao; + } + + @Override + public @Nullable Timestamp getUnfinishedMinWatermark() { + return dao.getUnfinishedMinWatermark(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java new file mode 100644 index 000000000000..8801a85be0bd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; + +import com.google.cloud.Timestamp; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; + +@FunctionalInterface +public interface WatermarkCache { + + /** + * Fetches the earliest partition watermark from the partition metadata table that is not in a + * {@link State#FINISHED} state. + * + * @return the earliest partition watermark which is not in a {@link State#FINISHED} state. + */ + @Nullable + Timestamp getUnfinishedMinWatermark(); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/package-info.java new file mode 100644 index 000000000000..bf4507c3bfe2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Caching strategy for watermark. 
*/ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java index 1cf337f45b95..841ab61e0f5d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java @@ -20,6 +20,8 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory; @@ -60,6 +62,7 @@ public class DetectNewPartitionsDoFn extends DoFn tracker; @@ -60,6 +62,7 @@ public class DetectNewPartitionsActionTest { public void setUp() throws Exception { dao = mock(PartitionMetadataDao.class); mapper = mock(PartitionMetadataMapper.class); + cache = mock(WatermarkCache.class); metrics = mock(ChangeStreamMetrics.class); resumeDuration = Duration.standardSeconds(1); tracker = mock(RestrictionTracker.class); @@ -67,7 +70,7 @@ public void setUp() throws Exception { receiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); - action = new DetectNewPartitionsAction(dao, mapper, metrics, resumeDuration); + action = new 
DetectNewPartitionsAction(dao, mapper, cache, metrics, resumeDuration); when(tracker.currentRestriction()).thenReturn(restriction); } @@ -87,7 +90,7 @@ public void testSchedulesPartitionsAndResumesWhenPartitionsWereCreated() { when(partition2.getPartitionToken()).thenReturn("token2"); when(partition2.getCreatedAt()).thenReturn(partitionCreatedAt); when(restriction.getFrom()).thenReturn(from); - when(dao.getUnfinishedMinWatermark()).thenReturn(minWatermark); + when(cache.getUnfinishedMinWatermark()).thenReturn(minWatermark); when(dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet); when(dao.updateToScheduled(Arrays.asList("token1", "token2"))).thenReturn(partitionScheduledAt); when(resultSet.next()).thenReturn(true, true, false); @@ -108,7 +111,7 @@ public void testDoesNothingWhenNoPartitionsWereCreated() { final Instant minWatermarkInstant = new Instant(minWatermark.toSqlTimestamp()); final ResultSet resultSet = mock(ResultSet.class); when(restriction.getFrom()).thenReturn(from); - when(dao.getUnfinishedMinWatermark()).thenReturn(minWatermark); + when(cache.getUnfinishedMinWatermark()).thenReturn(minWatermark); when(dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); @@ -123,7 +126,7 @@ public void testDoesNothingWhenNoPartitionsWereCreated() { public void testTerminatesWhenAllPartitionsAreFinished() { final Timestamp from = Timestamp.ofTimeMicroseconds(10L); when(restriction.getFrom()).thenReturn(from); - when(dao.getUnfinishedMinWatermark()).thenReturn(null); + when(cache.getUnfinishedMinWatermark()).thenReturn(null); final ProcessContinuation continuation = action.run(tracker, receiver, watermarkEstimator); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactoryTest.java new file mode 100644 index 
000000000000..267c7c6d6b4f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactoryTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.Test; + +public class CacheFactoryTest { + + private DaoFactory daoFactory; + + @Before + public void setUp() throws Exception { + daoFactory = mock(DaoFactory.class, withSettings().serializable()); + } + + @Test + public void testReturnsNoOpWatermarkCacheWhenDurationIsZero() { + CacheFactory cacheFactory = new CacheFactory(daoFactory, Duration.ZERO); + + assertEquals(NoOpWatermarkCache.class, cacheFactory.getWatermarkCache().getClass()); + } + + @Test + public void testReturnsAsyncWatermarkCacheWhenDurationIsNonZero() { + CacheFactory cacheFactory = new CacheFactory(daoFactory, Duration.standardSeconds(1)); + + assertEquals(AsyncWatermarkCache.class, cacheFactory.getWatermarkCache().getClass()); + } + + @Test + public void testSerializeDeserialize() throws Exception { + CacheFactory cacheFactory = new CacheFactory(daoFactory, Duration.standardSeconds(5)); + long cacheId = cacheFactory.getCacheId(); + WatermarkCache watermarkCache = cacheFactory.getWatermarkCache(); + + CacheFactory deserializedCacheFactory = deserialize(serialize(cacheFactory)); + + assertEquals(cacheId, deserializedCacheFactory.getCacheId()); + assertEquals(watermarkCache, deserializedCacheFactory.getWatermarkCache()); + } + + @Test + public void testMultipleDeserializations() throws Exception { + CacheFactory cacheFactory = new CacheFactory(daoFactory, Duration.standardSeconds(5)); + long cacheId = 
cacheFactory.getCacheId(); + WatermarkCache watermarkCache = cacheFactory.getWatermarkCache(); + byte[] serializedCacheFactory = serialize(cacheFactory); + + for (int i = 0; i < 10; i++) { + CacheFactory deserializedCacheFactory = deserialize(serializedCacheFactory); + + assertEquals(cacheId, deserializedCacheFactory.getCacheId()); + assertEquals(watermarkCache, deserializedCacheFactory.getWatermarkCache()); + } + } + + @Test + public void testMultipleInstancesHaveDifferentCacheIds() throws Exception { + CacheFactory cacheFactory1 = new CacheFactory(daoFactory, Duration.standardSeconds(5)); + long cacheId1 = cacheFactory1.getCacheId(); + CacheFactory cacheFactory2 = new CacheFactory(daoFactory, Duration.standardSeconds(5)); + long cacheId2 = cacheFactory2.getCacheId(); + + CacheFactory deserializedCacheFactory1 = deserialize(serialize(cacheFactory1)); + CacheFactory deserializedCacheFactory2 = deserialize(serialize(cacheFactory2)); + + assertNotEquals(cacheId1, cacheId2); + assertEquals(cacheId1, deserializedCacheFactory1.getCacheId()); + assertEquals(cacheId2, deserializedCacheFactory2.getCacheId()); + } + + private byte[] serialize(CacheFactory cacheFactory) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(cacheFactory); + return baos.toByteArray(); + } + } + + private CacheFactory deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return (CacheFactory) ois.readObject(); + } + } +}