diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/RunningSnapshotIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/RunningSnapshotIT.java new file mode 100644 index 0000000000000..cbcc7784bd947 --- /dev/null +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/RunningSnapshotIT.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.upgrades; + +import com.carrotsearch.randomizedtesting.annotations.Name; + +import org.elasticsearch.client.Request; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.rest.ObjectPath; + +import java.io.IOException; +import java.util.Collection; + +import static org.elasticsearch.upgrades.SnapshotBasedRecoveryIT.indexDocs; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class RunningSnapshotIT extends AbstractRollingUpgradeTestCase { + + public RunningSnapshotIT(@Name("upgradedNodes") int upgradedNodes) { + super(upgradedNodes); + } + + public void testRunningSnapshotCompleteAfterUpgrade() throws Exception { + final String indexName = "index"; + final String repositoryName = "repo"; + final String snapshotName = "snapshot"; + final var nodeIds = getNodesInfo(client()).keySet(); + + if (isOldCluster()) { + registerRepository(repositoryName, "fs", randomBoolean(), Settings.builder().put("location", "backup").build()); + // create an index to have one shard per node + createIndex(indexName, indexSettings(3, 0).put("index.routing.allocation.total_shards_per_node", 1).build()); + ensureGreen(indexName); + if (randomBoolean()) { + indexDocs(indexName, between(10, 50)); + } + flush(indexName, true); + // Signal shutdown to prevent snapshot from being completed + putShutdownMetadata(nodeIds); + createSnapshot(repositoryName, snapshotName, false); + assertRunningSnapshot(repositoryName, snapshotName); + } else { + if (isUpgradedCluster()) { + deleteShutdownMetadata(nodeIds); + assertNoShutdownMetadata(nodeIds); + ensureGreen(indexName); + assertBusy(() -> assertCompletedSnapshot(repositoryName, snapshotName)); + } else { + assertRunningSnapshot(repositoryName, snapshotName); + } + } + } + + private void putShutdownMetadata(Collection nodeIds) throws IOException { + for (String nodeId : nodeIds) { + final Request putShutdownRequest = new Request("PUT", "/_nodes/" + nodeId + "/shutdown"); + putShutdownRequest.setJsonEntity(""" + { + "type": "remove", + "reason": "test" + }"""); + client().performRequest(putShutdownRequest); + } + } + + private void deleteShutdownMetadata(Collection nodeIds) throws IOException { + for (String nodeId : nodeIds) { + final Request request = new Request("DELETE", "/_nodes/" + nodeId + "/shutdown"); + client().performRequest(request); + } + } + + private void assertNoShutdownMetadata(Collection nodeIds) throws IOException { + final ObjectPath responsePath = assertOKAndCreateObjectPath( + client().performRequest(new Request("GET", "/_nodes/" + Strings.collectionToCommaDelimitedString(nodeIds) + "/shutdown")) + ); + assertThat(responsePath.evaluate("nodes"), empty()); + } + + private void assertRunningSnapshot(String repositoryName, String snapshotName) throws IOException { + final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current"); + final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request)); + assertThat(responsePath.evaluate("total"), equalTo(1)); + assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName)); + } + + private void assertCompletedSnapshot(String repositoryName, String snapshotName) throws IOException { + { + final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current"); + final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request)); + assertThat(responsePath.evaluate("total"), equalTo(0)); + } + + { + final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/" + snapshotName); + final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request)); + assertThat(responsePath.evaluate("total"), equalTo(1)); + assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName)); + assertThat(responsePath.evaluate("snapshots.0.state"), not(equalTo("IN_PROGRESS"))); + } + } +} diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java index 9217852f1867c..485e1d4f28826 100644 --- a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java @@ -233,7 +233,7 @@ private static Map search(String index, QueryBuilder query) thro return responseAsMap; } - private void indexDocs(String indexName, int numDocs) throws IOException { + static void indexDocs(String indexName, int numDocs) throws IOException { final StringBuilder bulkBody = new StringBuilder(); for (int i = 0; i < numDocs; i++) { bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n"); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 33c4bb0bef900..aad71bbf4f640 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -205,6 +205,7 @@ static TransportVersion def(int id) { public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00); public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0); public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00); + public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index be748a578256e..c1964881ca0af 100644 --- a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -146,6 +146,29 @@ public static > boolean hasKey(MapDiff diff, return false; } + /** + * Create a new JDK map backed MapDiff by transforming the keys with the provided keyFunction. + * @param diff Original MapDiff to transform + * @param keyFunction Function to transform the key + * @param keySerializer Serializer for the new key + */ + public static , M1 extends Map> MapDiff> jdkMapDiffWithUpdatedKeys( + MapDiff diff, + Function keyFunction, + KeySerializer keySerializer + ) { + final List deletes = diff.getDeletes().stream().map(keyFunction).toList(); + final List>> diffs = diff.getDiffs() + .stream() + .map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue())) + .toList(); + final List> upserts = diff.getUpserts() + .stream() + .map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue())) + .toList(); + return new MapDiff<>(keySerializer, DiffableValueSerializer.getWriteOnlyInstance(), deletes, diffs, upserts, JdkMapBuilder::new); + } + /** * Creates a MapDiff that applies a single entry diff to a map */ diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index d82a31720d6d4..44682438585a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -12,6 +12,8 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -22,6 +24,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; @@ -32,6 +35,7 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoryOperation; +import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; @@ -56,6 +60,8 @@ import java.util.Set; import java.util.stream.Stream; +import static org.elasticsearch.repositories.RepositoryOperation.PROJECT_REPO_SERIALIZER; + /** * Meta data about snapshots that are currently executing */ @@ -70,7 +76,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; /** Maps repository name to list of snapshots in that repository */ - private final Map entries; + private final Map entries; /** * IDs of nodes which are marked for removal, or which were previously marked for removal and still have running shard snapshots. @@ -104,56 +110,72 @@ private static Set readNodeIdsForRemoval(StreamInput in) throws IOExcept : Set.of(); } - private static Map collectByRepo(StreamInput in) throws IOException { + private static Map collectByRepo(StreamInput in) throws IOException { final int count = in.readVInt(); if (count == 0) { return Map.of(); } - final Map> entriesByRepo = new HashMap<>(); + final Map> entriesByRepo = new HashMap<>(); for (int i = 0; i < count; i++) { final Entry entry = Entry.readFrom(in); - entriesByRepo.computeIfAbsent(entry.repository(), repo -> new ArrayList<>()).add(entry); + entriesByRepo.computeIfAbsent(new ProjectRepo(entry.projectId(), entry.repository()), repo -> new ArrayList<>()).add(entry); } - final Map res = Maps.newMapWithExpectedSize(entriesByRepo.size()); - for (Map.Entry> entryForRepo : entriesByRepo.entrySet()) { + final Map res = Maps.newMapWithExpectedSize(entriesByRepo.size()); + for (Map.Entry> entryForRepo : entriesByRepo.entrySet()) { res.put(entryForRepo.getKey(), new ByRepo(entryForRepo.getValue())); } return res; } - private SnapshotsInProgress(Map entries, Set nodesIdsForRemoval) { + private SnapshotsInProgress(Map entries, Set nodesIdsForRemoval) { this.entries = Map.copyOf(entries); this.nodesIdsForRemoval = nodesIdsForRemoval; assert assertConsistentEntries(this.entries); } + @FixForMultiProject + @Deprecated(forRemoval = true) public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List updatedEntries) { - if (updatedEntries.equals(forRepo(repository))) { + return withUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries); + } + + public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String repository, List updatedEntries) { + if (updatedEntries.equals(forRepo(projectId, repository))) { return this; } - final Map copy = new HashMap<>(this.entries); + final Map copy = new HashMap<>(this.entries); + final var projectRepo = new ProjectRepo(projectId, repository); if (updatedEntries.isEmpty()) { - copy.remove(repository); + copy.remove(projectRepo); if (copy.isEmpty()) { return EMPTY; } } else { - copy.put(repository, new ByRepo(updatedEntries)); + copy.put(projectRepo, new ByRepo(updatedEntries)); } return new SnapshotsInProgress(copy, nodesIdsForRemoval); } public SnapshotsInProgress withAddedEntry(Entry entry) { - final List forRepo = new ArrayList<>(forRepo(entry.repository())); + final List forRepo = new ArrayList<>(forRepo(entry.projectId(), entry.repository())); forRepo.add(entry); - return withUpdatedEntriesForRepo(entry.repository(), forRepo); + return withUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo); } /** * Returns the list of snapshots in the specified repository. */ + @FixForMultiProject + @Deprecated(forRemoval = true) public List forRepo(String repository) { - return entries.getOrDefault(repository, ByRepo.EMPTY).entries; + return forRepo(Metadata.DEFAULT_PROJECT_ID, repository); + } + + /** + * Returns the list of snapshots in the specified repository. + */ + public List forRepo(ProjectId projectId, String repository) { + return entries.getOrDefault(new ProjectRepo(projectId, repository), ByRepo.EMPTY).entries; } public boolean isEmpty() { @@ -178,7 +200,7 @@ public Stream asStream() { @Nullable public Entry snapshot(final Snapshot snapshot) { - return findSnapshotInList(snapshot, forRepo(snapshot.getRepository())); + return findSnapshotInList(snapshot, forRepo(snapshot.getProjectId(), snapshot.getRepository())); } /** @@ -196,6 +218,25 @@ private static Entry findSnapshotInList(Snapshot snapshotToFind, List for return null; } + /** + * Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be + * deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance. + *

+ * An unique shard generation is created for every in-progress shard snapshot. The shard generation file contains information about all + * the files needed by pre-existing and any new shard snapshots that were in-progress. When a shard snapshot is finalized, its file list + * is promoted to the official shard snapshot list for the index shard. This final list will contain metadata about any other + * in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled + * for deletion now. + */ + @FixForMultiProject + @Deprecated(forRemoval = true) + public Map> obsoleteGenerations( + String repository, + SnapshotsInProgress oldClusterStateSnapshots + ) { + return obsoleteGenerations(Metadata.DEFAULT_PROJECT_ID, repository, oldClusterStateSnapshots); + } + /** * Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be * deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance. @@ -207,13 +248,14 @@ private static Entry findSnapshotInList(Snapshot snapshotToFind, List for * for deletion now. */ public Map> obsoleteGenerations( + ProjectId projectId, String repository, SnapshotsInProgress oldClusterStateSnapshots ) { final Map> obsoleteGenerations = new HashMap<>(); - final List latestSnapshots = forRepo(repository); + final List latestSnapshots = forRepo(projectId, repository); - for (Entry oldEntry : oldClusterStateSnapshots.forRepo(repository)) { + for (Entry oldEntry : oldClusterStateSnapshots.forRepo(projectId, repository)) { final Entry matchingLatestEntry = findSnapshotInList(oldEntry.snapshot(), latestSnapshots); if (matchingLatestEntry == null || matchingLatestEntry == oldEntry) { // The snapshot progress has not changed. @@ -412,15 +454,16 @@ private static boolean hasFailures(Map c return false; } - private static boolean assertConsistentEntries(Map entries) { - for (Map.Entry repoEntries : entries.entrySet()) { + private static boolean assertConsistentEntries(Map entries) { + for (Map.Entry repoEntries : entries.entrySet()) { final Set> assignedShards = new HashSet<>(); final Set> queuedShards = new HashSet<>(); final List entriesForRepository = repoEntries.getValue().entries; - final String repository = repoEntries.getKey(); + final ProjectRepo repository = repoEntries.getKey(); assert entriesForRepository.isEmpty() == false : "found empty list of snapshots for " + repository + " in " + entries; for (Entry entry : entriesForRepository) { - assert entry.repository().equals(repository) : "mismatched repository " + entry + " tracked under " + repository; + assert new ProjectRepo(entry.projectId(), entry.repository()).equals(repository) + : "mismatched repository " + entry + " tracked under " + repository; for (Map.Entry shard : entry.shardSnapshotStatusByRepoShardId().entrySet()) { final RepositoryShardId sid = shard.getKey(); final ShardSnapshotStatus shardSnapshotStatus = shard.getValue(); @@ -1241,6 +1284,11 @@ public Entry withStartedShards(Map shards) { return updated; } + @Override + public ProjectId projectId() { + return snapshot.getProjectId(); + } + @Override public String repository() { return snapshot.getRepository(); @@ -1391,6 +1439,7 @@ public String toString() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("project_id", snapshot.getProjectId()); builder.field("repository", snapshot.getRepository()); builder.field("snapshot", snapshot.getSnapshotId().getName()); builder.field("uuid", snapshot.getSnapshotId().getUUID()); @@ -1725,25 +1774,43 @@ private static final class SnapshotInProgressDiff implements NamedDiff { private final SnapshotsInProgress after; - private final DiffableUtils.MapDiff> mapDiff; + private final DiffableUtils.MapDiff> mapDiff; private final Set nodeIdsForRemoval; SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) { - this.mapDiff = DiffableUtils.diff(before.entries, after.entries, DiffableUtils.getStringKeySerializer()); + this.mapDiff = DiffableUtils.diff(before.entries, after.entries, PROJECT_REPO_SERIALIZER); this.nodeIdsForRemoval = after.nodesIdsForRemoval; this.after = after; } SnapshotInProgressDiff(StreamInput in) throws IOException { - this.mapDiff = DiffableUtils.readJdkMapDiff( - in, - DiffableUtils.getStringKeySerializer(), - i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)), - i -> new ByRepo.ByRepoDiff( - DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), - DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) - ) - ); + if (in.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) { + final var oldMapDiff = DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)), + i -> new ByRepo.ByRepoDiff( + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) + ) + ); + this.mapDiff = DiffableUtils.jdkMapDiffWithUpdatedKeys( + oldMapDiff, + repository -> new ProjectRepo(ProjectId.DEFAULT, repository), + PROJECT_REPO_SERIALIZER + ); + } else { + this.mapDiff = DiffableUtils.readJdkMapDiff( + in, + PROJECT_REPO_SERIALIZER, + i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)), + i -> new ByRepo.ByRepoDiff( + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new), + DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER) + ) + ); + } + this.nodeIdsForRemoval = readNodeIdsForRemoval(in); this.after = null; } @@ -1768,7 +1835,21 @@ public String getWriteableName() { public void writeTo(StreamOutput out) throws IOException { assert after != null : "should only write instances that were diffed from this node's state"; if (out.getTransportVersion().onOrAfter(DIFFABLE_VERSION)) { - mapDiff.writeTo(out); + if (out.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) { + DiffableUtils.jdkMapDiffWithUpdatedKeys(mapDiff, projectRepo -> { + if (ProjectId.DEFAULT.equals(projectRepo.projectId()) == false) { + final var message = "Cannot write instance with non-default project id " + + projectRepo.projectId() + + " to version before " + + TransportVersions.PROJECT_ID_IN_SNAPSHOT; + assert false : message; + throw new IllegalArgumentException(message); + } + return projectRepo.name(); + }, DiffableUtils.getStringKeySerializer()).writeTo(out); + } else { + mapDiff.writeTo(out); + } } else { new SimpleDiffable.CompleteDiff<>(after).writeTo(out); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 749d8d1ad1d93..6cd1b7b6a286b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -8,11 +8,29 @@ */ package org.elasticsearch.repositories; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.FixForMultiProject; + +import java.io.IOException; + /** * Coordinates of an operation that modifies a repository, assuming that repository at a specific generation. */ public interface RepositoryOperation { + /** + * Project for which repository belongs to. + */ + @FixForMultiProject(description = "default implementation is temporary") + default ProjectId projectId() { + return Metadata.DEFAULT_PROJECT_ID; + } + /** * Name of the repository affected. */ @@ -22,4 +40,34 @@ public interface RepositoryOperation { * The repository state id at the time the operation began. */ long repositoryStateId(); + + /** + * A project qualified repository + * @param projectId The project that the repository belongs to + * @param name Name of the repository + */ + record ProjectRepo(ProjectId projectId, String name) implements Writeable { + + public ProjectRepo(StreamInput in) throws IOException { + this(ProjectId.readFrom(in), in.readString()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + projectId.writeTo(out); + out.writeString(name); + } + } + + DiffableUtils.KeySerializer PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() { + @Override + public void writeKey(ProjectRepo key, StreamOutput out) throws IOException { + key.writeTo(out); + } + + @Override + public ProjectRepo readKey(StreamInput in) throws IOException { + return new ProjectRepo(in); + } + }; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java index 4dd1b6142baa5..89ebf13b58c95 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java +++ b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; @@ -46,7 +47,7 @@ public static InFlightShardSnapshotStates forEntries(List> generations = new HashMap<>(); final Map> busyIds = new HashMap<>(); - assert snapshots.stream().map(SnapshotsInProgress.Entry::repository).distinct().count() == 1 + assert snapshots.stream().map(entry -> new ProjectRepo(entry.projectId(), entry.repository())).distinct().count() == 1 : "snapshots must either be an empty list or all belong to the same repository but saw " + snapshots; for (SnapshotsInProgress.Entry runningSnapshot : snapshots) { for (Map.Entry shard : runningSnapshot diff --git a/server/src/main/java/org/elasticsearch/snapshots/Snapshot.java b/server/src/main/java/org/elasticsearch/snapshots/Snapshot.java index 79bfc978ffc89..208cd4bcbef84 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/Snapshot.java +++ b/server/src/main/java/org/elasticsearch/snapshots/Snapshot.java @@ -9,9 +9,12 @@ package org.elasticsearch.snapshots; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.FixForMultiProject; import java.io.IOException; import java.util.Objects; @@ -21,6 +24,7 @@ */ public final class Snapshot implements Writeable { + private final ProjectId projectId; private final String repository; private final SnapshotId snapshotId; private final int hashCode; @@ -28,7 +32,17 @@ public final class Snapshot implements Writeable { /** * Constructs a snapshot. */ + @FixForMultiProject + @Deprecated(forRemoval = true) public Snapshot(final String repository, final SnapshotId snapshotId) { + this(ProjectId.DEFAULT, repository, snapshotId); + } + + /** + * Constructs a snapshot. + */ + public Snapshot(ProjectId projectId, final String repository, final SnapshotId snapshotId) { + this.projectId = projectId; this.repository = Objects.requireNonNull(repository); this.snapshotId = Objects.requireNonNull(snapshotId); this.hashCode = computeHashCode(); @@ -38,11 +52,20 @@ public Snapshot(final String repository, final SnapshotId snapshotId) { * Constructs a snapshot from the stream input. */ public Snapshot(final StreamInput in) throws IOException { + if (in.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) { + projectId = ProjectId.DEFAULT; + } else { + projectId = ProjectId.readFrom(in); + } repository = in.readString(); snapshotId = new SnapshotId(in); hashCode = computeHashCode(); } + public ProjectId getProjectId() { + return projectId; + } + /** * Gets the repository name for the snapshot. */ @@ -59,7 +82,7 @@ public SnapshotId getSnapshotId() { @Override public String toString() { - return repository + ":" + snapshotId.toString(); + return projectId + ":" + repository + ":" + snapshotId.toString(); } @Override @@ -71,7 +94,7 @@ public boolean equals(Object o) { return false; } Snapshot that = (Snapshot) o; - return repository.equals(that.repository) && snapshotId.equals(that.snapshotId); + return projectId.equals(that.projectId) && repository.equals(that.repository) && snapshotId.equals(that.snapshotId); } @Override @@ -80,11 +103,23 @@ public int hashCode() { } private int computeHashCode() { - return Objects.hash(repository, snapshotId); + return Objects.hash(projectId, repository, snapshotId); } @Override public void writeTo(final StreamOutput out) throws IOException { + if (out.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) { + if (ProjectId.DEFAULT.equals(projectId) == false) { + final var message = "Cannot write instance with non-default project id " + + projectId + + " to version before " + + TransportVersions.PROJECT_ID_IN_SNAPSHOT; + assert false : message; + throw new IllegalArgumentException(message); + } + } else { + projectId.writeTo(out); + } out.writeString(repository); snapshotId.writeTo(out); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index 61b33ca56ef8c..bc684fd0ea01c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -81,7 +81,7 @@ public void testCannotAllocatePrimaryMissingInRestoreInProgress() { assertThat( decision.getExplanation(), equalTo( - "shard has failed to be restored from the snapshot [_repository:_missing/_uuid] - manually close or " + "shard has failed to be restored from the snapshot [default:_repository:_missing/_uuid] - manually close or " + "delete the index [test] in order to retry to restore the snapshot again or use the reroute API " + "to force the allocation of an empty primary shard. Details: [restore_source[_repository/_missing]]" ) @@ -168,7 +168,8 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { assertThat( decision.getExplanation(), startsWith( - "shard has failed to be restored from the snapshot [_repository:_existing/_uuid] - manually close or delete the index " + "shard has failed to be restored from the snapshot [default:_repository:_existing/_uuid]" + + " - manually close or delete the index " + "[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of " + "an empty primary shard. Details: [restore_source[_repository/_existing], failure " + "java.io.IOException: i/o failure" diff --git a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java index 0213ee9046462..5fddadc9cdbe3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java @@ -241,7 +241,7 @@ public void testNotAllowToRestoreGlobalStateFromSnapshotWithoutOne() { ); assertThat( exception.getMessage(), - equalTo("[name:name/uuid] cannot restore global state since the snapshot was created without global state") + equalTo("[default:name:name/uuid] cannot restore global state since the snapshot was created without global state") ); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 741dfd2466430..9647f77daa251 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -10,19 +10,23 @@ package org.elasticsearch.snapshots; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; @@ -31,11 +35,13 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; import java.io.IOException; @@ -46,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -60,10 +67,14 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer @Override protected Custom createTestInstance() { - int numberOfSnapshots = randomInt(10); + return createTestInstance(() -> randomSnapshot(randomProjectIdOrDefault())); + } + + private Custom createTestInstance(Supplier randomEntrySupplier) { + int numberOfSnapshots = randomInt(20); SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY; for (int i = 0; i < numberOfSnapshots; i++) { - snapshotsInProgress = snapshotsInProgress.withAddedEntry(randomSnapshot()); + snapshotsInProgress = snapshotsInProgress.withAddedEntry(randomEntrySupplier.get()); } final var nodeIdsForRemoval = randomList(3, ESTestCase::randomUUID); @@ -76,6 +87,36 @@ protected Custom createTestInstance() { return snapshotsInProgress; } + public void testSerializationBwc() throws IOException { + final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.PROJECT_ID_IN_SNAPSHOT); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setTransportVersion(oldVersion); + final Custom original = createTestInstance(() -> randomSnapshot(ProjectId.DEFAULT)); + original.writeTo(out); + + final var in = out.bytes().streamInput(); + in.setTransportVersion(oldVersion); + final SnapshotsInProgress fromStream = new SnapshotsInProgress(in); + assertThat(fromStream, equalTo(original)); + } + + public void testDiffSerializationBwc() throws IOException { + final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.PROJECT_ID_IN_SNAPSHOT); + final BytesStreamOutput out = new BytesStreamOutput(); + out.setTransportVersion(oldVersion); + + final Custom before = createTestInstance(() -> randomSnapshot(ProjectId.DEFAULT)); + final Custom after = makeTestChanges(before, () -> randomSnapshot(ProjectId.DEFAULT)); + final Diff diff = after.diff(before); + diff.writeTo(out); + + final var in = out.bytes().streamInput(); + in.setTransportVersion(oldVersion); + final NamedDiff diffFromStream = SnapshotsInProgress.readDiffFrom(in); + + assertThat(diffFromStream.apply(before), equalTo(after)); + } + private ClusterState getClusterStateWithNodeShutdownMetadata(List nodeIdsForRemoval) { return CLUSTER_STATE_FOR_NODE_SHUTDOWNS.copyAndUpdateMetadata( mdb -> mdb.putCustom( @@ -100,7 +141,15 @@ private ClusterState getClusterStateWithNodeShutdownMetadata(List nodeId } private Entry randomSnapshot() { - Snapshot snapshot = new Snapshot("repo-" + randomInt(5), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))); + return randomSnapshot(randomProjectIdOrDefault()); + } + + private Entry randomSnapshot(ProjectId projectId) { + Snapshot snapshot = new Snapshot( + projectId, + "repo-" + randomInt(5), + new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10)) + ); boolean includeGlobalState = randomBoolean(); boolean partial = randomBoolean(); int numberOfIndices = randomIntBetween(0, 10); @@ -158,6 +207,10 @@ protected Writeable.Reader instanceReader() { @Override protected Custom makeTestChanges(Custom testInstance) { + return makeTestChanges(testInstance, () -> randomSnapshot(randomProjectIdOrDefault())); + } + + protected Custom makeTestChanges(Custom testInstance, Supplier randomEntrySupplier) { final SnapshotsInProgress snapshots = (SnapshotsInProgress) testInstance; SnapshotsInProgress updatedInstance = SnapshotsInProgress.EMPTY; if (randomBoolean() && snapshots.count() > 1) { @@ -178,7 +231,7 @@ protected Custom makeTestChanges(Custom testInstance) { // add some elements int addElements = randomInt(10); for (int i = 0; i < addElements; i++) { - updatedInstance = updatedInstance.withAddedEntry(randomSnapshot()); + updatedInstance = updatedInstance.withAddedEntry(randomEntrySupplier.get()); } } if (randomBoolean()) { @@ -194,7 +247,8 @@ protected Custom makeTestChanges(Custom testInstance) { if (randomBoolean()) { entries = shuffledList(entries); } - updatedInstance = updatedInstance.withUpdatedEntriesForRepo(perRepoEntries.get(0).repository(), entries); + final Entry firstEntry = perRepoEntries.get(0); + updatedInstance = updatedInstance.withUpdatedEntriesForRepo(firstEntry.projectId(), firstEntry.repository(), entries); } } return updatedInstance; @@ -219,10 +273,12 @@ protected Custom mutateInstance(Custom instance) { return snapshotsInProgress.withAddedEntry(randomSnapshot()); } else { // mutate or remove an entry - final String repo = randomFrom( - snapshotsInProgress.asStream().map(SnapshotsInProgress.Entry::repository).collect(Collectors.toSet()) + final var repo = randomFrom( + snapshotsInProgress.asStream() + .map(entry -> new ProjectRepo(entry.projectId(), entry.repository())) + .collect(Collectors.toSet()) ); - final List forRepo = snapshotsInProgress.forRepo(repo); + final List forRepo = snapshotsInProgress.forRepo(repo.projectId(), repo.name()); int index = randomIntBetween(0, forRepo.size() - 1); Entry entry = forRepo.get(index); final List updatedEntries = new ArrayList<>(forRepo); @@ -231,7 +287,7 @@ protected Custom mutateInstance(Custom instance) { } else { updatedEntries.remove(index); } - return snapshotsInProgress.withUpdatedEntriesForRepo(repo, updatedEntries); + return snapshotsInProgress.withUpdatedEntriesForRepo(repo.projectId(), repo.name(), updatedEntries); } } else { return snapshotsInProgress.withUpdatedNodeIdsForRemoval( @@ -438,9 +494,10 @@ private Entry mutateEntryWithLegalChange(Entry entry) { public void testXContent() throws IOException { final IndexId indexId = new IndexId("index", "uuid"); + final ProjectId projectId = ProjectId.fromId("some-project"); SnapshotsInProgress sip = SnapshotsInProgress.EMPTY.withAddedEntry( Entry.snapshot( - new Snapshot("repo", new SnapshotId("name", "uuid")), + new Snapshot(projectId, "repo", new SnapshotId("name", "uuid")), true, true, State.SUCCESS, @@ -497,6 +554,7 @@ public void testXContent() throws IOException { { "snapshots": [ { + "project_id": "some-project", "repository": "repo", "snapshot": "name", "uuid": "uuid", @@ -547,6 +605,7 @@ public void testXContent() throws IOException { { "snapshots": [ { + "project_id": "some-project", "repository": "repo", "snapshot": "name", "uuid": "uuid",