Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -473,5 +473,5 @@ public List<String> getDefaultStorageIds(String region) {
*/
public ConnectorMapping DEFAULT_CONNECTOR_MAPPING_STRATEGY = RANDOM;

public boolean USE_WRITE_FEATURES_EVENT = false;
public boolean USE_WRITE_FEATURES_EVENT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package com.here.xyz.hub.task;

import static com.here.xyz.hub.task.FeatureTaskHandler.injectMinVersion;
import static com.here.xyz.hub.task.FeatureTask.resolveBranchFor;
import static com.here.xyz.hub.task.FeatureTaskHandler.injectMinVersion;
import static com.here.xyz.util.service.rest.TooManyRequestsException.ThrottlingReason.MEMORY;
import static com.here.xyz.util.service.rest.TooManyRequestsException.ThrottlingReason.STORAGE_QUEUE_FULL;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_GATEWAY;
Expand Down Expand Up @@ -61,6 +61,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -95,7 +96,8 @@ public static Future<FeatureCollection> writeFeatures(Marker marker, Space space
.withContext(spaceContext)
.withAuthor(author)
.withResponseDataExpected(responseDataExpected)
.withRef(baseRef);
.withRef(baseRef)
.withSearchableProperties(toExtractableSearchProperties(space));

return Future.all(injectMinVersion(marker, space.getId(), event), injectSpaceParams(event, space))
.compose(v -> {
Expand Down Expand Up @@ -131,6 +133,38 @@ else if (ar.result() instanceof FeatureCollection featureCollection)
}
}

private static Map<String, String> toExtractableSearchProperties(Space space) {
Map<String, String> extractableSearchProperties = new HashMap<>();
for (Entry<String, Boolean> sp : space.getSearchableProperties().entrySet()) {
String searchableExpression = sp.getKey().contains("::") ? sp.getKey().substring(0, sp.getKey().indexOf("::")) : sp.getKey();

String alias, jsonPathExpression;
if (searchableExpression.startsWith("$") && searchableExpression.contains(":")) {
alias = searchableExpression.substring(1, searchableExpression.indexOf(":"));
jsonPathExpression = searchableExpression.startsWith("[")
? searchableExpression.substring(1, searchableExpression.length() - 1) //expression is already a JSONPath
: toJsonPath(searchableExpression.substring(searchableExpression.indexOf(":") + 1)); //expression is an "old" dot-notation
}
else {
alias = searchableExpression;
jsonPathExpression = toJsonPath(searchableExpression);
}

extractableSearchProperties.put(alias, jsonPathExpression);
}
return extractableSearchProperties;
}

/**
* Translates a dot-notation path to a JSONPath expression.
* @param dotNotation
* @return
*/
private static String toJsonPath(String dotNotation) {
//TODO: Translate the path pointer like "prop1.prop2[0].prop3" to JSONPath "$.prop1.prop2[0].prop3" for all cases correctly
return "$." + dotNotation;
}

static void throttle(Space space) throws HttpException {
Connector storage = space.getResolvedStorageConnector();
final long GLOBAL_INFLIGHT_REQUEST_MEMORY_SIZE = (long) Service.configuration.GLOBAL_INFLIGHT_REQUEST_MEMORY_SIZE_MB * 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
* Copyright (C) 2017-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,11 +21,18 @@

import com.here.xyz.models.geojson.implementation.FeatureCollection;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WriteFeaturesEvent extends ContextAwareEvent<WriteFeaturesEvent> {
private Set<Modification> modifications;
private boolean responseDataExpected;
/**
* A map of properties that are searchable.
* The key is the name of the alias / field that should be indexed.
* The value is the JSON-Pointer to the property in the feature or a JSONPath expression.
*/
private Map<String, String> searchableProperties;

public Set<Modification> getModifications() {
return modifications;
Expand Down Expand Up @@ -53,6 +60,19 @@ public WriteFeaturesEvent withResponseDataExpected(boolean responseDataExpected)
return this;
}

public Map<String, String> getSearchableProperties() {
return searchableProperties;
}

public void setSearchableProperties(Map<String, String> searchableProperties) {
this.searchableProperties = searchableProperties;
}

public WriteFeaturesEvent withSearchableProperties(Map<String, String> searchableProperties) {
setSearchableProperties(searchableProperties);
return this;
}

public static class Modification {
private UpdateStrategy updateStrategy;
private FeatureCollection featureData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.here.xyz.responses.XyzError.CONFLICT;
import static com.here.xyz.responses.XyzError.EXCEPTION;
import static com.here.xyz.responses.XyzError.NOT_FOUND;
import static com.here.xyz.util.db.ConnectorParameters.TableLayout.NEW_LAYOUT;
import static com.here.xyz.util.db.pg.SQLError.RETRYABLE_VERSION_CONFLICT;
import static com.here.xyz.util.db.pg.XyzSpaceTableHelper.PARTITION_SIZE;

Expand All @@ -41,7 +42,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -85,7 +88,7 @@ protected SQLQuery buildQuery(WriteFeaturesEvent event) throws ErrorResponseExce
.withSpaceContext(spaceContext)
.withHistoryEnabled(event.getVersionsToKeep() > 1)
.withBatchMode(true)
.with("tableLayout",getTableLayout())
.with("tableLayout", getTableLayout())
.with("debug", "true".equals(System.getenv("FW_DEBUG")))
.with("queryId", FunctionRuntime.getInstance().getStreamId())
.with("PARTITION_SIZE", PARTITION_SIZE)
Expand All @@ -96,6 +99,17 @@ protected SQLQuery buildQuery(WriteFeaturesEvent event) throws ErrorResponseExce
if (event.getRef() != null && event.getRef().isSingleVersion() && !event.getRef().isHead())
queryContextBuilder.withBaseVersion(event.getRef().getVersion());

if (getTableLayout() == NEW_LAYOUT) {
//Temporary workaround for NL connector
Map<String, String> searchableProperties = new HashMap<>(event.getSearchableProperties());
searchableProperties.put("refQuad", "$.properties.refQuad");
searchableProperties.put("globalVersion", "$.properties.globalVersion");
//End of workaround

if (searchableProperties != null && !searchableProperties.isEmpty())
queryContextBuilder.with("writeHooks", List.of(writeHook(searchableProperties)));
}

return new SQLQuery("SELECT write_features(#{modifications}, 'Modifications', #{author}, #{responseDataExpected});")
.withLoggingEnabled(false)
.withContext(queryContextBuilder.build())
Expand Down Expand Up @@ -134,4 +148,25 @@ public FeatureCollection handle(ResultSet rs) throws SQLException {
throw new RuntimeException("Error parsing query result.", e);
}
}

private String writeHook(Map<String, String> searchableProperties) {
return """
(feature, row) => {
const searchableProperties = ${searchableProperties};
let searchables = {};

for (const alias in searchableProperties) {
const jsonPath = searchableProperties[alias];
try {
searchables[alias] = jsonpath_rfc9535.query(feature, jsonPath)[0];
}
catch (err) {
throw new Error(`Error evaluating JSONPath for alias ${alias} and expression ${jsonPath} message: ${err.message}`);
}
}

row.searchable = !row.searchable ? searchables : {...row.searchable, ...searchables};
}
""".replace("${searchableProperties}", XyzSerializable.serialize(searchableProperties));
}
}
77 changes: 56 additions & 21 deletions xyz-util/src/main/resources/sql/DatabaseWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* License-Filename: LICENSE
*/

const DEFAULT_COLUMN_NAMES = ["id", "version", "operation", "author", "jsondata", "geo"];

class DatabaseWriter {

schema;
Expand Down Expand Up @@ -237,13 +239,26 @@ class DatabaseWriter {
insertHistoryRow(inputFeature, onVersionConflict, baseVersion, baseFeature, version, operation, author, resultHandler) {
//TODO: Improve performance by reading geo inside JS and then pass it separately and use TEXT / WKB / BYTEA
this.enrichTimestamps(inputFeature, operation == "I" || operation == "H", baseFeature);
let extraCols = '';
let extraVals = '';

if (this.tableLayout === 'NEW_LAYOUT') {
extraCols = ', searchable';
extraVals = ', $9::JSONB ';
const row = {
id: inputFeature.id,
version: version,
operation: operation,
author: author,
jsondata: inputFeature,
geo: inputFeature.geometry
};

this.patchRowData(inputFeature, row);

let extraCols = "";
let extraVals = "";
let extraColNames = this._extraColNames(row);
for (let i = 0; i < extraColNames.length; i++) {
extraCols += `, "${extraColNames[i]}"`;
extraVals += `, $${i + 9}`;
}
const allColNames = DEFAULT_COLUMN_NAMES.concat(extraColNames);

const sql = `UPDATE "${this.schema}"."${this.table}"
SET next_version = $2
Expand All @@ -269,9 +284,8 @@ class DatabaseWriter {
if (!this.plans[method]) {
const paramTypes = ["TEXT", "BIGINT", "CHAR", "TEXT", "JSONB", "JSONB", "BIGINT", "BIGINT"];

if (this.tableLayout === 'NEW_LAYOUT') {
paramTypes.push("JSONB");
}
if (extraColNames.length)
extraColNames.forEach(colName => paramTypes.push(this._sqlTypeOf(row[colName])));

this.plans[method] = this._preparePlan(sql, paramTypes);
this.parameterSets[method] = [];
Expand All @@ -285,23 +299,18 @@ class DatabaseWriter {
}

const params = [
inputFeature.id,
version,
operation,
author,
inputFeature,
inputFeature.geometry,
row.id,
row.version,
row.operation,
row.author,
row.jsondata,
row.geo,
MAX_BIG_INT,
onVersionConflict != null ? baseVersion : -1
];

if (this.tableLayout === 'NEW_LAYOUT') {
const searchable = {
refQuad: inputFeature.properties.refQuad,
globalVersion: inputFeature.properties.globalVersion
};
params.push(searchable);
}
if (extraColNames.length)
extraColNames.forEach(colName => params.push(row[colName]));

this.parameterSets[method].push(params);
this.resultParsers[method].push(result => {
Expand All @@ -314,6 +323,32 @@ class DatabaseWriter {
return this.execute()[0];
}

_extraColNames(row) {
let extraColNames = [];
for (let columnName in row)
if (!DEFAULT_COLUMN_NAMES.includes(columnName))
extraColNames.push(columnName);
return extraColNames;
}

_sqlTypeOf(value) {
if (typeof value === "string")
return "TEXT";
else if (typeof value === "number")
return Number.isInteger(value) ? "BIGINT" : "DOUBLE PRECISION";
else if (typeof value === "boolean")
return "BOOLEAN";
else if (value instanceof Object)
return "JSONB";
else
return "TEXT";
}

patchRowData(inputFeature, row) {
let writeHooks = (queryContext().writeHooks || []).map(hookFunctionCode => eval(hookFunctionCode));
writeHooks.forEach(writeHook => writeHook(inputFeature, row));
}

_PARTITION_SIZE() {
return queryContext().PARTITION_SIZE || 100000; //TODO: Ensure the partition size is always set in the query context
}
Expand Down
7 changes: 1 addition & 6 deletions xyz-util/src/main/resources/sql/FeatureWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,7 @@ class FeatureWriter {
}

_handleLoadedFeatureRow(resultSet) {
let feature;
if(queryContext().tableLayout === 'NEW_LAYOUT')
feature = JSON.parse(resultSet[0].jsondata);
else
feature = resultSet[0].jsondata;

let feature = resultSet[0] === "string" ? JSON.parse(resultSet[0].jsondata) : resultSet[0].jsondata;
feature.id = resultSet[0].id;
feature.geometry = resultSet[0].geo;
feature.properties[XYZ_NS].version = Number(resultSet[0].version);
Expand Down
19 changes: 0 additions & 19 deletions xyz-util/src/main/resources/sql/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,6 @@
* License-Filename: LICENSE
*/

/*
* Copyright (C) 2017-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

/**
* Install required extensions
*/
Expand Down
2 changes: 1 addition & 1 deletion xyz-util/src/main/resources/sql/feature_writer.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ $BODY$
};

// load required js libs
// plv8.execute("SELECT require( 'jsonpath_rfc9535' )");
plv8.execute("SELECT require( 'jsonpath_rfc9535' )");

//Init block of internal feature_writer functionality
${{FeatureWriter.js}}
Expand Down