diff --git a/common/src/main/java/apoc/ApocConfig.java b/common/src/main/java/apoc/ApocConfig.java index 3f738e89a..fb211cce1 100644 --- a/common/src/main/java/apoc/ApocConfig.java +++ b/common/src/main/java/apoc/ApocConfig.java @@ -18,7 +18,6 @@ */ package apoc; -import static apoc.util.FileUtils.isFile; import static java.lang.String.format; import static org.neo4j.configuration.BootloaderSettings.lib_directory; import static org.neo4j.configuration.BootloaderSettings.run_directory; @@ -35,6 +34,8 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.net.MalformedURLException; +import java.net.URI; import java.net.URL; import java.nio.file.Path; import java.time.Duration; @@ -44,6 +45,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Stream; + +import apoc.util.FileUtils; +import apoc.util.SupportedProtocols; +import apoc.util.Util; import org.apache.commons.configuration2.CombinedConfiguration; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.EnvironmentConfiguration; @@ -336,6 +341,42 @@ public void checkWriteAllowed(ExportConfig exportConfig, String fileName) { } } } + + public static boolean isFile(String fileName) { + return from(fileName) == SupportedProtocols.file; + } + + + public static SupportedProtocols from(URL url) { + return FileUtils.of(url.getProtocol()); + } + + public static SupportedProtocols from(String source) { + try { + final URL url = new URL(source); + return from(url); + } catch (MalformedURLException e) { + if (!e.getMessage().contains("no protocol")) { + try { + // in case new URL(source) throw e.g. unknown protocol: hdfs, because of missing jar, + // we retrieve the related enum and throw the associated MissingDependencyException(..) + // otherwise we return unknown protocol: yyyyy + return SupportedProtocols.valueOf(new URI(source).getScheme()); + } catch (Exception ignored) { + } + + // in case a Windows user write an url like `C:/User/...` + if (e.getMessage().contains("unknown protocol") && Util.isWindows()) { + throw new RuntimeException(e.getMessage() + + "\n Please note that for Windows absolute paths they have to be explicit by prepending `file:` or supplied without the drive, " + + "\n e.g. `file:C:/my/path/file` or `/my/path/file`, instead of `C:/my/path/file`"); + } + throw new RuntimeException(e); + } + return SupportedProtocols.file; + } + } + public static ApocConfig apocConfig() { return theInstance; diff --git a/common/src/main/java/apoc/result/VirtualPath.java b/common/src/main/java/apoc/result/VirtualPath.java index e0ce52137..e682c1c1f 100644 --- a/common/src/main/java/apoc/result/VirtualPath.java +++ b/common/src/main/java/apoc/result/VirtualPath.java @@ -18,7 +18,6 @@ */ package apoc.result; -import apoc.util.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -26,6 +25,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; + +import apoc.util.Util; import org.neo4j.graphdb.Entity; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Path; @@ -131,7 +132,7 @@ public String toString() { private void requireConnected(Relationship relationship) { final List previousNodes = getPreviousNodes(); - boolean isRelConnectedToPrevious = CollectionUtils.containsAny(previousNodes, relationship.getNodes()); + boolean isRelConnectedToPrevious = Util.containsAny(previousNodes, relationship.getNodes()); if (!isRelConnectedToPrevious) { throw new IllegalArgumentException("Relationship is not part of current path."); } diff --git a/common/src/main/java/apoc/util/FileUtils.java b/common/src/main/java/apoc/util/FileUtils.java index da1857a04..2669b791b 100644 --- a/common/src/main/java/apoc/util/FileUtils.java +++ b/common/src/main/java/apoc/util/FileUtils.java @@ -20,9 +20,10 @@ import static apoc.ApocConfig.APOC_IMPORT_FILE_ALLOW__READ__FROM__FILESYSTEM; import static apoc.ApocConfig.apocConfig; +import static apoc.export.util.LimitedSizeInputStream.toLimitedIStream; import static apoc.util.Util.ERROR_BYTES_OR_STRING; import static apoc.util.Util.REDIRECT_LIMIT; -import static apoc.util.Util.readHttpInputStream; +import static apoc.util.Util.isRedirect; import apoc.ApocConfig; import apoc.export.util.CountingInputStream; @@ -32,14 +33,21 @@ import apoc.util.s3.S3URLConnection; import apoc.util.s3.S3UploadUtils; import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.StringWriter; +import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.net.URLConnection; import java.net.URLStreamHandler; import java.net.URLStreamHandlerFactory; import java.nio.file.NoSuchFileException; @@ -47,7 +55,11 @@ import java.nio.file.Paths; import java.util.Map; import java.util.Optional; + +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.neo4j.graphdb.security.URLAccessChecker; import org.neo4j.graphdb.security.URLAccessValidationError; @@ -187,7 +199,7 @@ public static CountingInputStream inputStreamFor( if (input instanceof String) { String fileName = (String) input; fileName = changeFileUrlIfImportDirectoryConstrained(fileName, urlAccessChecker); - return Util.openInputStream(fileName, headers, payload, compressionAlgo, urlAccessChecker); + return FileUtils.openInputStream(fileName, headers, payload, compressionAlgo, urlAccessChecker); } else if (input instanceof byte[]) { return getInputStreamFromBinary((byte[]) input, compressionAlgo); } else { @@ -345,4 +357,142 @@ public static File getLogDirectory() { public static CountingInputStream getInputStreamFromBinary(byte[] urlOrBinary, String compressionAlgo) { return CompressionAlgo.valueOf(compressionAlgo).toInputStream(urlOrBinary); } + + public static StreamConnection readHttpInputStream( + String urlAddress, + Map headers, + String payload, + int redirectLimit, + URLAccessChecker urlAccessChecker) + throws IOException { + URL url = ApocConfig.apocConfig().checkAllowedUrlAndPinToIP(urlAddress, urlAccessChecker); + URLConnection con = openUrlConnection(url, headers); + writePayload(con, payload); + String newUrl = handleRedirect(con, urlAddress); + if (newUrl != null && !urlAddress.equals(newUrl)) { + con.getInputStream().close(); + if (redirectLimit == 0) { + throw new IOException("Redirect limit exceeded"); + } + return readHttpInputStream(newUrl, headers, payload, --redirectLimit, urlAccessChecker); + } + + return new StreamConnection.UrlStreamConnection(con); + } + + public static URLConnection openUrlConnection(URL src, Map headers) throws IOException { + URLConnection con = src.openConnection(); + con.setRequestProperty("User-Agent", "APOC Procedures for Neo4j"); + if (con instanceof HttpURLConnection) { + HttpURLConnection http = (HttpURLConnection) con; + http.setInstanceFollowRedirects(false); + if (headers != null) { + Object method = headers.get("method"); + if (method != null) { + http.setRequestMethod(method.toString()); + http.setChunkedStreamingMode(1024 * 1024); + } + headers.forEach((k, v) -> con.setRequestProperty(k, v == null ? "" : v.toString())); + } + } + + con.setConnectTimeout(apocConfig().getInt("apoc.http.timeout.connect", 10_000)); + con.setReadTimeout(apocConfig().getInt("apoc.http.timeout.read", 60_000)); + return con; + } + + private static void writePayload(URLConnection con, String payload) throws IOException { + if (payload == null) return; + con.setDoOutput(true); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(con.getOutputStream(), "UTF-8")); + writer.write(payload); + writer.close(); + } + + private static String handleRedirect(URLConnection con, String url) throws IOException { + if (!(con instanceof HttpURLConnection)) return url; + if (!isRedirect(((HttpURLConnection) con))) return url; + return con.getHeaderField("Location"); + } + + public static CountingInputStream openInputStream( + Object input, + Map headers, + String payload, + String compressionAlgo, + URLAccessChecker urlAccessChecker) + throws IOException, URISyntaxException, URLAccessValidationError { + if (input instanceof String) { + String urlAddress = (String) input; + final ArchiveType archiveType = ArchiveType.from(urlAddress); + if (archiveType.isArchive()) { + return getStreamCompressedFile(urlAddress, headers, payload, archiveType, urlAccessChecker); + } + + StreamConnection sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker); + return sc.toCountingInputStream(compressionAlgo); + } else if (input instanceof byte[]) { + return FileUtils.getInputStreamFromBinary((byte[]) input, compressionAlgo); + } else { + throw new RuntimeException(ERROR_BYTES_OR_STRING); + } + } + + private static CountingInputStream getStreamCompressedFile( + String urlAddress, + Map headers, + String payload, + ArchiveType archiveType, + URLAccessChecker urlAccessChecker) + throws IOException, URISyntaxException, URLAccessValidationError { + StreamConnection sc; + InputStream stream; + String[] tokens = urlAddress.split("!"); + urlAddress = tokens[0]; + String zipFileName; + if (tokens.length == 2) { + zipFileName = tokens[1]; + sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker); + stream = getFileStreamIntoCompressedFile(sc.getInputStream(), zipFileName, archiveType); + stream = toLimitedIStream(stream, sc.getLength()); + } else throw new IllegalArgumentException("filename can't be null or empty"); + + return new CountingInputStream(stream, sc.getLength()); + } + + public static StreamConnection getStreamConnection( + String urlAddress, Map headers, String payload, URLAccessChecker urlAccessChecker) + throws IOException, URISyntaxException, URLAccessValidationError { + return FileUtils.getStreamConnection( + FileUtils.from(urlAddress), urlAddress, headers, payload, urlAccessChecker); + } + + private static InputStream getFileStreamIntoCompressedFile(InputStream is, String fileName, ArchiveType archiveType) + throws IOException { + try (ArchiveInputStream archive = archiveType.getInputStream(is)) { + ArchiveEntry archiveEntry; + + while ((archiveEntry = archive.getNextEntry()) != null) { + if (!archiveEntry.isDirectory() && archiveEntry.getName().equals(fileName)) { + return new ByteArrayInputStream(IOUtils.toByteArray(archive)); + } + } + } + + return null; + } + + public static Object getStringOrCompressedData(StringWriter writer, ExportConfig config) { + try { + final String compression = config.getCompressionAlgo(); + final String writerString = writer.toString(); + Object data = compression.equals(CompressionAlgo.NONE.name()) + ? writerString + : CompressionAlgo.valueOf(compression).compress(writerString, config.getCharset()); + writer.getBuffer().setLength(0); + return data; + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/common/src/main/java/apoc/util/Util.java b/common/src/main/java/apoc/util/Util.java index ffd8538dd..7a40918a3 100644 --- a/common/src/main/java/apoc/util/Util.java +++ b/common/src/main/java/apoc/util/Util.java @@ -19,34 +19,24 @@ package apoc.util; import static apoc.ApocConfig.apocConfig; -import static apoc.export.util.LimitedSizeInputStream.toLimitedIStream; import static apoc.util.DateFormatUtil.getOrCreate; import static java.net.HttpURLConnection.HTTP_NOT_MODIFIED; import static org.eclipse.jetty.util.URIUtil.encodePath; import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME; -import apoc.ApocConfig; import apoc.Pools; import apoc.convert.ConvertUtils; -import apoc.export.util.CountingInputStream; -import apoc.export.util.ExportConfig; import apoc.result.VirtualNode; import apoc.result.VirtualRelationship; import apoc.util.collection.Iterators; -import java.io.BufferedWriter; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStreamWriter; -import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.lang.reflect.InvocationTargetException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; -import java.net.URLConnection; import java.net.URLEncoder; import java.time.format.DateTimeFormatter; import java.util.AbstractMap; @@ -84,9 +74,6 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.lang.model.SourceVersion; -import org.apache.commons.compress.archivers.ArchiveEntry; -import org.apache.commons.compress.archivers.ArchiveInputStream; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.neo4j.configuration.Config; @@ -108,8 +95,6 @@ import org.neo4j.graphdb.schema.ConstraintType; import org.neo4j.graphdb.schema.IndexDefinition; import org.neo4j.graphdb.schema.IndexType; -import org.neo4j.graphdb.security.URLAccessChecker; -import org.neo4j.graphdb.security.URLAccessValidationError; import org.neo4j.internal.schema.ConstraintDescriptor; import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.impl.util.ValueUtils; @@ -337,27 +322,6 @@ public static Integer toInteger(Object value) { } } - public static URLConnection openUrlConnection(URL src, Map headers) throws IOException { - URLConnection con = src.openConnection(); - con.setRequestProperty("User-Agent", "APOC Procedures for Neo4j"); - if (con instanceof HttpURLConnection) { - HttpURLConnection http = (HttpURLConnection) con; - http.setInstanceFollowRedirects(false); - if (headers != null) { - Object method = headers.get("method"); - if (method != null) { - http.setRequestMethod(method.toString()); - http.setChunkedStreamingMode(1024 * 1024); - } - headers.forEach((k, v) -> con.setRequestProperty(k, v == null ? "" : v.toString())); - } - } - - con.setConnectTimeout(apocConfig().getInt("apoc.http.timeout.connect", 10_000)); - con.setReadTimeout(apocConfig().getInt("apoc.http.timeout.read", 60_000)); - return con; - } - public static boolean isRedirect(HttpURLConnection con) throws IOException { int responseCode = con.getResponseCode(); boolean isRedirectCode = @@ -374,109 +338,6 @@ public static boolean isRedirect(HttpURLConnection con) throws IOException { return isRedirectCode; } - private static void writePayload(URLConnection con, String payload) throws IOException { - if (payload == null) return; - con.setDoOutput(true); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(con.getOutputStream(), "UTF-8")); - writer.write(payload); - writer.close(); - } - - private static String handleRedirect(URLConnection con, String url) throws IOException { - if (!(con instanceof HttpURLConnection)) return url; - if (!isRedirect(((HttpURLConnection) con))) return url; - return con.getHeaderField("Location"); - } - - public static CountingInputStream openInputStream( - Object input, - Map headers, - String payload, - String compressionAlgo, - URLAccessChecker urlAccessChecker) - throws IOException, URISyntaxException, URLAccessValidationError { - if (input instanceof String) { - String urlAddress = (String) input; - final ArchiveType archiveType = ArchiveType.from(urlAddress); - if (archiveType.isArchive()) { - return getStreamCompressedFile(urlAddress, headers, payload, archiveType, urlAccessChecker); - } - - StreamConnection sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker); - return sc.toCountingInputStream(compressionAlgo); - } else if (input instanceof byte[]) { - return FileUtils.getInputStreamFromBinary((byte[]) input, compressionAlgo); - } else { - throw new RuntimeException(ERROR_BYTES_OR_STRING); - } - } - - private static CountingInputStream getStreamCompressedFile( - String urlAddress, - Map headers, - String payload, - ArchiveType archiveType, - URLAccessChecker urlAccessChecker) - throws IOException, URISyntaxException, URLAccessValidationError { - StreamConnection sc; - InputStream stream; - String[] tokens = urlAddress.split("!"); - urlAddress = tokens[0]; - String zipFileName; - if (tokens.length == 2) { - zipFileName = tokens[1]; - sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker); - stream = getFileStreamIntoCompressedFile(sc.getInputStream(), zipFileName, archiveType); - stream = toLimitedIStream(stream, sc.getLength()); - } else throw new IllegalArgumentException("filename can't be null or empty"); - - return new CountingInputStream(stream, sc.getLength()); - } - - public static StreamConnection getStreamConnection( - String urlAddress, Map headers, String payload, URLAccessChecker urlAccessChecker) - throws IOException, URISyntaxException, URLAccessValidationError { - return FileUtils.getStreamConnection( - FileUtils.from(urlAddress), urlAddress, headers, payload, urlAccessChecker); - } - - private static InputStream getFileStreamIntoCompressedFile(InputStream is, String fileName, ArchiveType archiveType) - throws IOException { - try (ArchiveInputStream archive = archiveType.getInputStream(is)) { - ArchiveEntry archiveEntry; - - while ((archiveEntry = archive.getNextEntry()) != null) { - if (!archiveEntry.isDirectory() && archiveEntry.getName().equals(fileName)) { - return new ByteArrayInputStream(IOUtils.toByteArray(archive)); - } - } - } - - return null; - } - - public static StreamConnection readHttpInputStream( - String urlAddress, - Map headers, - String payload, - int redirectLimit, - URLAccessChecker urlAccessChecker) - throws IOException { - URL url = ApocConfig.apocConfig().checkAllowedUrlAndPinToIP(urlAddress, urlAccessChecker); - URLConnection con = openUrlConnection(url, headers); - writePayload(con, payload); - String newUrl = handleRedirect(con, urlAddress); - if (newUrl != null && !urlAddress.equals(newUrl)) { - con.getInputStream().close(); - if (redirectLimit == 0) { - throw new IOException("Redirect limit exceeded"); - } - return readHttpInputStream(newUrl, headers, payload, --redirectLimit, urlAccessChecker); - } - - return new StreamConnection.UrlStreamConnection(con); - } - public static boolean toBoolean(Object value) { if ((value == null || value instanceof Number && (((Number) value).longValue()) == 0L @@ -1158,20 +1019,6 @@ private static Object getOrDefault(Map firstMap, Map T withTransactionAndRebind( GraphDatabaseService db, Transaction transaction, Function action) { T result = retryInTx(NullLog.getInstance(), db, action, 0, 0, r -> {}); @@ -1361,4 +1208,51 @@ public static T withBackOffRetries(Supplier func, long initialTimeout, lo } return result; } + + /** + * Null-safe check if the specified collection is empty. + * + * @param coll the collection to check, may be null + * @return true if empty or null + */ + public static boolean isEmpty(Collection coll) { + return coll == null || coll.isEmpty(); + } + + /** + * Null-safe check if the specified collection is not empty. + * + * @param coll the collection to check, may be null + * @return true if non-null and non-empty + */ + public static boolean isNotEmpty(Collection coll) { + return !isEmpty(coll); + } + + /** + * Returns true iff at least one element is in both collections. + * + * @param the type of object to lookup in coll1. + * @param coll1 the first collection, must not be null + * @param coll2 the second collection, must not be null + * @return true iff the intersection of the collections is non-empty + */ + public static boolean containsAny(Collection coll1, T... coll2) { + if (coll1.size() < coll2.length) { + for (Object aColl1 : coll1) { + for (T t : coll2) { + if (t.equals(aColl1)) { + return true; + } + } + } + } else { + for (Object aColl2 : coll2) { + if (coll1.contains(aColl2)) { + return true; + } + } + } + return false; + } } diff --git a/common/src/main/java/org/neo4j/cypher/export/SubGraph.java b/common/src/main/java/org/neo4j/cypher/export/SubGraph.java index d4f8182f4..7dd4ade27 100644 --- a/common/src/main/java/org/neo4j/cypher/export/SubGraph.java +++ b/common/src/main/java/org/neo4j/cypher/export/SubGraph.java @@ -20,7 +20,6 @@ import static java.util.stream.Collectors.toMap; -import apoc.util.CollectionUtils; import apoc.util.collection.Iterables; import java.util.Collection; import java.util.Iterator; @@ -59,7 +58,7 @@ public interface SubGraph { default Map relTypesInUse(TokenRead ops, Collection relTypeNames) { Stream stream = Iterables.stream(this.getAllRelationshipTypesInUse()); - if (CollectionUtils.isNotEmpty(relTypeNames)) { + if (isNotEmpty(relTypeNames)) { stream = stream.filter(rel -> relTypeNames.contains(rel.name())); } return stream.map(RelationshipType::name).collect(toMap(t -> t, ops::relationshipType)); @@ -67,7 +66,7 @@ default Map relTypesInUse(TokenRead ops, Collection rel default Map labelsInUse(TokenRead ops, Collection labelNames) { Stream