diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 000000000..8524c67fe --- /dev/null +++ b/.gitattributes @@ -0,0 +1,9 @@ +# Set the default behavior, in case people don't have core.autocrlf set. +* text=auto + +# Explicitly declare text files you want to always be normalized and converted +# to native line endings on checkout. +*.sh text eol=lf +*.py text eol=lf +*.yml text eol=lf +*.conf text eol=lf \ No newline at end of file diff --git a/.gitignore b/.gitignore index f1279888b..96c378cbd 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,4 @@ nb-configuration.xml *.iml *.iws *~ - /bin/ diff --git a/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/sync/RepositorySyncStatus.java b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/sync/RepositorySyncStatus.java new file mode 100644 index 000000000..c14704b7d --- /dev/null +++ b/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/sync/RepositorySyncStatus.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.sync; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Collection; + +@XmlRootElement +@ApiModel(value = "RepositorySyncStatus") +public class RepositorySyncStatus { + private boolean isClean; + private boolean hasChanges; + private Collection changes; + + public RepositorySyncStatus(boolean isClean, boolean hasChanges, Collection changes) { + this.isClean = isClean; + this.hasChanges = hasChanges; + this.changes = changes; + } + + public RepositorySyncStatus(){} + + @ApiModelProperty(value = "Repository is in sync with registry.", required = true) + public boolean getIsClean() { + return isClean; + } + public void setIsClean(boolean isClean) { + this.isClean = isClean; + } + + @ApiModelProperty(value = "The repository contains changes not reflected in registry.", required = true) + public boolean getHasChanges() { + return this.hasChanges; + } + public void setHasChanges(boolean hasChanges) { + this.hasChanges = hasChanges; + } + + @ApiModelProperty(value = "List of changes in the repository which should be synchronized with registry.", required = true) + public Collection getChanges() { + return this.changes; + } + public void setChanges(Collection changes) { + this.changes = changes; + } +} diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java index 071656d0e..bd4666e96 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java @@ -23,6 +23,7 @@ import org.apache.nifi.registry.flow.FlowSnapshotContext; import org.apache.nifi.registry.provider.ProviderConfigurationContext; import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; import org.apache.nifi.registry.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,4 +184,23 @@ protected File getSnapshotFile(final String bucketId, final String flowId, final return new File(flowStorageDir, snapshotFilename); } + @Override + public Boolean canBeSynchronized() { + return false; + } + + @Override + public void getLatestChangesOfRemoteRepository() { + + } + + @Override + public void resetRepository() { + + } + + @Override + public RepositorySyncStatus getStatus() { + return RepositorySyncStatus.SuccessfulSynchronizedRepository(); + } } diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java index 488a619e0..946d00834 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.registry.provider.flow.git; +import org.apache.nifi.registry.util.FileUtils; + import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -39,10 +41,14 @@ public boolean hasVersion(int version) { return versions.containsKey(version); } - public FlowPointer getFlowVersion(int version) { + public FlowPointer getFlowVersion(int version){ return versions.get(version); } + public String getFlowId(){ + return this.flowId; + } + public void putVersion(int version, FlowPointer pointer) { versions.put(version, pointer); } @@ -65,7 +71,7 @@ public static class FlowPointer { /** * Create new FlowPointer instance. - * @param fileName The filename must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so. + * @param fileName The filename must be sanitized, use {@link FileUtils#sanitizeFilename(String)} to do so. */ public FlowPointer(String fileName) { this.fileName = fileName; diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java index c5ee27703..3c61b5295 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -16,13 +16,19 @@ */ package org.apache.nifi.registry.provider.flow.git; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.eclipse.jgit.api.CloneCommand; import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.PullCommand; +import org.eclipse.jgit.api.PullResult; import org.eclipse.jgit.api.PushCommand; +import org.eclipse.jgit.api.ResetCommand; import org.eclipse.jgit.api.Status; import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.api.errors.NoHeadException; import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.revwalk.RevCommit; import org.eclipse.jgit.revwalk.RevTree; @@ -30,6 +36,7 @@ import org.eclipse.jgit.transport.CredentialsProvider; import org.eclipse.jgit.transport.PushResult; import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.URIish; import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; import org.eclipse.jgit.treewalk.TreeWalk; import org.slf4j.Logger; @@ -42,6 +49,7 @@ import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.Writer; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; @@ -79,6 +87,7 @@ class GitFlowMetaData { private Repository gitRepo; private String remoteToPush; private CredentialsProvider credentialsProvider; + private ScheduledExecutorService executorService; private final BlockingQueue pushQueue = new ArrayBlockingQueue<>(1); @@ -100,10 +109,25 @@ public void setRemoteCredential(String userName, String password) { * @param gitProjectRootDir a root directory of a Git project * @return created Repository * @throws IOException thrown when the specified directory does not exist, - * does not have read/write privilege or not containing .git directory + * does not have read/write privilege or not containing .git directory */ - private Repository openRepository(final File gitProjectRootDir) throws IOException { + private Repository openRepository(final File gitProjectRootDir, Boolean enforceRecreate) throws IOException { + if (this.gitRepo != null && !enforceRecreate) { + return this.gitRepo; + } + + final FileRepositoryBuilder builder = createRepositoryBuilder(gitProjectRootDir); + + if (builder.getGitDir() == null) { + throw new IOException(format("Directory '%s' does not contain a .git directory." + + " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", + gitProjectRootDir)); + } + + return builder.build(); + } + private FileRepositoryBuilder createRepositoryBuilder(File gitProjectRootDir) throws IOException { // Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here. // Because the util will try to create a dir if not exist. // The git dir should be initialized and configured by users. @@ -116,37 +140,140 @@ private Repository openRepository(final File gitProjectRootDir) throws IOExcepti } // Search .git dir but avoid searching parent directories. - final FileRepositoryBuilder builder = new FileRepositoryBuilder() + return new FileRepositoryBuilder() .readEnvironment() .setMustExist(true) .addCeilingDirectory(gitProjectRootDir) .findGitDir(gitProjectRootDir); + } - if (builder.getGitDir() == null) { - throw new IOException(format("Directory '%s' does not contain a .git directory." + - " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", - gitProjectRootDir)); + + public void resetGitRepository(File gitProjectRootDir) throws IOException, GitAPIException, InterruptedException { + URI gitRepositoryUrl = URI.create(""); + if (this.isGitRepositoryExisting(gitProjectRootDir)) { + gitRepo = openRepository(gitProjectRootDir, false); + + try (final Git git = new Git(gitRepo)) { + TerminateWhenRemotePathDoesNotExist(git); + + if (!git.status().call().isClean()) { + throw new IOException("Directory '%s' contains changes. " + + "Therefore a complete reset of the repository is not possible.\n" + + "Please commit your changes and push to remote repository manually." + + "Git persistence provider does not recover from conflicting changes automatically."); + } + + Optional remoteConfig = git.remoteList().call() + .stream().filter(r -> r.getName().equalsIgnoreCase(remoteToPush)).findFirst(); + if (remoteConfig.isPresent() && !remoteConfig.get().getURIs().isEmpty()) { + URIish remoteUri = remoteConfig.get().getURIs().get(0); + if (remoteUri.getHost() == null) { + gitRepositoryUrl = new File(remoteUri.toString()).toURI(); + } else { + gitRepositoryUrl = URI.create(remoteUri.toString()); + } + } else { + throw new IOException("Cannot find/derive a remote git repository uri. Please provide a valid " + + "remote origin by initializing your git repository correctly (for example: " + + "git remote add origin repositoryUri)."); + } + } } - return builder.build(); + closeRepository(); + + File backupDir = deriveBackupDir(gitProjectRootDir); + try { + backupProjectDir(gitProjectRootDir, backupDir); + FileUtils.deleteDirectory(gitProjectRootDir); + cloneRepository(gitProjectRootDir, gitRepositoryUrl); + } catch (Exception ex) { + restoreProjectDir(gitProjectRootDir, backupDir); + throw ex; + } finally { + safeDeleteDir(backupDir); + } + } + + private void safeDeleteDir(File dir) throws IOException { + if (dir.exists()) { + FileUtils.deleteDirectory(dir); + } + } + + private void restoreProjectDir(File gitProjectRootDir, File backupDir) throws IOException { + safeDeleteDir(gitProjectRootDir); + FileUtils.copyDirectory(backupDir, gitProjectRootDir); + } + + private void backupProjectDir(File gitProjectRootDir, File backupDir) throws IOException { + safeDeleteDir(backupDir); + FileUtils.copyDirectory(gitProjectRootDir, backupDir); + } + + private File deriveBackupDir(File gitProjectRootDir) { + return new File(gitProjectRootDir.getParentFile(), "backup"); + } + + private void cloneRepository(File gitProjectRootDir, URI gitRepositoryUrl) throws GitAPIException { + CloneCommand command = Git.cloneRepository() + .setURI(gitRepositoryUrl.toString()) + .setDirectory(gitProjectRootDir); + if (credentialsProvider != null) { + command.setCredentialsProvider(credentialsProvider); + } + + command.call().close(); + } + + public void closeRepository() throws InterruptedException { + this.stopPushThread(); + if (gitRepo != null) { + gitRepo.close(); + } + } + + public void pullChanges(File gitProjectRootDir) throws IOException, GitAPIException { + // TODO: prevent others from pushing during the pull command + // TODO: wait until push thread has terminated + gitRepo = openRepository(gitProjectRootDir, false); + + try (final Git git = new Git(gitRepo)) { + TerminateWhenRemotePathDoesNotExist(git); + + if (!git.status().call().isClean()) { + throw new IOException("Directory '%s' contains changes. " + + "Therefore a complete reset of the repository is not possible.\n" + + "Please commit your changes and push to remote repository."); + } + + final PullCommand pullCommand = git.pull().setRemote(this.remoteToPush) + .setRemoteBranchName(gitRepo.getFullBranch()); + + if (credentialsProvider != null) { + pullCommand.setCredentialsProvider(credentialsProvider); + } + + final PullResult pullResult = pullCommand.call(); + if (!pullResult.isSuccessful()) { + final Ref ref = git.reset().setMode(ResetCommand.ResetType.HARD).call(); + logger.info("reset git repository to {}, because pull request was not successful.", ref.toString()); + throw new IOException( + format("The pull command was not successful because '%s'.", pullResult.toString())); + } else { + this.loadGitRepository(gitProjectRootDir); + } + } } @SuppressWarnings("unchecked") public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException { - gitRepo = openRepository(gitProjectRootDir); + gitRepo = openRepository(gitProjectRootDir, false); try (final Git git = new Git(gitRepo)) { // Check if remote exists. - if (!isEmpty(remoteToPush)) { - final List remotes = git.remoteList().call(); - final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush)); - if (!isRemoteExist) { - final List remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList()); - throw new IllegalArgumentException( - format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames)); - } - } + TerminateWhenRemotePathDoesNotExist(git); boolean isLatestCommit = true; try { @@ -192,6 +319,18 @@ public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPI } } + private void TerminateWhenRemotePathDoesNotExist(Git git) throws GitAPIException { + if (!isEmpty(remoteToPush)) { + final List remotes = git.remoteList().call(); + final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush)); + if (!isRemoteExist) { + final List remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList()); + throw new IllegalArgumentException( + format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames)); + } + } + } + void startPushThread() { // If successfully loaded, start pushing thread if necessary. if (isEmpty(remoteToPush)) { @@ -204,7 +343,7 @@ void startPushThread() { // Use scheduled fixed delay to control the minimum interval between push activities. // The necessity of executing push is controlled by offering messages to the pushQueue. // If multiple commits are made within this time window, those are pushed by a single push execution. - final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); executorService.scheduleWithFixedDelay(() -> { final Long offeredTimestamp; @@ -233,6 +372,18 @@ void startPushThread() { }, 10, 10, TimeUnit.SECONDS); } + void stopPushThread() throws InterruptedException { + if(this.executorService == null) + return; + + this.executorService.shutdown(); + // push latest changes + this.pushQueue.offer(System.currentTimeMillis()); + while (!this.executorService.isTerminated()) { + Thread.sleep(100); + } + } + @SuppressWarnings("unchecked") private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map bucketObjectIds, Map flowSnapshotObjectIds) throws IOException { final Yaml yaml = new Yaml(); @@ -315,7 +466,7 @@ private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, if (!flow.hasVersion(version)) { final Flow.FlowPointer pointer = new Flow.FlowPointer(flowSnapshotFilename); final File flowSnapshotFile = new File(new File(backetFilePath).getParent(), flowSnapshotFilename); - final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath()); + final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath().replaceAll("\\\\", "/")); if (objectId == null) { logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. Ignoring this entry.", flowId, version, flowSnapshotFile.getPath(), bucket.getBucketDirName(), bucket.getBucketId()); @@ -332,12 +483,18 @@ private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, } if (flowMeta.containsKey(AUTHOR)) { pointer.setAuthor((String)flowMeta.get(AUTHOR)); + }else{ + pointer.setAuthor(commit.getCommitterIdent().getName()); } if (flowMeta.containsKey(COMMENTS)) { pointer.setComment((String)flowMeta.get(COMMENTS)); + }else{ + pointer.setComment(commit.getFullMessage()); } if (flowMeta.containsKey(CREATED)) { pointer.setCreated((long)flowMeta.get(CREATED)); + }else{ + pointer.setCreated(commit.getCommitTime() * 1000L); } flow.putVersion(version, pointer); @@ -345,7 +502,7 @@ private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, } } - private boolean validateRequiredValue(final Map map, String nameOfMap, Object ... keys) { + private boolean validateRequiredValue(final Map map, String nameOfMap, Object... keys) { for (Object key : keys) { if (!map.containsKey(key)) { logger.warn("{} does not have {}. Skipping it.", nameOfMap, key); @@ -383,11 +540,16 @@ boolean isGitDirectoryClean() throws GitAPIException { return status.isClean() && !status.hasUncommittedChanges(); } + boolean isGitRepositoryExisting(File gitRepository) throws IOException { + return this.createRepositoryBuilder(gitRepository).getGitDir() != null; + } + /** * Create a Git commit. - * @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message. - * @param message Commit message. - * @param bucket A bucket to commit. + * + * @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message. + * @param message Commit message. + * @param bucket A bucket to commit. * @param flowPointer A flow pointer for the flow snapshot which is updated. * After a commit is created, new commit rev id and flow snapshot file object id are set to this pointer. * It can be null if none of flow content is modified. @@ -448,4 +610,11 @@ byte[] getContent(String objectId) throws IOException { return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes(); } + public SyncStatus getStatus() throws GitAPIException { + final Status status = new Git(this.gitRepo).status().call(); + return new SyncStatus( + status.isClean(), + status.hasUncommittedChanges(), + status.getConflictingStageState().keySet()) ; + } } diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java index 08fa46783..630ba4617 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java @@ -24,6 +24,8 @@ import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; import org.apache.nifi.registry.provider.ProviderConfigurationContext; import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.StandardProviderConfigurationContext; +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; import org.apache.nifi.registry.util.FileUtils; import org.eclipse.jgit.api.errors.GitAPIException; import org.slf4j.Logger; @@ -47,19 +49,22 @@ public class GitFlowPersistenceProvider implements MetadataAwareFlowPersistenceP private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class); static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory"; - private static final String REMOTE_TO_PUSH = "Remote To Push"; + protected static final String REMOTE_TO_PUSH = "Remote To Push"; private static final String REMOTE_ACCESS_USER = "Remote Access User"; private static final String REMOTE_ACCESS_PASSWORD = "Remote Access Password"; static final String SNAPSHOT_EXTENSION = ".snapshot"; private File flowStorageDir; - private GitFlowMetaData flowMetaData; + protected GitFlowMetaData flowMetaData; + private Map props; + + @Override public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { flowMetaData = new GitFlowMetaData(); - final Map props = configurationContext.getProperties(); + this.props = configurationContext.getProperties(); if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) { throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided"); } @@ -274,6 +279,46 @@ public void deleteFlowContent(String bucketId, String flowId, int version) throw // TODO: Do nothing? This signature is not used. Actually there's nothing to do to the old versions as those exist in old commits even if this method is called. } + + @Override + public Boolean canBeSynchronized() { + return true; + } + + @Override + public void getLatestChangesOfRemoteRepository() throws IOException { + try { + this.flowMetaData.pullChanges(flowStorageDir); + }catch(GitAPIException apiException){ + throw new IOException(apiException); + } + } + + @Override + public void resetRepository() throws IOException { + try { + this.flowMetaData.resetGitRepository(flowStorageDir); + this.onConfigured(new StandardProviderConfigurationContext(this.props)); + } catch (GitAPIException e) { + throw new IOException(e); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public RepositorySyncStatus getStatus() throws IOException { + try{ + SyncStatus status = this.flowMetaData.getStatus(); + return new RepositorySyncStatus( + status.isClean(), + status.hasUncommittedChanges(), + status.getConflictingChanges()); + }catch(GitAPIException e){ + throw new IOException(e); + } + + } @Override public List getMetadata() { final Map gitBuckets = flowMetaData.getBuckets(); diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/SyncStatus.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/SyncStatus.java new file mode 100644 index 000000000..895941731 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/SyncStatus.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import java.util.Collection; + +public class SyncStatus { + private boolean isClean; + private boolean hasUncommittedChanges; + private Collection conflictingChanges; + + public SyncStatus(boolean isClean, boolean hasUncommittedChanges, Collection conflictingChanges) { + this.isClean = isClean; + this.hasUncommittedChanges = hasUncommittedChanges; + this.conflictingChanges = conflictingChanges; + } + + public boolean isClean() { + return isClean; + } + + public boolean hasUncommittedChanges() { + return hasUncommittedChanges; + } + + public Collection getConflictingChanges() { + return conflictingChanges; + } +} diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java index 606fca217..ff12132d1 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -37,10 +37,10 @@ import org.apache.nifi.registry.extension.bundle.BundleVersion; import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams; import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata; -import org.apache.nifi.registry.extension.component.manifest.Extension; import org.apache.nifi.registry.extension.component.ExtensionFilterParams; import org.apache.nifi.registry.extension.component.ExtensionMetadata; import org.apache.nifi.registry.extension.component.TagCount; +import org.apache.nifi.registry.extension.component.manifest.Extension; import org.apache.nifi.registry.extension.component.manifest.ProvidedServiceAPI; import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact; import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket; @@ -60,6 +60,7 @@ import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.provider.flow.FlowMetadataSynchronizer; import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate; import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.apache.nifi.registry.serialization.Serializer; @@ -68,6 +69,7 @@ import org.apache.nifi.registry.service.mapper.BucketMappings; import org.apache.nifi.registry.service.mapper.ExtensionMappings; import org.apache.nifi.registry.service.mapper.FlowMappings; +import org.apache.nifi.registry.sync.RepositorySyncStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -83,6 +85,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -95,6 +98,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; + /** * Main service for all back-end operations, REST resources should only interact with this service. * @@ -217,7 +222,7 @@ public List getBuckets() { readLock.lock(); try { final List buckets = metadataService.getAllBuckets(); - return buckets.stream().map(b -> BucketMappings.map(b)).collect(Collectors.toList()); + return buckets.stream().map(b -> BucketMappings.map(b)).collect(toList()); } finally { readLock.unlock(); } @@ -227,7 +232,7 @@ public List getBuckets(final Set bucketIds) { readLock.lock(); try { final List buckets = metadataService.getBuckets(bucketIds); - return buckets.stream().map(b -> BucketMappings.map(b)).collect(Collectors.toList()); + return buckets.stream().map(b -> BucketMappings.map(b)).collect(toList()); } finally { readLock.unlock(); } @@ -501,7 +506,7 @@ public List getFlows(final String bucketId) { // return non-verbose set of flows for the given bucket final List flows = metadataService.getFlowsByBucket(existingBucket.getId()); - return flows.stream().map(f -> FlowMappings.map(existingBucket, f)).collect(Collectors.toList()); + return flows.stream().map(f -> FlowMappings.map(existingBucket, f)).collect(toList()); } finally { readLock.unlock(); } @@ -1280,4 +1285,55 @@ public Set getFlowFields() { return metadataService.getFlowFields(); } + + public Collection syncBuckets(){ + if (this.flowPersistenceProvider.canBeSynchronized()) { + deleteAllBucketsInMetaDatabase(); + return createBucketsFromProvider(); + } + + return this.metadataService.getAllBuckets() + .stream().map(BucketMappings::map) + .collect(toList()); + } + + private void deleteAllBucketsInMetaDatabase() { + for (BucketEntity bucketEntity : this.metadataService.getAllBuckets()) { + this.metadataService.deleteBucket(bucketEntity); + } + } + + private Collection createBucketsFromProvider() { + FlowMetadataSynchronizer metadataSynchronizer = + new FlowMetadataSynchronizer(this.metadataService, this.flowPersistenceProvider); + metadataSynchronizer.synchronize(); + + return this.metadataService.getAllBuckets().stream().map(BucketMappings::map).collect(toList()); + } + + public void resetProviderRepository() throws IOException { + if (this.flowPersistenceProvider.canBeSynchronized()) { + flowPersistenceProvider.resetRepository(); + return; + } + + throw new IOException("Cannot reset provider repository "+ + "because the current provider does not support synchronization tasks."); + } + + public void getLatestChangesOfRemoteRepository() throws IOException { + if (this.flowPersistenceProvider.canBeSynchronized()) { + this.flowPersistenceProvider.getLatestChangesOfRemoteRepository(); + return; + } + + throw new IOException("Cannot get latest changes of remote repository " + + "because the current provider does not support synchronization tasks."); + } + + public RepositorySyncStatus getStatus() throws IOException { + org.apache.nifi.registry.provider.sync.RepositorySyncStatus status = this.flowPersistenceProvider.getStatus(); + RepositorySyncStatus dataTransferObject = new RepositorySyncStatus(status.isClean(), status.hasChanges(), status.changes()); + return dataTransferObject; + } } diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java index 430f3a3bf..104a0f432 100644 --- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java +++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java @@ -16,9 +16,10 @@ */ package org.apache.nifi.registry.provider; +import org.apache.nifi.registry.flow.FlowPersistenceException; import org.apache.nifi.registry.flow.FlowPersistenceProvider; import org.apache.nifi.registry.flow.FlowSnapshotContext; -import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; import java.util.Map; @@ -54,4 +55,24 @@ public void deleteFlowContent(String bucketId, String flowId, int version) throw public Map getProperties() { return properties; } + + @Override + public Boolean canBeSynchronized() { + return false; + } + + @Override + public void getLatestChangesOfRemoteRepository() { + + } + + @Override + public void resetRepository() { + + } + + @Override + public RepositorySyncStatus getStatus() { + return RepositorySyncStatus.SuccessfulSynchronizedRepository(); + } } diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java index 45351abc1..1dc07a413 100644 --- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java +++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -21,6 +21,7 @@ import org.apache.nifi.registry.provider.ProviderCreationException; import org.apache.nifi.registry.provider.StandardProviderConfigurationContext; import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; import org.apache.nifi.registry.util.FileUtils; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.errors.GitAPIException; @@ -39,12 +40,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider.REMOTE_TO_PUSH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class TestGitFlowPersistenceProvider { private static final Logger logger = LoggerFactory.getLogger(TestGitFlowPersistenceProvider.class); + private String REMOTE_REPO_DIR_PROP = "REMOTE_REPO_DIR_PROP"; private void assertCreationFailure(final Map properties, final Consumer assertion) { final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); @@ -70,7 +73,7 @@ public void testLoadNonExistingDir() { final Map properties = new HashMap<>(); properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/non-existing"); assertCreationFailure(properties, - e -> assertEquals("'target/non-existing' is not a directory or does not exist.", e.getCause().getMessage())); + e -> assertEquals("'target" + File.separator + "non-existing' is not a directory or does not exist.", e.getCause().getMessage())); } @Test @@ -95,19 +98,9 @@ private void assertProvider(final Map properties, final GitConsu try { FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); - try (final Git git = Git.init().setDirectory(gitDir).call()) { - logger.debug("Initiated a git repository {}", git); - final StoredConfig config = git.getRepository().getConfig(); - config.setString("user", null, "name", "git-user"); - config.setString("user", null, "email", "git-user@example.com"); - config.save(); - gitConsumer.accept(git); - } - - final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + initializeLocalRepository(gitConsumer, gitDir); - final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); - persistenceProvider.onConfigured(configurationContext); + final GitFlowPersistenceProvider persistenceProvider = configureGitFlowPersistenceProvider(properties); assertion.accept(persistenceProvider); } finally { @@ -117,12 +110,67 @@ private void assertProvider(final Map properties, final GitConsu } } + private void initializeLocalRepository(GitConsumer gitConsumer, File gitDir) throws IOException, GitAPIException { + try (final Git git = Git.init().setDirectory(gitDir).call()) { + logger.debug("Initiated a git repository {}", git); + final StoredConfig config = git.getRepository().getConfig(); + config.setString("user", null, "name", "git-user"); + config.setString("user", null, "email", "git-user@example.com"); + config.save(); + gitConsumer.accept(git); + } + } + + private void cleanupGitRepository(File gitDir) throws IOException { + deleteGitRepository(gitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + } + + private GitFlowPersistenceProvider configureGitFlowPersistenceProvider(Map properties) { + final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); + persistenceProvider.onConfigured(configurationContext); + return persistenceProvider; + } + + private void cloneIntoLocalRepository(GitConsumer gitConsumer, File gitDir, File remoteGitDir) throws IOException, GitAPIException { + try (final Git git = Git.cloneRepository() + .setURI(remoteGitDir.getAbsolutePath()) + .setDirectory(gitDir).call()) { + logger.debug("Initiated a git repository {}", git); + final StoredConfig config = git.getRepository().getConfig(); + config.setString("user", null, "name", "git-user"); + config.setString("user", null, "email", "git-user@example.com"); + config.save(); + gitConsumer.accept(git); + } + } + + private void createGitRemoteRepository(File remoteGitDir) throws GitAPIException, IOException { + boolean gitRepoExists = remoteGitDir.exists() + && org.apache.commons.io.FileUtils.directoryContains(remoteGitDir, + new File(remoteGitDir, "HEAD")); + if (gitRepoExists) { + return; + } + + Git.init().setBare(true).setDirectory(remoteGitDir).setGitDir(remoteGitDir).call().close(); + logger.info("initialized remote git repository at " + remoteGitDir.getAbsolutePath()); + } + + private void deleteGitRepository(File gitDir) throws IOException { + if (gitDir.exists()) { + org.apache.commons.io.FileUtils.deleteDirectory(gitDir); + } + } + @Test public void testLoadEmptyGitDir() throws GitAPIException, IOException { final Map properties = new HashMap<>(); properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/empty-git"); - assertProvider(properties, g -> {}, p -> { + assertProvider(properties, g -> { + }, p -> { try { p.getFlowContent("bucket-id-A", "flow-id-1", 1); } catch (FlowPersistenceException e) { @@ -287,4 +335,152 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { } }, true); } + + @Test + public void testResetGitRepository() throws IOException, GitAPIException, InterruptedException { + final Map properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/local-repo"); + properties.put(REMOTE_TO_PUSH, "origin"); + //inject variable which exists in the test env only + properties.put(REMOTE_REPO_DIR_PROP, "target/remote-repo"); + final File gitDir = new File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP)); + final File remoteGitDir = new File(properties.get(REMOTE_REPO_DIR_PROP)); + + try { + cleanupGitRepository(gitDir); + cleanupGitRepository(remoteGitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(remoteGitDir); + + createGitRemoteRepository(remoteGitDir); + + cloneIntoLocalRepository(g -> { + }, gitDir, remoteGitDir); + final GitFlowPersistenceProvider sut = configureGitFlowPersistenceProvider(properties); + commitInitialSampleChanges(sut, builder -> { + }); + waitUntilPushHasBeenFinished(); + + sut.resetRepository(); + + final byte[] flowVersion = sut.getFlowContent("bucket-id-A", "flow-id-1", 2); + assertEquals("Flow1 ver.2", new String(flowVersion, StandardCharsets.UTF_8)); + // free all handles + sut.flowMetaData.closeRepository(); + } finally { + deleteGitRepository(gitDir); + deleteGitRepository(remoteGitDir); + } + } + + private void commitInitialSampleChanges(GitFlowPersistenceProvider p, + Consumer postInitialChangesLambda) { + final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder() + .bucketId("bucket-id-A") + .bucketName("C'est/Bucket A/です。") + .flowId("flow-id-1") + .flowName("テスト_用/フロー#1\\[contains invalid chars]") + .author("unit-test-user") + .comments("Initial commit.") + .snapshotTimestamp(new Date().getTime()) + .version(1); + + final byte[] flow1Ver1 = "Flow1 ver.1".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver1); + + contextBuilder.comments("2nd commit.").version(2); + final byte[] flow1Ver2 = "Flow1 ver.2".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver2); + + postInitialChangesLambda.accept(contextBuilder); + } + + @Test + public void testPullChanges() throws InterruptedException, GitAPIException, IOException { + final Map properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/local-repo"); + properties.put(REMOTE_TO_PUSH, "origin"); + //inject variable which exists in the test env only + properties.put(REMOTE_REPO_DIR_PROP, "target/remote-repo"); + final File gitDir = new File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP)); + final File secondGitDir = new File("target/second-local-repo"); + final File remoteGitDir = new File(properties.get(REMOTE_REPO_DIR_PROP)); + + + try { + cleanupGitRepository(gitDir); + cleanupGitRepository(secondGitDir); + cleanupGitRepository(remoteGitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(remoteGitDir); + + createGitRemoteRepository(remoteGitDir); + cloneIntoLocalRepository(g -> { + }, gitDir, remoteGitDir); + final GitFlowPersistenceProvider sut = configureGitFlowPersistenceProvider(properties); + commitInitialSampleChanges(sut, builder -> { + }); + waitUntilPushHasBeenFinished(); + + cloneIntoLocalRepository(g -> { + }, secondGitDir, remoteGitDir); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/second-local-repo"); + final GitFlowPersistenceProvider secondRepo = configureGitFlowPersistenceProvider(properties); + commitInitialSampleChanges(secondRepo, builder -> { + builder.comments("3rd commit made in the remote repository only.").version(3); + final byte[] flow1Ver3 = "Flow1 ver.3".getBytes(StandardCharsets.UTF_8); + secondRepo.saveFlowContent(builder.build(), flow1Ver3); + }); + waitUntilPushHasBeenFinished(); + + sut.getLatestChangesOfRemoteRepository(); + final byte[] flowVersion = sut.getFlowContent("bucket-id-A", "flow-id-1", 3); + assertEquals("Flow1 ver.3", new String(flowVersion, StandardCharsets.UTF_8)); + + // free all handles + sut.flowMetaData.closeRepository(); + secondRepo.flowMetaData.closeRepository(); + } finally { + deleteGitRepository(gitDir); + deleteGitRepository(secondGitDir); + deleteGitRepository(remoteGitDir); + } + } + + /* + just testing the happy path + */ + @Test + public void testGetSyncStatus() throws IOException, GitAPIException, InterruptedException { + final Map properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/local-repo"); + final File gitDir = new File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP)); + + try { + cleanupGitRepository(gitDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + + initializeLocalRepository(g -> {}, gitDir); + final GitFlowPersistenceProvider sut = configureGitFlowPersistenceProvider(properties); + final RepositorySyncStatus actualSyncStatus = sut.getStatus(); + final RepositorySyncStatus expectedSyncStatus = RepositorySyncStatus.SuccessfulSynchronizedRepository(); + + assertEquals(expectedSyncStatus.isClean(), actualSyncStatus.isClean()); + assertEquals(expectedSyncStatus.hasChanges(), actualSyncStatus.hasChanges()); + assertEquals(expectedSyncStatus.changes().isEmpty(), actualSyncStatus.changes().isEmpty()); + + // free all handles + sut.flowMetaData.closeRepository(); + } finally { + deleteGitRepository(gitDir); + } + } + + private void waitUntilPushHasBeenFinished() { + try { + Thread.sleep(20000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java index baeb94931..01079d977 100644 --- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java +++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java @@ -1352,4 +1352,35 @@ private Answer updateFlowAnswer() { return flowEntity; }; } + + // -----------------Test Buckets Sync Service Method--------------------- + + @Test + public void testSyncBucketsMetadata() { + when(flowPersistenceProvider.getFlowContent( + anyString(), anyString(), anyInt() + )).thenReturn(new byte[10], new byte[10]); + + final VersionedProcessGroup pgA = createVersionedProcessGroupA(); + final VersionedProcessGroup pgB = createVersionedProcessGroupB(); + when(snapshotSerializer.deserialize(any())).thenReturn(pgA, pgB); + + // getFlowDiff orders the changes in ascending order of version number regardless of param order + final VersionedFlowDifference diff = registryService.getFlowDiff( + "bucketIdentifier", "flowIdentifier", 2, 1); + + assertNotNull(diff); + Optional nameChangedComponent = diff.getComponentDifferenceGroups().stream() + .filter(p -> p.getComponentId().equals("ProcessorFirstV1")).findFirst(); + + assertTrue(nameChangedComponent.isPresent()); + + ComponentDifference nameChangeDifference = nameChangedComponent.get().getDifferences().stream() + .filter(d -> d.getDifferenceType().equals("NAME_CHANGED")).findFirst().get(); + + assertEquals("ProcessorFirstV1", nameChangeDifference.getValueA()); + assertEquals("ProcessorFirstV2", nameChangeDifference.getValueB()); + } + + // ------------------------------------------------------------------- } diff --git a/nifi-registry-core/nifi-registry-properties/src/test/groovy/org/apache/nifi/security/crypto/CryptoKeyLoaderGroovyTest.groovy b/nifi-registry-core/nifi-registry-properties/src/test/groovy/org/apache/nifi/security/crypto/CryptoKeyLoaderGroovyTest.groovy index 4f69682fe..385b35295 100644 --- a/nifi-registry-core/nifi-registry-properties/src/test/groovy/org/apache/nifi/security/crypto/CryptoKeyLoaderGroovyTest.groovy +++ b/nifi-registry-core/nifi-registry-properties/src/test/groovy/org/apache/nifi/security/crypto/CryptoKeyLoaderGroovyTest.groovy @@ -19,6 +19,7 @@ package org.apache.nifi.security.crypto import org.apache.nifi.registry.security.crypto.CryptoKeyLoader import org.apache.nifi.registry.security.crypto.CryptoKeyProvider import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.junit.Assume import org.junit.BeforeClass import org.junit.Test import org.junit.runner.RunWith @@ -97,6 +98,8 @@ class CryptoKeyLoaderGroovyTest extends GroovyTestCase { @Test public void testShouldNotExtractKeyFromUnreadableBootstrapFile() throws Exception { + //dirty hack to prevent execution of this test on windows systems because of incompatibility with posix permissions + Assume.assumeFalse(System.getProperty("os.name").toLowerCase().startsWith("win")) // Arrange File unreadableFile = new File("src/test/resources/conf/bootstrap.unreadable_file_permissions.conf") Set originalPermissions = Files.getPosixFilePermissions(unreadableFile.toPath()) diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java index 90c872ffa..d6b020146 100644 --- a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java +++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java @@ -17,6 +17,7 @@ package org.apache.nifi.registry.flow; import org.apache.nifi.registry.provider.Provider; +import org.apache.nifi.registry.provider.ProviderSynchronization; /** * A service that can store and retrieve flow contents. @@ -27,7 +28,7 @@ * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may * change across releases until the registry matures. */ -public interface FlowPersistenceProvider extends Provider { +public interface FlowPersistenceProvider extends Provider, ProviderSynchronization { /** * Persists the serialized content. diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderSynchronization.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderSynchronization.java new file mode 100644 index 000000000..3c4630c78 --- /dev/null +++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderSynchronization.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; + +import java.io.IOException; + +public interface ProviderSynchronization { + /** + * define that this persistence provider is capable of making a synchronization + * + * @return true, if the instance can synchronize the repository otherwise false + */ + Boolean canBeSynchronized(); + /** + * synchronizes the repository with the remote repository (pulling changes) + */ + void getLatestChangesOfRemoteRepository() throws IOException; + + /** + * reset repository completely and re-synchronize with the remote repository + */ + void resetRepository() throws IOException; + + /** + * get the current status of the repository synchronization + * + * @return RepositorySyncStatus assembling the information about the status + * @throws IOException when the provider cannot execute the status command + */ + RepositorySyncStatus getStatus() throws IOException; +} diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/sync/RepositorySyncStatus.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/sync/RepositorySyncStatus.java new file mode 100644 index 000000000..891cd896d --- /dev/null +++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/sync/RepositorySyncStatus.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.sync; + +import java.util.ArrayList; +import java.util.Collection; + +public class RepositorySyncStatus { + private boolean isClean; + private boolean hasChanges; + private Collection changes; + + public RepositorySyncStatus(boolean isClean, boolean hasChanges, Collection changes) { + this.isClean = isClean; + this.hasChanges = hasChanges; + this.changes = changes; + } + + public static RepositorySyncStatus SuccessfulSynchronizedRepository(){ + return new RepositorySyncStatus(true, false, new ArrayList<>()); + } + + public boolean isClean() { + return isClean; + } + + public boolean hasChanges() { + return this.hasChanges; + } + + public Collection changes() { + return this.changes; + } +} diff --git a/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java index d033ef286..0f22c9dc6 100644 --- a/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java +++ b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java @@ -21,12 +21,13 @@ import org.apache.nifi.registry.web.api.BucketBundleResource; import org.apache.nifi.registry.web.api.BucketFlowResource; import org.apache.nifi.registry.web.api.BucketResource; +import org.apache.nifi.registry.web.api.BundleResource; import org.apache.nifi.registry.web.api.ConfigResource; import org.apache.nifi.registry.web.api.ExtensionRepoResource; -import org.apache.nifi.registry.web.api.BundleResource; import org.apache.nifi.registry.web.api.ExtensionResource; import org.apache.nifi.registry.web.api.FlowResource; import org.apache.nifi.registry.web.api.ItemResource; +import org.apache.nifi.registry.web.api.SyncResource; import org.apache.nifi.registry.web.api.TenantResource; import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.glassfish.jersey.server.ResourceConfig; @@ -73,6 +74,7 @@ public NiFiRegistryResourceConfig(@Context ServletContext servletContext) { // register multipart feature register(MultiPartFeature.class); + register(SyncResource.class); // include bean validation errors in response property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true); diff --git a/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/SyncResource.java b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/SyncResource.java new file mode 100644 index 000000000..5a86d9121 --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/SyncResource.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.web.api; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.Extension; +import io.swagger.annotations.ExtensionProperty; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.event.EventFactory; +import org.apache.nifi.registry.event.EventService; +import org.apache.nifi.registry.security.authorization.RequestAction; +import org.apache.nifi.registry.security.authorization.exception.AccessDeniedException; +import org.apache.nifi.registry.security.authorization.resource.Authorizable; +import org.apache.nifi.registry.service.AuthorizationService; +import org.apache.nifi.registry.service.RegistryService; +import org.apache.nifi.registry.sync.RepositorySyncStatus; +import org.apache.nifi.registry.web.link.LinkService; +import org.apache.nifi.registry.web.security.PermissionsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.util.Collection; + +@Component +@Path("/sync") +@Api( + value = "sync", + description = "Provides methods to sync bucket metadata with persistence providers", + authorizations = { @Authorization("Authorization") } +) +public class SyncResource extends AuthorizableApplicationResource { + + private static final Logger logger = LoggerFactory.getLogger(SyncResource.class); + + @Context + UriInfo uriInfo; + + private final LinkService linkService; + + private final RegistryService registryService; + + private final PermissionsService permissionsService; + + @Autowired + public SyncResource( + final RegistryService registryService, + final LinkService linkService, + final PermissionsService permissionsService, + final AuthorizationService authorizationService, + final EventService eventService) { + super(authorizationService, eventService); + this.registryService = registryService; + this.linkService = linkService; + this.permissionsService = permissionsService; + } + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "Get current sync status", + response = RepositorySyncStatus.class, + extensions = { + @Extension(name = "access-policy", properties = { + @ExtensionProperty(name = "action", value = "read"), + @ExtensionProperty(name = "resource", value = "/sync")}) + } + ) + @ApiResponses({ + @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400), + @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401), + @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403)}) + public Response getSyncStatus() throws IOException { + authorizeAccess(RequestAction.READ); + + RepositorySyncStatus currentStatus = this.registryService.getStatus(); + + return Response.status(Response.Status.OK).entity(currentStatus).build(); + } + + + @PUT + @Path("metadata") + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "Replaces the registry metadata (buckets, etc.) with the data of the underlying (local) persistence provider", + response = Bucket.class, + extensions = { + @Extension(name = "access-policy", properties = { + @ExtensionProperty(name = "action", value = "write"), + @ExtensionProperty(name = "resource", value = "/sync") }) + } + ) + @ApiResponses({ + @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400), + @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401), + @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403) }) + public Response syncMetaDataWithProviderData() { + authorizeAccess(RequestAction.WRITE); + + Collection buckets = syncRegistryMetadata(); + + return Response.status(Response.Status.OK).entity(buckets).build(); + } + + @PUT + @Path("remote") + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "Reset provider repository to an initial state and re-syncs metadata (same as /sync/metadata). " + + "What initial state means depends on the underlying provider, in case of a GitFlowPersistenceProvider" + + " an 'initial state' invokes a clone command to synchronize the remote repository with the local repository.", + response = Bucket.class, + extensions = { + @Extension(name = "access-policy", properties = { + @ExtensionProperty(name = "action", value = "write"), + @ExtensionProperty(name = "resource", value = "/sync") }) + } + ) + @ApiResponses({ + @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400), + @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401), + @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403) }) + public Response resetProviderRepository() throws IOException { + authorizeAccess(RequestAction.WRITE); + + registryService.resetProviderRepository(); + Collection buckets = syncRegistryMetadata(); + + return Response.status(Response.Status.OK).entity(buckets).build(); + } + + @POST + @Path("remote") + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation( + value = "Get latest changes of remote provider repository and rebuild nifi-registry metadata. Use this only after " + + "having initialized the persistence provider.", + response = Bucket.class, + extensions = { + @Extension(name = "access-policy", properties = { + @ExtensionProperty(name = "action", value = "write"), + @ExtensionProperty(name = "resource", value = "/sync") }) + } + ) + @ApiResponses({ + @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400), + @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401), + @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403) }) + public Response getLatestChangesOfRemoteRepository() throws IOException { + authorizeAccess(RequestAction.WRITE); + + registryService.getLatestChangesOfRemoteRepository(); + Collection buckets = syncRegistryMetadata(); + return Response.status(Response.Status.OK).entity(buckets).build(); + } + + private Collection syncRegistryMetadata() { + Collection buckets = registryService.syncBuckets(); + for (Bucket bucket : buckets) { + publish(EventFactory.bucketCreated(bucket)); + permissionsService.populateBucketPermissions(bucket); + linkService.populateLinks(bucket); + } + return buckets; + } + + private void authorizeAccess(RequestAction actionType) throws AccessDeniedException { + final Authorizable bucketsAuthorizable = authorizableLookup.getBucketsAuthorizable(); + authorizationService.authorize(bucketsAuthorizable, actionType); + } + +} diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/GitFlowPersistenceTestDataFactory.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/GitFlowPersistenceTestDataFactory.java new file mode 100644 index 000000000..743d1152b --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/GitFlowPersistenceTestDataFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.metadata.FlowMetadata; +import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class GitFlowPersistenceTestDataFactory { + public static org.apache.nifi.registry.bucket.Bucket[] createExpectedBuckets(int amount) { + ArrayList buckets = new ArrayList<>(); + for (int i = 0; i < amount; i++) { + buckets.add(createExpectedBucket(i)); + } + org.apache.nifi.registry.bucket.Bucket[] returnArray = + new org.apache.nifi.registry.bucket.Bucket[buckets.size()]; + return buckets.toArray(returnArray); + } + + private static org.apache.nifi.registry.bucket.Bucket createExpectedBucket(int i) { + org.apache.nifi.registry.bucket.Bucket bucket = new org.apache.nifi.registry.bucket.Bucket(); + bucket.setIdentifier("bucket" + i); + bucket.setName("bucket" + i); + + return bucket; + } + + private static Collection createSampleGitBuckets() { + ArrayList buckets = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Bucket b = new Bucket("bucket" + i); + b.setBucketDirName("bucket" + i); + Flow flow = b.getFlowOrCreate(b.getBucketId() + "_flowpointer_" + i); + createSampleGitFlow(flow); + buckets.add(b); + } + return buckets; + } + + private static Flow createSampleGitFlow(Flow flow) { + Flow.FlowPointer pointer = new Flow.FlowPointer(flow.getFlowId() + 1); + Calendar calendar = Calendar.getInstance(); + calendar.set(2018, 12, 12, 4, 2); + pointer.setCreated(calendar.getTime().getTime()); + pointer.setAuthor("author"); + + flow.putVersion(1, pointer); + return flow; + } + + public static List createSampleFlowMetadata() { + + List bucketMetadata = new ArrayList<>(); + Collection gitBuckets = createSampleGitBuckets(); + for (org.apache.nifi.registry.provider.flow.git.Bucket existingGitBucket : gitBuckets) { + BucketMetadata bucket = new BucketMetadata(); + bucket.setName(existingGitBucket.getBucketDirName()); + bucket.setIdentifier(existingGitBucket.getBucketId()); + bucket.setDescription("synced with git repository"); + + List flows = new ArrayList<>(); + for (Flow flow : existingGitBucket.getFlows().values()) { + FlowMetadata flowMetadata = new FlowMetadata(); + flowMetadata.setIdentifier(flow.getFlowId()); + flowMetadata.setName("unknown"); + //TODO get flow description + flowMetadata.setDescription("lost by sync because unavailable atm"); + + List flowSnapshotMetadata = new ArrayList<>(); + for (Map.Entry version : flow.getVersions().entrySet()) { + Integer versionNumber = version.getKey(); + Flow.FlowPointer flowPointer = version.getValue(); + FlowSnapshotMetadata snapshotMetadata = new FlowSnapshotMetadata(); + + + snapshotMetadata.setVersion(versionNumber); + snapshotMetadata.setAuthor(flowPointer.getAuthor()); + snapshotMetadata.setComments(flowPointer.getComment()); + snapshotMetadata.setCreated(flowPointer.getCreated()); + + flowSnapshotMetadata.add(snapshotMetadata); + } + + flowMetadata.setFlowSnapshotMetadata(flowSnapshotMetadata); + flows.add(flowMetadata); + } + bucket.setFlowMetadata(flows); + bucketMetadata.add(bucket); + } + + return bucketMetadata; + } +} diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncIT.java new file mode 100644 index 000000000..9308bcf2c --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.web.api; + + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider; +import org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceTestDataFactory; +import org.apache.nifi.registry.provider.sync.RepositorySyncStatus; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.jdbc.Sql; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ActiveProfiles("WithGitProvider") +public class SyncIT extends UnsecuredITBase { + //TODO thread safety with parallel test execution? + @Autowired + private GitFlowPersistenceProvider gitFlowPersistenceProviderMock; + + + @Configuration + @Profile({"WithGitProvider"}) + public static class MockFlowPersistenceProvider { + + @Primary + @Bean + public FlowPersistenceProvider getGitFlowPersistenceProvider() { + return mock(GitFlowPersistenceProvider.class); + } + } + + @Test + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncDeletesExistingBucketsWhenGitRepositoryIsEmpty() { + when(gitFlowPersistenceProviderMock.canBeSynchronized()).thenReturn(true); + final Bucket[] buckets = client + .target(createURL("sync")) + .path("metadata") + .request() + .put(Entity.entity("", MediaType.WILDCARD_TYPE), Bucket[].class); + + verify(gitFlowPersistenceProviderMock).canBeSynchronized(); + assertNotNull(buckets); + assertEquals(0, buckets.length); + } + + + @Test + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncBucketsWithInitializedGitRepository() { + List bucketMetadata = GitFlowPersistenceTestDataFactory.createSampleFlowMetadata(); + when(gitFlowPersistenceProviderMock.canBeSynchronized()).thenReturn(true); + when(gitFlowPersistenceProviderMock.getMetadata()).thenReturn(bucketMetadata); + + final Bucket[] buckets = client + .target(createURL("sync")) + .path("metadata") + .request() + .put(Entity.entity("", MediaType.WILDCARD_TYPE), Bucket[].class); + + assertNotNull(buckets); + assertEquals(10, buckets.length); + assertBuckets( + GitFlowPersistenceTestDataFactory.createExpectedBuckets(buckets.length), + buckets); + } + + + @Test + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncBucketsByResettingGitRepository() throws IOException { + List bucketMetadata = GitFlowPersistenceTestDataFactory.createSampleFlowMetadata(); + when(gitFlowPersistenceProviderMock.getMetadata()).thenReturn(bucketMetadata); + when(gitFlowPersistenceProviderMock.canBeSynchronized()).thenReturn(true); + + final Bucket[] buckets = client + .target(createURL("sync")) + .path("remote") + .request() + .put(Entity.entity("", MediaType.WILDCARD_TYPE), Bucket[].class); + + verify(gitFlowPersistenceProviderMock).resetRepository(); + assertNotNull(buckets); + assertEquals(10, buckets.length); + assertBuckets( + GitFlowPersistenceTestDataFactory.createExpectedBuckets(buckets.length), + buckets); + } + + @Test + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncBucketsByGettingLatestChangesRepository() throws IOException { + List bucketMetadata = GitFlowPersistenceTestDataFactory.createSampleFlowMetadata(); + when(gitFlowPersistenceProviderMock.getMetadata()).thenReturn(bucketMetadata); + when(gitFlowPersistenceProviderMock.canBeSynchronized()).thenReturn(true); + + final Bucket[] buckets = client + .target(createURL("sync")) + .path("remote") + .request() + .post(Entity.entity("", MediaType.WILDCARD_TYPE), Bucket[].class); + + verify(gitFlowPersistenceProviderMock).getLatestChangesOfRemoteRepository(); + assertNotNull(buckets); + assertEquals(10, buckets.length); + assertBuckets( + GitFlowPersistenceTestDataFactory.createExpectedBuckets(buckets.length), + buckets); + } + + @Test + public void testGetSyncStatus() throws IOException { + when(gitFlowPersistenceProviderMock.getStatus()).thenReturn(RepositorySyncStatus.SuccessfulSynchronizedRepository()); + + final org.apache.nifi.registry.sync.RepositorySyncStatus statusDto = client + .target(createURL("sync")) + .request() + .get(org.apache.nifi.registry.sync.RepositorySyncStatus.class); + + verify(gitFlowPersistenceProviderMock).getStatus(); + assertEquals(statusDto.getIsClean(), true); + assertEquals(statusDto.getHasChanges(), false); + assertEquals(statusDto.getChanges().isEmpty(), true); + } + + private void assertBuckets(Bucket[] expectedBuckets, Bucket[] actual) { + for (int i = 0; i < expectedBuckets.length; i++) { + assertEquals(expectedBuckets[i].getIdentifier(), actual[i].getIdentifier()); + assertEquals(expectedBuckets[i].getName(), actual[i].getName()); + } + } +} diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncWithoutGitProviderIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncWithoutGitProviderIT.java new file mode 100644 index 000000000..5c8f6c5b6 --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/SyncWithoutGitProviderIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.web.api; + +import org.apache.nifi.registry.bucket.Bucket; +import org.junit.Test; +import org.springframework.test.context.jdbc.Sql; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class SyncWithoutGitProviderIT extends UnsecuredITBase { + + @Test + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncReturnsExistingBucketsWhenGitFlowRepositoryIsNotActive() { + final Bucket[] buckets = client + .target(createURL("sync")) + .path("metadata") + .request() + .put(Entity.entity("", MediaType.WILDCARD_TYPE), Bucket[].class); + + assertNotNull(buckets); + assertEquals(3, buckets.length); + for (int i = 0; i < buckets.length; i++) { + assertEquals(String.valueOf(i + 1), buckets[i].getIdentifier()); + } + } + + // TODO: return more expressive return code (not implemented?) + @Test(expected = InternalServerErrorException.class) + @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = { + "classpath:db/clearDB.sql", + "classpath:db/BucketsIT.sql" + }) + public void testSyncBucketsByResettingGitRepositoryThrowsIOExceptionWhenSyncIsNotImplemented() { + client + .target(createURL("sync")) + .path("remote") + .request() + .put(Entity.entity("https://gitrepository.com/fancy", MediaType.WILDCARD_TYPE), Bucket[].class); + } + + @Test + public void testGetStatusReturnsValidSyncState() { + final org.apache.nifi.registry.sync.RepositorySyncStatus statusDto = client + .target(createURL("sync")) + .request() + .get(org.apache.nifi.registry.sync.RepositorySyncStatus.class); + + assertEquals(statusDto.getIsClean(), true); + assertEquals(statusDto.getHasChanges(), false); + assertEquals(statusDto.getChanges().isEmpty(), true); + } +}