Skip to content

Commit

Permalink
feat: caches watermark for Spanner change streams (#33566)
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagotnunes authored Jan 16, 2025
1 parent 5e59465 commit 7c9fb33
Showing 12 changed files with 381 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -76,6 +77,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
@@ -1594,6 +1596,8 @@ public abstract static class ReadChangeStream
@Deprecated
abstract @Nullable Double getTraceSampleProbability();

abstract @Nullable Duration getWatermarkRefreshRate();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -1617,6 +1621,8 @@ abstract static class Builder {

abstract Builder setTraceSampleProbability(Double probability);

abstract Builder setWatermarkRefreshRate(Duration refreshRate);

abstract ReadChangeStream build();
}

@@ -1703,6 +1709,10 @@ public ReadChangeStream withTraceSampleProbability(Double probability) {
return toBuilder().setTraceSampleProbability(probability).build();
}

public ReadChangeStream withWatermarkRefreshRate(Duration refreshRate) {
return toBuilder().setWatermarkRefreshRate(refreshRate).build();
}

@Override
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
@@ -1803,10 +1813,15 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
metadataDatabaseDialect);
final ActionFactory actionFactory = new ActionFactory();

final Duration watermarkRefreshRate =
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
Original file line number Diff line number Diff line change
@@ -20,9 +20,11 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Duration;

/**
* Single place for defining the constants used in the {@code Spanner.readChangeStreams()}
@@ -75,4 +77,10 @@ public class ChangeStreamsConstants {
.setWatermark(Timestamp.now())
.setCreatedAt(Timestamp.now())
.build();

/**
* The default period for which we will re-compute the watermark of the {@link
* DetectNewPartitionsDoFn} stage.
*/
public static final Duration DEFAULT_WATERMARK_REFRESH_RATE = Duration.standardSeconds(1);
}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
@@ -151,12 +152,17 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
PartitionMetadataDao partitionMetadataDao,
PartitionMetadataMapper partitionMetadataMapper,
WatermarkCache watermarkCache,
ChangeStreamMetrics metrics,
Duration resumeDuration) {
if (detectNewPartitionsActionInstance == null) {
detectNewPartitionsActionInstance =
new DetectNewPartitionsAction(
partitionMetadataDao, partitionMetadataMapper, metrics, resumeDuration);
partitionMetadataDao,
partitionMetadataMapper,
watermarkCache,
metrics,
resumeDuration);
}
return detectNewPartitionsActionInstance;
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -50,17 +51,20 @@ public class DetectNewPartitionsAction {

private final PartitionMetadataDao dao;
private final PartitionMetadataMapper mapper;
private final WatermarkCache cache;
private final ChangeStreamMetrics metrics;
private final Duration resumeDuration;

/** Constructs an action class for detecting / scheduling new partitions. */
public DetectNewPartitionsAction(
PartitionMetadataDao dao,
PartitionMetadataMapper mapper,
WatermarkCache cache,
ChangeStreamMetrics metrics,
Duration resumeDuration) {
this.dao = dao;
this.mapper = mapper;
this.cache = cache;
this.metrics = metrics;
this.resumeDuration = resumeDuration;
}
@@ -98,7 +102,7 @@ public ProcessContinuation run(

final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
// Updates the current watermark as the min of the watermarks from all existing partitions
final Timestamp minWatermark = dao.getUnfinishedMinWatermark();
final Timestamp minWatermark = cache.getUnfinishedMinWatermark();

if (minWatermark != null) {
return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;

/**
* Asynchronously compute the earliest partition watermark and stores it in memory. The value will
* be recomputed periodically, as configured by the refresh rate.
*
* <p>On every period, we will call {@link PartitionMetadataDao#getUnfinishedMinWatermark()} to
* refresh the value.
*/
public class AsyncWatermarkCache implements WatermarkCache {

private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
private static final Object MIN_WATERMARK_KEY = new Object();
private final LoadingCache<Object, Optional<Timestamp>> cache;

public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
this.cache =
CacheBuilder.newBuilder()
.refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
.build(
CacheLoader.asyncReloading(
CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
}

@Override
public @Nullable Timestamp getUnfinishedMinWatermark() {
try {
return cache.get(MIN_WATERMARK_KEY).orElse(null);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;

public class CacheFactory implements Serializable {

private static final long serialVersionUID = -8722905670370252723L;
private static final Map<Long, WatermarkCache> WATERMARK_CACHE = new ConcurrentHashMap<>();
private static final AtomicLong CACHE_ID = new AtomicLong();

// The unique id for the cache of this CacheFactory. This guarantees that if the CacheFactory is
// serialized / deserialized it will get the same instance of the factory.
private final long cacheId = CACHE_ID.getAndIncrement();
private final DaoFactory daoFactory;
private final Duration refreshRate;

public CacheFactory(DaoFactory daoFactory, Duration watermarkRefreshRate) {
this.daoFactory = daoFactory;
this.refreshRate = watermarkRefreshRate;
}

public WatermarkCache getWatermarkCache() {
return WATERMARK_CACHE.computeIfAbsent(
cacheId,
key ->
refreshRate.getMillis() == 0
? new NoOpWatermarkCache(daoFactory.getPartitionMetadataDao())
: new AsyncWatermarkCache(daoFactory.getPartitionMetadataDao(), refreshRate));
}

@VisibleForTesting
long getCacheId() {
return cacheId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;

/**
* Synchronously compute the earliest partition watermark, by delegating the call to {@link
* PartitionMetadataDao#getUnfinishedMinWatermark()}.
*/
public class NoOpWatermarkCache implements WatermarkCache {

private final PartitionMetadataDao dao;

public NoOpWatermarkCache(PartitionMetadataDao dao) {
this.dao = dao;
}

@Override
public @Nullable Timestamp getUnfinishedMinWatermark() {
return dao.getUnfinishedMinWatermark();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;

@FunctionalInterface
public interface WatermarkCache {

/**
* Fetches the earliest partition watermark from the partition metadata table that is not in a
* {@link State#FINISHED} state.
*
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
*/
@Nullable
Timestamp getUnfinishedMinWatermark();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Caching strategy for watermark. */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
Loading

0 comments on commit 7c9fb33

Please sign in to comment.