diff --git a/data/replication/state.txt b/data/replication/state.txt
new file mode 100644
index 000000000..ea173d7ae
--- /dev/null
+++ b/data/replication/state.txt
@@ -0,0 +1,3 @@
+#Wed Mar 15 18:02:09 UTC 2023
+sequenceNumber=92075
+timestamp=2023-03-15T18\:00\:00Z
diff --git a/data/sample.pbf b/data/sample.pbf
new file mode 100644
index 000000000..8a22edfee
Binary files /dev/null and b/data/sample.pbf differ
diff --git a/data/test-data.mv.db b/data/test-data.mv.db
index 523a3d454..040efe99b 100644
Binary files a/data/test-data.mv.db and b/data/test-data.mv.db differ
diff --git a/oshdb-ignite/pom.xml b/oshdb-ignite/pom.xml
new file mode 100644
index 000000000..62c164d81
--- /dev/null
+++ b/oshdb-ignite/pom.xml
@@ -0,0 +1,76 @@
+
+
+ 4.0.0
+
+ org.heigit.ohsome
+ oshdb-parent
+ 1.2.0-SNAPSHOT
+
+
+ oshdb-ignite
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+
+
+ io.projectreactor
+ reactor-bom
+ 2022.0.3
+ pom
+ import
+
+
+
+
+
+
+
+
+ ${project.groupId}
+ oshdb-api-ignite
+ ${project.version}
+
+
+
+ me.tongfei
+ progressbar
+ 0.9.5
+
+
+
+ io.projectreactor
+ reactor-core
+
+
+
+
+ com.zaxxer
+ HikariCP
+ 5.0.1
+
+
+
+
+ org.duckdb
+ duckdb_jdbc
+ 0.7.1
+
+
+
+
+ org.apache.arrow
+ arrow-vector
+ 11.0.0
+
+
+
+
+
+
\ No newline at end of file
diff --git a/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/ProgressUtil.java b/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/ProgressUtil.java
new file mode 100644
index 000000000..978407353
--- /dev/null
+++ b/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/ProgressUtil.java
@@ -0,0 +1,20 @@
+package org.heigit.ohsome.oshdb.ignite.progress;
+
+import java.time.Duration;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+
+public class ProgressUtil {
+
+ private ProgressUtil(){
+ // utility class
+ }
+
+ public static ProgressBar progressBar(String name, int size, Duration updateInterval) {
+ return new ProgressBarBuilder().setTaskName(name)
+ .setUpdateIntervalMillis((int)updateInterval.toMillis())
+ .setInitialMax(size)
+ .setConsumer(new SysOutProgressConsumer(60))
+ .build();
+ }
+}
diff --git a/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/SysOutProgressConsumer.java b/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/SysOutProgressConsumer.java
new file mode 100644
index 000000000..51d1c7aea
--- /dev/null
+++ b/oshdb-ignite/src/main/java/org/heigit/ohsome/oshdb/ignite/progress/SysOutProgressConsumer.java
@@ -0,0 +1,26 @@
+package org.heigit.ohsome.oshdb.ignite.progress;
+
+import me.tongfei.progressbar.ProgressBarConsumer;
+
+public class SysOutProgressConsumer implements ProgressBarConsumer {
+ private final int maxLength;
+
+ public SysOutProgressConsumer(int maxLength) {
+ this.maxLength = maxLength;
+ }
+
+ @Override
+ public int getMaxRenderedLength() {
+ return maxLength;
+ }
+
+ @Override
+ public void accept(String rendered) {
+ System.out.println(rendered);
+ }
+
+  @Override
+  public void close() {
+    // no-op: rendered lines go straight to System.out, nothing to flush or release
+  }
+}
diff --git a/oshdb-rocksdb/pom.xml b/oshdb-rocksdb/pom.xml
new file mode 100644
index 000000000..19b9ca70e
--- /dev/null
+++ b/oshdb-rocksdb/pom.xml
@@ -0,0 +1,42 @@
+
+
+ 4.0.0
+
+ org.heigit.ohsome
+ oshdb-parent
+ 1.2.0-SNAPSHOT
+
+
+ oshdb-rocksdb
+ OSHDB RocksDB Module
+
+
+ 7.9.2
+
+
+
+
+
+ ${project.groupId}
+ oshdb-store
+ ${project.version}
+
+
+
+ org.rocksdb
+ rocksdbjni
+ ${rocksdb.version}
+
+
+
+ com.h2database
+ h2
+ ${h2.version}
+ test
+
+
+
+
+
\ No newline at end of file
diff --git a/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/BackRefStore.java b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/BackRefStore.java
new file mode 100644
index 000000000..ed4e0e330
--- /dev/null
+++ b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/BackRefStore.java
@@ -0,0 +1,107 @@
+package org.heigit.ohsome.oshdb.rocksdb;
+
+import static com.google.common.collect.Streams.zip;
+import static java.util.Collections.emptySet;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idToKey;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idsToKeys;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Streams;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.heigit.ohsome.oshdb.store.BackRef;
+import org.heigit.ohsome.oshdb.store.BackRefType;
+import org.rocksdb.Cache;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+public class BackRefStore implements AutoCloseable {
+
+ private final BackRefType type;
+ private final Options dbOptions;
+ private final RocksDB db;
+
+ public BackRefStore(BackRefType type, Path path, Cache cache) throws IOException, RocksDBException {
+ Files.createDirectories(path);
+ this.type = type;
+
+ this.dbOptions = RocksDBUtil.defaultOptions();
+ dbOptions.setMergeOperator(new StringAppendOperator((char) 0));
+ dbOptions.unorderedWrite();
+
+ try {
+ db = RocksDB.open(dbOptions, path.toString());
+ } catch (RocksDBException e) {
+ close();
+ throw e;
+ }
+ }
+
+  public Map> backRefs(Set ids) throws RocksDBException {
+    var keys = idsToKeys(ids, ids.size());
+    try (var ro = new ReadOptions()) {
+      var backRefIds = db.multiGetAsList(ro, keys); // FIX: ro was created but never used
+      var map = Maps.>newHashMapWithExpectedSize(ids.size());
+      zip(ids.stream(), backRefIds.stream(), (id, backRef) -> entry(id, keysToSet(backRef)))
+          .forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+      return map;
+    }
+  }
+
+  private Set keysToSet(byte[] backRefIds) { // decode merged value into a sorted id set
+    if (backRefIds == null){
+      return emptySet(); // id has no back references stored
+    }
+    var bb = ByteBuffer.wrap(backRefIds);
+    var set = new TreeSet();
+    set.add(bb.getLong()); // first 8-byte id has no leading delimiter
+    while (bb.hasRemaining()) {
+      bb.get(); // skip the 1-byte delimiter inserted by StringAppendOperator((char) 0)
+      set.add(bb.getLong());
+    }
+    return set;
+  }
+
+ public void update(List backRefs) throws RocksDBException {
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+
+ @Override
+ public void close() {
+ ofNullable(db).ifPresent(RocksDB::close);
+ dbOptions.close();
+ }
+
+ @Override
+ public String toString() {
+ return "BackRefStore " + type;
+ }
+
+ public void merge(long backRef, Set ids) throws RocksDBException {
+ var backRefKey = idToKey(backRef);
+ try ( var wo = new WriteOptions().setDisableWAL(true);
+ var wb = new WriteBatch()) {
+ for (var id : ids) {
+ var key = idToKey(id);
+ wb.merge(key, backRefKey);
+ }
+ db.write(wo, wb);
+ }
+ }
+}
diff --git a/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/EntityStore.java b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/EntityStore.java
new file mode 100644
index 000000000..fda958c69
--- /dev/null
+++ b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/EntityStore.java
@@ -0,0 +1,237 @@
+package org.heigit.ohsome.oshdb.rocksdb;
+
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Streams.zip;
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Arrays.fill;
+import static java.util.Collections.emptyMap;
+import static java.util.Optional.ofNullable;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.cfOptions;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idToKey;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idsToKeys;
+import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.setCommonDBOption;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantLock;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityStore implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(EntityStore.class);
+
+ private static final byte[] GRID_ENTITY_COLUMN_FAMILY = DEFAULT_COLUMN_FAMILY;
+ private static final byte[] IDX_ENTITY_GRID_COLUMN_FAMILY = "idx_entity_grid".getBytes(UTF_8);
+ private static final byte[] DIRTY_GRIDS_COLUMN_FAMILY = "dirty_grids".getBytes(UTF_8);
+
+ private static final byte[] EMPTY = new byte[0];
+ private static final byte[] KEY_ZERO = idToKey(0);
+
+ private final OSMType type;
+ private final Map cfOptions;
+ private final DBOptions dbOptions;
+ private final List cfHandles;
+ private final RocksDB db;
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+  public EntityStore(OSMType type, Path path, Cache cache) throws RocksDBException, IOException {
+    Files.createDirectories(path);
+    this.type = type;
+    this.dbOptions = new DBOptions();
+    setCommonDBOption(dbOptions);
+
+    // FIX: LinkedHashMap, not Map.of — handle getters below assume index order grid/idx/dirty,
+    // but Map.of iteration order is unspecified and may differ between JVM runs.
+    cfOptions = new java.util.LinkedHashMap<>();
+    cfOptions.put(GRID_ENTITY_COLUMN_FAMILY, cfOptions(cache));
+    cfOptions.put(IDX_ENTITY_GRID_COLUMN_FAMILY, cfOptions(cache, tableConfig ->
+        tableConfig.setFilterPolicy(new BloomFilter(10))));
+    cfOptions.put(DIRTY_GRIDS_COLUMN_FAMILY, cfOptions(cache));
+    var cfDescriptors = cfOptions.entrySet().stream()
+        .map(option -> new ColumnFamilyDescriptor(option.getKey(), option.getValue()))
+        .toList();
+    this.cfHandles = new ArrayList<>();
+    try {
+      db = RocksDB.open(dbOptions, path.toString(), cfDescriptors, cfHandles);
+    } catch (RocksDBException e) {
+      close();
+      throw e;
+    }
+  }
+
+ public Map entities(Collection ids) throws RocksDBException {
+ var cfsList = new ColumnFamilyHandle[ids.size()];
+
+ fill(cfsList, entityGridCFHandle());
+ var keys = idsToKeys(ids, ids.size());
+ var gridIds = db.multiGetAsList(asList(cfsList), keys);
+
+ @SuppressWarnings("UnstableApiUsage")
+ var gridEntityKeys = zip(gridIds.stream(), keys.stream(), this::gridEntityKey)
+ .filter(key -> key.length != 0)
+ .toList();
+
+ if (gridEntityKeys.isEmpty()) {
+ return emptyMap();
+ }
+
+ fill(cfsList, gridEntityDataCFHandle());
+ var data = db.multiGetAsList(asList(cfsList), gridEntityKeys);
+
+ @SuppressWarnings("UnstableApiUsage")
+ var entities = zip(gridEntityKeys.stream(), data.stream(), this::gridEntityToOSHData)
+ .filter(Objects::nonNull)
+ .collect(toMap(OSHData::getId, identity()));
+ return entities;
+ }
+
+  public void update(List entities) throws RocksDBException {
+    lock.lock();
+    try (
+        var writeBatch = new WriteBatch();
+        var writeOptions = new WriteOptions()) {
+
+      var cfsList = new ColumnFamilyHandle[entities.size()];
+      fill(cfsList, entityGridCFHandle());
+      var keys = idsToKeys(transform(entities, OSHData::getId), entities.size());
+      var gridKeys = db.multiGetAsList(asList(cfsList), keys);
+
+      var idx = 0;
+      for (var entity : entities) {
+        var gridKey = idToKey(entity.getGridId());
+        var key = keys.get(idx);
+        var prevGridKey = gridKeys.get(idx);
+
+        if (prevGridKey != null && !Arrays.equals(prevGridKey, gridKey)) { // entity moved grids
+          writeBatch.put(dirtyGridCFHandle(), prevGridKey, EMPTY); // old grid must be rewritten
+          writeBatch.delete(gridEntityDataCFHandle(), gridEntityKey(prevGridKey, key));
+        }
+
+        writeBatch.put(dirtyGridCFHandle(), gridKey, EMPTY);
+        writeBatch.put(entityGridCFHandle(), key, gridKey);
+        var gridEntityKey = gridEntityKey(gridKey, key);
+        writeBatch.put(gridEntityDataCFHandle(), gridEntityKey, entity.getData());
+        idx++;
+      }
+      db.write(writeOptions, writeBatch);
+    } finally {
+      lock.unlock(); // BUG FIX: was lock.lock() — lock was never released, deadlocking the next update()
+    }
+  }
+
+ public List grid(long gridId) throws RocksDBException {
+ var gridKey = idToKey(gridId);
+ var gridEntityKey = gridEntityKey(gridKey, KEY_ZERO);
+ var nextGridEntityKey = gridEntityKey(idToKey(gridId + 1), KEY_ZERO);
+ try (var opts = new ReadOptions().setIterateUpperBound(new Slice(nextGridEntityKey));
+ var itr = db.newIterator(gridEntityDataCFHandle(), opts)) {
+ var list = new ArrayList();
+ itr.seek(gridEntityKey);
+ for (; itr.isValid(); itr.next()) {
+ var key = itr.key();
+ var data = itr.value();
+ var entityId = ByteBuffer.wrap(key, 8, 8).getLong();
+ var oshData = new OSHData(type, entityId, gridId, data);
+ list.add(oshData);
+ }
+ itr.status();
+ return list;
+ }
+ }
+
+ public Collection dirtyGrids() throws RocksDBException {
+ var cellIds = new ArrayList();
+ try (var itr = db.newIterator(dirtyGridCFHandle())) {
+ itr.seekToFirst();
+ for (; itr.isValid(); itr.next()) {
+ var gridId = ByteBuffer.wrap(itr.key()).getLong();
+ cellIds.add(gridId);
+ }
+ itr.status();
+ return cellIds;
+ }
+ }
+
+ public void resetDirtyGrids() throws RocksDBException {
+ log.debug("reset dirty grids {}", type);
+ db.dropColumnFamily(dirtyGridCFHandle());
+ var cfHandle = db.createColumnFamily(
+ new ColumnFamilyDescriptor(DIRTY_GRIDS_COLUMN_FAMILY,
+ cfOptions.get(DIRTY_GRIDS_COLUMN_FAMILY)));
+ dirtyGridCFHandle(cfHandle);
+ }
+
+ private ColumnFamilyHandle gridEntityDataCFHandle() {
+ return cfHandles.get(0);
+ }
+
+ private ColumnFamilyHandle entityGridCFHandle() {
+ return cfHandles.get(1);
+ }
+
+ private ColumnFamilyHandle dirtyGridCFHandle() {
+ return cfHandles.get(2);
+ }
+
+ private void dirtyGridCFHandle(ColumnFamilyHandle cfHandle) {
+ cfHandles.set(2, cfHandle);
+ }
+
+ private byte[] gridEntityKey(byte[] gridId, byte[] entityId) {
+ if (gridId == null) {
+ return EMPTY;
+ }
+ return allocate(Long.BYTES * 2).put(gridId).put(entityId).array();
+ }
+
+ private OSHData gridEntityToOSHData(byte[] gridEntityKey, byte[] data) {
+ if (data == null) {
+ return null;
+ }
+ var bb = ByteBuffer.wrap(gridEntityKey);
+ var gridId = bb.getLong();
+ var entityId = bb.getLong();
+ return new OSHData(type, entityId, gridId, data);
+ }
+
+ @Override
+ public void close() {
+ cfHandles.forEach(ColumnFamilyHandle::close);
+ ofNullable(db).ifPresent(RocksDB::close);
+ dbOptions.close();
+ cfOptions.values().forEach(ColumnFamilyOptions::close);
+ }
+
+ @Override
+ public String toString() {
+ return "EntityStore " + type;
+ }
+}
diff --git a/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStore.java b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStore.java
new file mode 100644
index 000000000..65bb4327a
--- /dev/null
+++ b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStore.java
@@ -0,0 +1,193 @@
+package org.heigit.ohsome.oshdb.rocksdb;
+
+import static java.util.Collections.emptyMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.groupingBy;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.ZonedDateTime;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+import org.heigit.ohsome.oshdb.store.BackRef;
+import org.heigit.ohsome.oshdb.store.BackRefType;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import org.rocksdb.Cache;
+import org.rocksdb.LRUCache;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+public class RocksDBStore implements OSHDBStore {
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ private final TagTranslator tagTranslator;
+ private final Path path;
+ private final Cache cache;
+ private final Map entityStore = new EnumMap<>(OSMType.class);
+ private final Map backRefStore = new EnumMap<>(BackRefType.class);
+
+ public RocksDBStore(TagTranslator tagTranslator, Path path, long cacheSize) throws IOException, RocksDBException {
+ this.tagTranslator = tagTranslator;
+ this.path = path;
+ Files.createDirectories(path);
+ cache = new LRUCache(cacheSize);
+ try {
+ for (var type: OSMType.values()) {
+ entityStore.put(type, new EntityStore(type, path.resolve("entities/" + type), cache));
+ }
+ for (var type: BackRefType.values()) {
+ backRefStore.put(type, new BackRefStore(type, path.resolve("backrefs/" + type), cache));
+ }
+ } catch(RocksDBException e) {
+ close();
+ throw e;
+ }
+ }
+
+ @Override
+ public TagTranslator getTagTranslator() {
+ return tagTranslator;
+ }
+
+ @Override
+ public void state(ReplicationInfo state) {
+ var props = new Properties();
+ props.put("baseUrl", state.getBaseUrl());
+ props.put("sequenceNumber", state.getSequenceNumber());
+ props.put("timestamp", state.getTimestamp());
+ try (var out = Files.newOutputStream(path.resolve("state.txt"))) {
+ props.store(out, "rocksdb store state");
+ } catch (IOException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public ReplicationInfo state() {
+ var props = new Properties();
+ try (var in = Files.newInputStream(path.resolve("state.txt"))) {
+ props.load(in);
+ return new ReplicationInfo() {
+ @Override
+ public String getBaseUrl() {
+ return props.getProperty("baseUrl", "");
+ }
+
+ @Override
+ public ZonedDateTime getTimestamp() {
+ return ZonedDateTime.parse(props.getProperty("timestamp"));
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return Integer.parseInt(props.getProperty("sequenceNumber"));
+ }
+ };
+ } catch (IOException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public Map entities(OSMType type, Set ids) {
+ try {
+ return entityStore.get(type).entities(ids);
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public void entities(Set entities) {
+ for (var entry : entities.stream().collect(groupingBy(OSHData::getType)).entrySet()){
+ try {
+ entityStore.get(entry.getKey()).update(entry.getValue());
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+ }
+
+ @Override
+ public List grid(OSMType type, Long gridId) {
+ try {
+ return entityStore.get(type).grid(gridId);
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public Collection dirtyGrids(OSMType type) {
+ try {
+ return entityStore.get(type).dirtyGrids();
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public void resetDirtyGrids(OSMType type) {
+ try {
+ entityStore.get(type).resetDirtyGrids();
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+  @Override
+  public Map backRefs(OSMType type, Set ids) {
+    Map> ways;
+    Map> relations;
+    try {
+      if (type == OSMType.NODE) {
+        ways = backRefStore.get(BackRefType.NODE_WAY).backRefs(ids);
+        relations = backRefStore.get(BackRefType.NODE_RELATION).backRefs(ids);
+      } else if (type == OSMType.WAY) {
+        ways = emptyMap();
+        relations = backRefStore.get(BackRefType.WAY_RELATION).backRefs(ids);
+      } else if (type == OSMType.RELATION) {
+        ways = emptyMap();
+        relations = backRefStore.get(BackRefType.RELATION_RELATION).backRefs(ids);
+      } else {
+        throw new IllegalStateException();
+      }
+
+      return ids.stream()
+          .map(id -> new BackRef(type, id, ways.get(id), relations.get(id)))
+          .collect(Collectors.toMap(BackRef::getId, identity()));
+    } catch (RocksDBException e) {
+      throw new OSHDBException(e); // FIX: preserve the cause; it was silently dropped before
+    }
+  }
+
+ @Override
+ public void backRefsMerge(BackRefType type, long backRef, Set ids) {
+ try {
+ backRefStore.get(type).merge(backRef, ids);
+ } catch (RocksDBException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ backRefStore.values().forEach(BackRefStore::close);
+ entityStore.values().forEach(EntityStore::close);
+ cache.close();
+ }
+}
diff --git a/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBUtil.java b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBUtil.java
new file mode 100644
index 000000000..8b1ab6bd8
--- /dev/null
+++ b/oshdb-rocksdb/src/main/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBUtil.java
@@ -0,0 +1,130 @@
+package org.heigit.ohsome.oshdb.rocksdb;
+
+import static org.rocksdb.CompactionPriority.MinOverlappingRatio;
+import static org.rocksdb.CompressionType.LZ4_COMPRESSION;
+import static org.rocksdb.CompressionType.ZSTD_COMPRESSION;
+import static org.rocksdb.util.SizeUnit.KB;
+import static org.rocksdb.util.SizeUnit.MB;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ColumnFamilyOptionsInterface;
+import org.rocksdb.DBOptions;
+import org.rocksdb.DBOptionsInterface;
+import org.rocksdb.MutableColumnFamilyOptionsInterface;
+import org.rocksdb.MutableDBOptionsInterface;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.util.SizeUnit;
+
+public class RocksDBUtil {
+
+ private RocksDBUtil() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static Options defaultOptions() {
+ var options = new Options();
+ defaultOptions(options);
+ return options;
+ }
+
+ public static BlockBasedTableConfig defaultOptions(Options options) {
+ defaultDBOptions(options);
+ defaultMDBOptions(options);
+
+ defaultCFOptions(options);
+ defaultMCFOptions(options);
+
+ final var tableOptions = new BlockBasedTableConfig();
+ options.setTableFormatConfig(tableOptions);
+ defaultTableConfig(tableOptions);
+ return tableOptions;
+ }
+
+ public static void defaultDBOptions(DBOptionsInterface> options) {
+ options.setCreateIfMissing(true);
+ options.setCreateMissingColumnFamilies(true);
+ }
+
+ public static void defaultMDBOptions(MutableDBOptionsInterface> options) {
+ options.setMaxBackgroundJobs(6);
+ options.setBytesPerSync(1L * MB);
+ }
+
+ public static void defaultCFOptions(ColumnFamilyOptionsInterface> options) {
+ options.setBottommostCompressionType(ZSTD_COMPRESSION);
+ // general options
+ options.setLevelCompactionDynamicLevelBytes(true);
+ options.setCompactionPriority(MinOverlappingRatio);
+ }
+
+ public static void defaultMCFOptions(MutableColumnFamilyOptionsInterface> options) {
+ options.setCompressionType(LZ4_COMPRESSION);
+ }
+
+ public static void defaultTableConfig(BlockBasedTableConfig tableOptions) {
+ tableOptions.setBlockSize(16 * KB);
+ tableOptions.setCacheIndexAndFilterBlocks(true);
+ tableOptions.setPinL0FilterAndIndexBlocksInCache(true);
+ tableOptions.setFormatVersion(5);
+ tableOptions.setIndexBlockRestartInterval(16);
+ tableOptions.setOptimizeFiltersForMemory(true);
+ }
+
+ public static void setCommonDBOption(DBOptions dbOptions) {
+ dbOptions.setCreateIfMissing(true);
+ dbOptions.setCreateMissingColumnFamilies(true);
+ dbOptions.setMaxBackgroundJobs(6);
+ dbOptions.setBytesPerSync(SizeUnit.MB);
+ }
+
+ public static ColumnFamilyOptions cfOptions(Cache cache) {
+ return cfOptions(cache, x -> {
+ });
+ }
+
+ public static ColumnFamilyOptions cfOptions(
+ Cache cache, Consumer blockTableConfig) {
+ var tableConfig = new BlockBasedTableConfig()
+ .setBlockCache(cache)
+ .setBlockSize(16 * SizeUnit.KB)
+ .setCacheIndexAndFilterBlocks(true)
+ .setPinL0FilterAndIndexBlocksInCache(true)
+ .setFormatVersion(5)
+ .setOptimizeFiltersForMemory(true);
+ blockTableConfig.accept(tableConfig);
+
+ var cfOptions = new ColumnFamilyOptions();
+ cfOptions.setCompressionType(LZ4_COMPRESSION);
+ cfOptions.setBottommostCompressionType(ZSTD_COMPRESSION);
+ cfOptions.setCompactionPriority(MinOverlappingRatio);
+ cfOptions.setLevelCompactionDynamicLevelBytes(true);
+ cfOptions.setTableFormatConfig(tableConfig);
+ return cfOptions;
+ }
+
+ public static List idsToKeys(Iterable ids, int size) {
+ var keys = new ArrayList(size);
+ for (var id : ids) {
+ keys.add(idToKey(id));
+ }
+ return keys;
+ }
+
+ public static byte[] idToKey(long id) {
+ return ByteBuffer.allocate(Long.BYTES).putLong(id).array();
+ }
+
+ public static WriteOptions disableWAL() {
+ WriteOptions writeOptions = new WriteOptions();
+ writeOptions.setDisableWAL(true);
+ return writeOptions;
+ }
+}
diff --git a/oshdb-rocksdb/src/test/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStoreTest.java b/oshdb-rocksdb/src/test/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStoreTest.java
new file mode 100644
index 000000000..b082cf0f6
--- /dev/null
+++ b/oshdb-rocksdb/src/test/java/org/heigit/ohsome/oshdb/rocksdb/RocksDBStoreTest.java
@@ -0,0 +1,109 @@
+package org.heigit.ohsome.oshdb.rocksdb;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIterable;
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Set;
+import org.h2.jdbcx.JdbcConnectionPool;
+import org.heigit.ohsome.oshdb.store.BackRefType;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.heigit.ohsome.oshdb.util.tagtranslator.JdbcTagTranslator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.util.SizeUnit;
+
+
+class RocksDBStoreTest {
+
+ private static final Path STORE_TEST_PATH = Path.of("tests/store/rocksdb");
+
+ OSHDBStore openStore() throws RocksDBException, IOException {
+ Files.createDirectories(STORE_TEST_PATH);
+ var dataSource = JdbcConnectionPool.create("jdbc:h2:" + STORE_TEST_PATH.resolve("keytables"),"sa","");
+ var tagTranslator = new JdbcTagTranslator(dataSource);
+ return new RocksDBStore(tagTranslator, STORE_TEST_PATH, 10 * SizeUnit.MB);
+ }
+
+ @AfterAll
+ static void cleanUp() throws Exception {
+ //noinspection UnstableApiUsage
+ MoreFiles.deleteRecursively(STORE_TEST_PATH, RecursiveDeleteOption.ALLOW_INSECURE);
+ }
+
+ @Test
+ void entities() throws Exception {
+ try (var store = openStore()) {
+ var entities = Set.of(
+ new OSHData(NODE, 10, 1, "Test Node 10".getBytes())
+ , new OSHData(NODE, 20, 2, "Test Node 20".getBytes())
+ , new OSHData(NODE, 22, 2, "Test Node 22".getBytes())
+ , new OSHData(NODE, 30, 3, "Test Node 30".getBytes())
+ );
+ store.entities(entities);
+
+ var dirtyGrids = store.dirtyGrids(NODE);
+ assertEquals(3, dirtyGrids.size());
+ assertThatIterable(dirtyGrids).contains(1L, 2L, 3L);
+
+ var actuals = store.entities(NODE, Set.of(20L));
+ var actual = actuals.get(20L);
+ assertNotNull(actual);
+ assertEquals(20L, actual.getId());
+ assertArrayEquals("Test Node 20".getBytes(), actual.getData());
+ assertEquals(2L, actual.getGridId());
+
+ var grid = store.grid(NODE, 2L);
+ assertEquals(2, grid.size());
+
+ store.entities(Set.of(new OSHData(NODE, 22, 22, "Test Node 22 updated".getBytes())));
+ actual = store.entity(NODE, 22);
+ assertArrayEquals("Test Node 22 updated".getBytes(), actual.getData());
+ assertEquals(22L, actual.getGridId());
+ grid = store.grid(NODE, 2L);
+ assertEquals(1, grid.size());
+ grid = store.grid(NODE, 22L);
+ assertEquals(1, grid.size());
+ }
+
+ try (var store = openStore()) {
+ var actual = store.entity(NODE, 30);
+ assertNotNull(actual);
+ assertEquals(30L, actual.getId());
+ assertArrayEquals("Test Node 30".getBytes(), actual.getData());
+ assertEquals(3L, actual.getGridId());
+ }
+ }
+
+ @Test
+ void backRefs() throws Exception {
+ try (var store = openStore()) {
+ store.backRefsMerge(BackRefType.NODE_WAY, 1234L, Set.of(1L, 2L, 3L, 4L));
+ var backRefs = store.backRefs(NODE, Set.of(1L, 2L, 3L, 4L));
+ assertEquals(4, backRefs.size());
+ assertThat(backRefs.get(1L).ways())
+ .hasSameElementsAs(Set.of(1234L));
+ store.backRefsMerge(BackRefType.NODE_WAY, 2222L, Set.of(1L, 4L));
+ backRefs = store.backRefs(NODE, Set.of(1L));
+ assertEquals(1, backRefs.size());
+ assertThat(backRefs.get(1L).ways())
+ .hasSameElementsAs(Set.of(1234L, 2222L));
+ }
+ try (var store = openStore()) {
+ var backRefs = store.backRefs(NODE, Set.of(4L));
+ assertEquals(1, backRefs.size());
+ assertThat(backRefs.get(4L).ways())
+ .hasSameElementsAs(Set.of(1234L, 2222L));
+ }
+ }
+}
\ No newline at end of file
diff --git a/oshdb-source/pom.xml b/oshdb-source/pom.xml
new file mode 100644
index 000000000..f0c5a545c
--- /dev/null
+++ b/oshdb-source/pom.xml
@@ -0,0 +1,79 @@
+
+
+ 4.0.0
+
+ org.heigit.ohsome
+ oshdb-parent
+ 1.2.0-SNAPSHOT
+
+
+ oshdb-source
+
+
+
+
+
+
+
+ io.projectreactor
+ reactor-bom
+ 2022.0.3
+ pom
+ import
+
+
+
+
+
+
+
+ ${project.groupId}
+ oshdb-util
+ ${project.version}
+
+
+
+ io.projectreactor
+ reactor-core
+
+
+ io.projectreactor.addons
+ reactor-extra
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.21.9
+
+
+
+ org.openstreetmap.pbf
+ osmpbf
+ 1.5.0
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+ com.h2database
+ h2
+ ${h2.version}
+ test
+
+
+
+
+
\ No newline at end of file
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/OSMSource.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/OSMSource.java
new file mode 100644
index 000000000..49c7aedd3
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/OSMSource.java
@@ -0,0 +1,12 @@
+package org.heigit.ohsome.oshdb.source;
+
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import reactor.core.publisher.Flux;
+import reactor.util.function.Tuple2;
+
+public interface OSMSource {
+
+ Flux>> entities(TagTranslator tagTranslator);
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/ReplicationInfo.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/ReplicationInfo.java
new file mode 100644
index 000000000..7a3387f5d
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/ReplicationInfo.java
@@ -0,0 +1,34 @@
+package org.heigit.ohsome.oshdb.source;
+
+
+import java.time.ZonedDateTime;
+
+public interface ReplicationInfo {
+
+ static ReplicationInfo of(String url, String timestamp, int sequenceNumber) {
+ return new ReplicationInfo() {
+
+ @Override
+ public ZonedDateTime getTimestamp() {
+ return ZonedDateTime.parse(timestamp);
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public String getBaseUrl() {
+ return url;
+ }
+ };
+ }
+
+ String getBaseUrl();
+
+ ZonedDateTime getTimestamp();
+
+ int getSequenceNumber();
+
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/SourceUtil.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/SourceUtil.java
new file mode 100644
index 000000000..b3524305b
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/SourceUtil.java
@@ -0,0 +1,84 @@
+package org.heigit.ohsome.oshdb.source;
+
+import static java.util.Arrays.stream;
+import static org.heigit.ohsome.oshdb.osm.OSM.node;
+import static org.heigit.ohsome.oshdb.osm.OSM.relation;
+import static org.heigit.ohsome.oshdb.osm.OSM.way;
+import static org.heigit.ohsome.oshdb.util.flux.FluxUtil.entryToTuple;
+import static org.heigit.ohsome.oshdb.util.flux.FluxUtil.mapT2;
+import static org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator.TranslationOption.ADD_MISSING;
+import static reactor.core.publisher.Flux.fromIterable;
+
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import reactor.core.publisher.Flux;
+import reactor.util.function.Tuple2;
+
+public class SourceUtil {
+
+
+ private SourceUtil() {
+ // utility class
+ }
+
+ public static Flux>> entities(Map> entities,
+ Map tags, Map roles, TagTranslator tagTranslator) {
+ var tagsMapping = tagsMapping(tagTranslator, tags);
+ var rolesMapping = rolesMapping(tagTranslator, roles);
+ return fromIterable(entities.entrySet())
+ .map(entryToTuple())
+ .map(mapT2(f -> fromIterable(f).map(osm -> map(osm, tagsMapping, rolesMapping))));
+ }
+
+ private static synchronized Map tagsMapping(TagTranslator tagTranslator,
+ Map tags) {
+ var tagsTranslated = tagTranslator.getOSHDBTagOf(tags.values(), ADD_MISSING);
+ var tagsMapping = Maps.newHashMapWithExpectedSize(tags.size());
+ tags.forEach((oshdb, osm) -> tagsMapping.put(oshdb, tagsTranslated.get(osm)));
+ return tagsMapping;
+ }
+
+ private static synchronized Map rolesMapping(TagTranslator tagTranslator,
+ Map roles) {
+ var rolesTranslated = tagTranslator.getOSHDBRoleOf(roles.values(), ADD_MISSING);
+ var rolesMapping = Maps.newHashMapWithExpectedSize(roles.size());
+ roles.forEach((oshdb, osm) -> rolesMapping.put(oshdb, rolesTranslated.get(osm).getId()));
+ return rolesMapping;
+ }
+
+ public static int version(int version, boolean visible) {
+ return visible ? version : -version;
+ }
+
+ private static OSMEntity map(OSMEntity osm, Map tagsMapping,
+ Map rolesMapping) {
+ var tags = osm.getTags().stream().map(tagsMapping::get).sorted().toList();
+ if (osm instanceof OSMNode node) {
+ return node(osm.getId(), version(osm.getVersion(), osm.isVisible()), osm.getEpochSecond(),
+ osm.getChangesetId(), osm.getUserId(), tags, node.getLon(), node.getLat());
+ } else if (osm instanceof OSMWay way) {
+ return way(osm.getId(), version(osm.getVersion(), osm.isVisible()), osm.getEpochSecond(),
+ osm.getChangesetId(), osm.getUserId(), tags, way.getMembers());
+ } else if (osm instanceof OSMRelation relation) {
+ var members = stream(relation.getMembers()).map(
+ mem -> new OSMMember(mem.getId(), mem.getType(), rolesMapping.get(mem.getRole().getId())))
+ .toArray(OSMMember[]::new);
+ return relation(osm.getId(), version(osm.getVersion(), osm.isVisible()),
+ osm.getEpochSecond(), osm.getChangesetId(), osm.getUserId(), tags, members);
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/OscParser.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/OscParser.java
new file mode 100644
index 000000000..4f92f0eb1
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/OscParser.java
@@ -0,0 +1,437 @@
+package org.heigit.ohsome.oshdb.source.osc;
+
+import static com.google.common.collect.Streams.stream;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.joining;
+import static javax.xml.stream.XMLStreamConstants.CDATA;
+import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
+import static javax.xml.stream.XMLStreamConstants.COMMENT;
+import static javax.xml.stream.XMLStreamConstants.END_DOCUMENT;
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.PROCESSING_INSTRUCTION;
+import static javax.xml.stream.XMLStreamConstants.SPACE;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+import static org.heigit.ohsome.oshdb.source.SourceUtil.version;
+
+import java.io.InputStream;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import javax.management.modelmbean.XMLParseException;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.osm.OSM;
+import org.heigit.ohsome.oshdb.osm.OSMCoordinates;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.source.OSMSource;
+import org.heigit.ohsome.oshdb.source.SourceUtil;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.util.function.Tuple2;
+
+public class OscParser implements OSMSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OscParser.class);
+
+ private final InputStream inputStream;
+
+ public static Flux>> entities(InputStream inputStream,
+ TagTranslator tagTranslator) {
+ return new OscParser(inputStream).entities(tagTranslator);
+ }
+
+ public OscParser(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public Flux>> entities(TagTranslator tagTranslator) {
+ try (var parser = new Parser(inputStream)) {
+ @SuppressWarnings("UnstableApiUsage")
+ var entities = stream(parser).collect(Collectors.groupingBy(OSMEntity::getType));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("osc entities: {}, strings: {}, tags: {}, roles: {}",
+ entities.entrySet().stream()
+ .map(entry -> format("%s=%d", entry.getKey(), entry.getValue().size()))
+ .collect(joining("; ")),
+ parser.cacheString.size(),
+ parser.cacheTags.size(),
+ parser.cacheRoles.size());
+ }
+ return SourceUtil.entities(entities, parser.cacheTags, parser.cacheRoles, tagTranslator);
+ } catch (Exception e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+
+ private static class Parser implements Iterator, AutoCloseable {
+
+ private static final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+
+ private final XMLStreamReader reader;
+
+ private long id = -1;
+ private int version = -1;
+ private long timestamp = -1;
+ private long changeset = -1;
+ private int uid = -1;
+ private String user = "";
+ private boolean visible = false;
+ private final List tags = new ArrayList<>();
+ private final List members = new ArrayList<>();
+
+ private final Map cacheString = new HashMap<>();
+ private final Map cacheTags = new HashMap<>();
+ private final Map cacheRoles = new HashMap<>();
+
+ private Exception exception = null;
+ private OSMEntity next = null;
+ private double lon;
+ private double lat;
+
+ Parser(InputStream input) throws XMLStreamException, XMLParseException {
+ this.reader = xmlInputFactory.createXMLStreamReader(input, "UTF8");
+
+ var eventType = reader.nextTag();
+ if (eventType != START_ELEMENT) {
+ throw new XMLParseException("start of element");
+ }
+ var localName = reader.getLocalName();
+ if (!"osmChange".equals(localName)) {
+ throw new XMLParseException(format("expecting tag(osmChange) but got %s", localName));
+ }
+ openChangeContainer();
+ }
+
+ private int[] tags(List osmTags) {
+ var kvs = new int[osmTags.size() * 2];
+ var i = 0;
+ for (var tag : osmTags) {
+ var keyId = cacheString.computeIfAbsent(tag.getKey(), x -> cacheString.size());
+ var valId = cacheString.computeIfAbsent(tag.getValue(), x -> cacheString.size());
+ cacheTags.put(new OSHDBTag(keyId, valId), tag);
+ kvs[i++] = keyId;
+ kvs[i++] = valId;
+ }
+ return kvs;
+ }
+
+ private int lonLatConversion(double d) {
+ return (int) (d * OSMCoordinates.GEOM_PRECISION_TO_LONG);
+ }
+
+ protected OSMMember[] members(List mems) {
+ var osmMembers = new OSMMember[mems.size()];
+ var i = 0;
+ for (var mem : mems) {
+ var roleId = cacheString.computeIfAbsent(mem.getRole().toString(), x -> cacheString.size());
+ cacheRoles.put(roleId, mem.getRole());
+ osmMembers[i++] = new OSMMember(mem.getId(), mem.getType(), roleId);
+ }
+ return osmMembers;
+ }
+
+ private boolean openChangeContainer() throws XMLParseException, XMLStreamException {
+ int eventType = nextEvent(reader);
+ if (eventType == END_ELEMENT || eventType == END_DOCUMENT) {
+ return false;
+ }
+ if (eventType != START_ELEMENT) {
+ throw new XMLParseException("start of element");
+ }
+
+ var localName = reader.getLocalName();
+ if ("create".equals(localName) || "modify".equals(localName)) {
+ visible = true;
+ } else if ("delete".equals(localName)) {
+ visible = false;
+ } else {
+ throw new XMLParseException("expecting tag (create/modify/delete) but got " + localName);
+ }
+ return true;
+ }
+
+ private void parseAttributes() throws XMLParseException {
+ var attributeCount = reader.getAttributeCount();
+ for (int i = 0; i < attributeCount; i++) {
+ var attrName = reader.getAttributeLocalName(i);
+ var attrValue = reader.getAttributeValue(i);
+ if ("id".equals(attrName)) {
+ id = Long.parseLong(attrValue);
+ } else if ("version".equals(attrName)) {
+ version = Integer.parseInt(attrValue);
+ } else if ("timestamp".equals(attrName)) {
+ timestamp = Instant.parse(attrValue).getEpochSecond();
+ } else if ("uid".equals(attrName)) {
+ uid = Integer.parseInt(attrValue);
+ } else if ("user".equals(attrName)) {
+ user = attrValue;
+ } else if ("changeset".equals(attrName)) {
+ changeset = Long.parseLong(attrValue);
+ } else if ("lon".equals(attrName)) {
+ lon = Double.parseDouble(attrValue);
+ } else if ("lat".equals(attrName)) {
+ lat = Double.parseDouble(attrValue);
+ } else {
+ throw new XMLParseException("unknown attribute: " + attrName);
+ }
+ }
+ }
+
+ private void parseTag() throws XMLParseException {
+ String key = null;
+ String value = null;
+ int attributeCount = reader.getAttributeCount();
+ for (int i = 0; i < attributeCount; i++) {
+ var attrName = reader.getAttributeLocalName(i);
+ var attrValue = reader.getAttributeValue(i);
+ if ("k".equals(attrName)) {
+ key = attrValue;
+ } else if ("v".equals(attrName)) {
+ value = attrValue;
+ } else {
+ unknownAttribute(attrName);
+ }
+ }
+
+ if (key == null || value == null) {
+ throw new XMLParseException(format("missing key(%s) or value(%s)", key, value));
+ }
+ tags.add(new OSMTag(key, value));
+ }
+
+ private static void unknownAttribute(String attrName) throws XMLParseException {
+ throw new XMLParseException(format("unknown attribute: %s", attrName));
+ }
+
+ private void parseWayMember() throws XMLParseException {
+ var memberId = -1L;
+ var attributeCount = reader.getAttributeCount();
+ for (int i = 0; i < attributeCount; i++) {
+ var attrName = reader.getAttributeLocalName(i);
+ var attrValue = reader.getAttributeValue(i);
+ if ("ref".equals(attrName)) {
+ memberId = Long.parseLong(attrValue);
+ } else {
+ unknownAttribute(attrName);
+ }
+ }
+ if (memberId < 0) {
+ throw new XMLParseException("missing member id!");
+ }
+ members.add(new Mem(memberId));
+ }
+
+ private void parseMember() throws XMLParseException {
+ String type = null;
+ long ref = -1;
+ String role = null;
+ var attributeCount = reader.getAttributeCount();
+ for (int i = 0; i < attributeCount; i++) {
+ var attrName = reader.getAttributeLocalName(i);
+ var attrValue = reader.getAttributeValue(i);
+ if ("type".equals(attrName)) {
+ type = attrValue;
+ } else if ("ref".equals(attrName)) {
+ ref = Long.parseLong(attrValue);
+ } else if ("role".equals(attrName)) {
+ role = attrValue;
+ } else {
+ unknownAttribute(attrName);
+ }
+ }
+ if (type == null || ref < 0 || role == null) {
+ throw new XMLParseException(format("missing member attribute (%s,%d,%s)", type, ref, role));
+ }
+ members.add(new Mem(OSMType.valueOf(type.toUpperCase()), ref, role));
+ }
+
+ private void parseEntity() throws XMLStreamException, XMLParseException {
+ id = timestamp = changeset = uid = version = -1;
+ user = "";
+ lon = lat = -999.9;
+ tags.clear();
+ members.clear();
+
+ parseAttributes();
+ int eventType;
+ while ((eventType = reader.nextTag()) == START_ELEMENT) {
+ String localName = reader.getLocalName();
+ if ("tag".equals(localName)) {
+ parseTag();
+ } else if ("nd".equals(localName)) {
+ parseWayMember();
+ } else if ("member".equals(localName)) {
+ parseMember();
+ } else {
+ throw new XMLParseException("unexpected tag, expect tag/nd/member but got " + localName);
+ }
+ eventType = reader.nextTag();
+ if (eventType != END_ELEMENT) {
+ throw new XMLParseException("unclosed " + localName);
+ }
+ }
+ if (eventType != END_ELEMENT) {
+ throw new XMLParseException(format("expect tag end but got %s", eventType));
+ }
+ }
+
+ private OSMNode nextNode() throws XMLParseException, XMLStreamException {
+ parseEntity();
+ if (visible && !validCoordinate(lon, lat)) {
+ throw new XMLParseException(format("invalid coordinates! lon:%f lat:%f", lon, lat));
+ }
+
+ LOG.debug("node/{} {} {} {} {} {} {} {} {} {}", id, version, visible, timestamp, changeset,
+ user, uid, tags, lon, lat);
+ return OSM.node(id, version(version, visible), timestamp, changeset, uid, tags(tags),
+ lonLatConversion(lon), lonLatConversion(lat));
+ }
+
+ private boolean validCoordinate(double lon, double lat) {
+ return Math.abs(lon) <= 180.0 && Math.abs(lat) <= 90.0;
+ }
+
+ private OSMWay nextWay() throws XMLStreamException, XMLParseException {
+ parseEntity();
+ LOG.debug("way/{} {} {} {} {} {} {} {} {}", id, version, visible, timestamp, changeset, user,
+ uid, tags, members.size());
+ return OSM.way(id, version(version, visible), timestamp, changeset, uid, tags(tags),
+ members(members));
+ }
+
+ private OSMRelation nextRelation() throws XMLStreamException, XMLParseException {
+ parseEntity();
+ LOG.debug("relation/{} {} {} {} {} {} {} {} mems:{}", id, version, visible, timestamp,
+ changeset, user, uid, tags, members.size());
+ return OSM.relation(id, version(version, visible), timestamp, changeset, uid, tags(tags),
+ members(members));
+ }
+
+ private OSMEntity computeNext() {
+ try {
+ var eventType = nextEvent(reader);
+ if (eventType == END_DOCUMENT) {
+ return null;
+ }
+
+ if (eventType == END_ELEMENT) {
+ if (!openChangeContainer()) {
+ return null;
+ }
+ eventType = reader.nextTag();
+ }
+ if (eventType != START_ELEMENT) {
+ throw new XMLParseException("expecting start of (node/way/relation)");
+ }
+ String localName = reader.getLocalName();
+ if ("node".equals(localName)) {
+ return nextNode();
+ } else if ("way".equals(localName)) {
+ return nextWay();
+ } else if ("relation".equals(localName)) {
+ return nextRelation();
+ }
+ throw new XMLParseException(format("expecting (node/way/relation) but got %s", localName));
+ } catch (Exception e) {
+ this.exception = e;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (next != null) || (next = computeNext()) != null;
+ }
+
+ @Override
+ public OSMEntity next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException((exception == null ? null : exception.toString()));
+ }
+ var r = next;
+ next = null;
+ return r;
+ }
+
+ private int nextEvent(XMLStreamReader reader) throws XMLStreamException {
+ while (true) {
+ var event = readNextEvent(reader);
+ if (!event.skip) {
+ return event.event;
+ }
+ }
+ }
+
+ private Event readNextEvent(XMLStreamReader reader) throws XMLStreamException {
+ var event = reader.next();
+ return switch (event) {
+ case SPACE, COMMENT, PROCESSING_INSTRUCTION, CDATA, CHARACTERS -> new Event(event, true);
+ case START_ELEMENT, END_ELEMENT, END_DOCUMENT -> new Event(event, false);
+ default ->
+ throw new XMLStreamException(format(
+ "Received event %d, instead of START_ELEMENT or END_ELEMENT or END_DOCUMENT.",
+ event));
+ };
+ }
+
+ private record Event(int event, boolean skip) {}
+
+ @Override
+ public void close() throws Exception {
+ reader.close();
+ }
+ }
+
+ private static class Mem {
+
+ private final OSMType type;
+ private final long id;
+ private final OSMRole role;
+
+ public Mem(OSMType type, long id, String role) {
+ this.type = type;
+ this.id = id;
+ this.role = new OSMRole(role);
+ }
+
+ public Mem(long id) {
+ this(OSMType.NODE, id, "");
+ }
+
+ public OSMType getType() {
+ return type;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public OSMRole getRole() {
+ return role;
+ }
+
+ @Override
+ public String toString() {
+ return format("%s/%s[%s]", type, id, role);
+ }
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationEndpoint.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationEndpoint.java
new file mode 100644
index 000000000..14b0199b8
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationEndpoint.java
@@ -0,0 +1,79 @@
+package org.heigit.ohsome.oshdb.source.osc;
+
+import static java.lang.String.format;
+import static java.time.Duration.ZERO;
+import static java.time.Duration.ofDays;
+import static java.time.Duration.ofHours;
+import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofMinutes;
+import static java.util.Optional.ofNullable;
+import static org.heigit.ohsome.oshdb.source.osc.ReplicationState.getServerState;
+import static org.heigit.ohsome.oshdb.source.osc.ReplicationState.getState;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.util.Map;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+
+public class ReplicationEndpoint {
+
+ private static final String OSM_REPLICATION_MINUTE = "https://planet.osm.org/replication/minute/";
+ private static final String OSM_REPLICATION_HOUR = "https://planet.osm.org/replication/hour/";
+ private static final String OSM_REPLICATION_DAY = "https://planet.osm.org/replication/day/";
+
+ public static final ReplicationEndpoint OSM_ORG_MINUTELY;
+ public static final ReplicationEndpoint OSM_ORG_HOURLY;
+ public static final ReplicationEndpoint OSM_ORG_DAILY;
+
+ private static final Map OSM_ORG_REPLICATION_ENDPOINTS;
+
+ static {
+ OSM_ORG_MINUTELY = new ReplicationEndpoint(OSM_REPLICATION_MINUTE, ofMinutes(1), ZERO);
+ OSM_ORG_HOURLY = new ReplicationEndpoint(OSM_REPLICATION_HOUR, ofHours(1), ofMillis(2));
+ OSM_ORG_DAILY = new ReplicationEndpoint(OSM_REPLICATION_DAY, ofDays(1), ofMinutes(20));
+ OSM_ORG_REPLICATION_ENDPOINTS = Map.of(
+ OSM_ORG_MINUTELY.url(), OSM_ORG_MINUTELY,
+ OSM_ORG_HOURLY.url(), OSM_ORG_HOURLY,
+ OSM_ORG_DAILY.url(), OSM_ORG_DAILY);
+ }
+
+ private final String url;
+ private final Duration frequency;
+ private final Duration delay;
+
+ public ReplicationEndpoint(String url, Duration frequency, Duration delay) {
+ this.url = url;
+ this.frequency = frequency;
+ this.delay = delay;
+ }
+
+ public ReplicationState serverState() throws IOException {
+ return getServerState(this);
+ }
+
+ public ReplicationState state(int sequenceNumber) throws IOException {
+ return getState(this, sequenceNumber);
+ }
+
+ public String url() {
+ return url;
+ }
+
+ public ZonedDateTime nextTimestamp(ReplicationState state) {
+ return state.getTimestamp().plus(frequency).plus(delay);
+ }
+
+ @Override
+ public String toString() {
+ return format("ReplicationEndpoint [url=%s, frequency=%s, delay=%s]", url, frequency, delay);
+ }
+
+ public static ReplicationState stateFromInfo(ReplicationInfo info) {
+ if (info instanceof ReplicationState state) {
+ return state;
+ }
+ var endpoint = ofNullable(OSM_ORG_REPLICATION_ENDPOINTS.get(info.getBaseUrl())).orElseThrow();
+ return new ReplicationState(endpoint, info.getTimestamp(), info.getSequenceNumber());
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationState.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationState.java
new file mode 100644
index 000000000..b2c559d27
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/osc/ReplicationState.java
@@ -0,0 +1,132 @@
+package org.heigit.ohsome.oshdb.source.osc;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.net.URI.create;
+import static reactor.core.publisher.Flux.using;
+
+import com.google.common.io.Closeables;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.time.ZonedDateTime;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.zip.GZIPInputStream;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.source.OSMSource;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.util.function.Tuple2;
+
+public class ReplicationState implements ReplicationInfo, OSMSource {
+
+ protected static final Logger Log = LoggerFactory.getLogger(ReplicationState.class);
+ private static final DecimalFormat sequenceFormatter;
+
+ static {
+ var formatSymbols = new DecimalFormatSymbols(Locale.US);
+ formatSymbols.setGroupingSeparator('/');
+ sequenceFormatter = new DecimalFormat("000,000,000", formatSymbols);
+ }
+
+ public static ReplicationState getServerState(ReplicationEndpoint endpoint)
+ throws IOException {
+ return getState(endpoint, "state.txt");
+ }
+
+ public static ReplicationState getState(ReplicationEndpoint endpoint, int sequenceNumber)
+ throws IOException {
+ var statePath = format("%s.state.txt", sequenceFormatter.format(sequenceNumber));
+ return getState(endpoint, statePath);
+ }
+
+ private static ReplicationState getState(ReplicationEndpoint endpoint, String statePath)
+ throws IOException {
+ try (var input = openConnection(create(endpoint.url() + statePath).toURL())) {
+ var props = new Properties();
+ props.load(input);
+ return new ReplicationState(endpoint, props);
+ }
+ }
+
+ private final ReplicationEndpoint endpoint;
+ private final ZonedDateTime timestamp;
+ private final int sequenceNumber;
+
+ private ReplicationState(ReplicationEndpoint endpoint, Properties props) {
+ this(endpoint,
+ ZonedDateTime.parse(props.getProperty("timestamp")),
+ parseInt(props.getProperty("sequenceNumber")));
+ }
+
+ public ReplicationState(ReplicationEndpoint endpoint, ZonedDateTime timestamp,
+ int sequenceNumber) {
+ this.endpoint = endpoint;
+ this.timestamp = timestamp;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public ReplicationEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public ReplicationState serverState() throws IOException {
+ return endpoint.serverState();
+ }
+
+ public ReplicationState state(int sequenceNumber) throws IOException {
+ return endpoint.state(sequenceNumber);
+ }
+
+ public ZonedDateTime nextTimestamp() {
+ return endpoint.nextTimestamp(this);
+ }
+
+ @Override
+ public String getBaseUrl() {
+ return endpoint.url();
+ }
+
+ @Override
+ public ZonedDateTime getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public Flux>> entities(TagTranslator tagTranslator) {
+ Log.debug("read entities from {}", this);
+ //noinspection UnstableApiUsage
+ return using(this::openStream, input -> OscParser.entities(input, tagTranslator),
+ Closeables::closeQuietly);
+ }
+
+ private InputStream openStream() throws IOException {
+ return new GZIPInputStream(openConnection(create(
+ endpoint.url() + format("%s.osc.gz", sequenceFormatter.format(sequenceNumber))).toURL()));
+ }
+
+ private static InputStream openConnection(URL url) throws IOException {
+ var connection = url.openConnection();
+ connection.setReadTimeout(10 * 60 * 1000); // timeout 10 minutes
+ connection.setConnectTimeout(10 * 60 * 1000); // timeout 10 minutes
+ return connection.getInputStream();
+ }
+
+ @Override
+ public String toString() {
+ return format("ReplicationFile [endpoint=%s, timestamp=%s, sequenceNumber=%s]", endpoint,
+ timestamp, sequenceNumber);
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Blob.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Blob.java
new file mode 100644
index 000000000..8c42f73b8
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Blob.java
@@ -0,0 +1,134 @@
+package org.heigit.ohsome.oshdb.source.pbf;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import crosby.binary.Fileformat;
+import crosby.binary.Osmformat;
+import crosby.binary.file.FileFormatException;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.NoSuchElementException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import reactor.core.publisher.Mono;
+
+public class Blob {
+
+ private static final int MAX_HEADER_SIZE = 64 * 1024;
+
+ public static Blob read(InputStream input) throws IOException {
+ DataInputStream dataInput = new DataInputStream(input);
+ var headerSize = dataInput.readInt();
+ if (headerSize > MAX_HEADER_SIZE) {
+ throw new FileFormatException(
+ "Unexpectedly long header " + MAX_HEADER_SIZE + " bytes. Possibly corrupt file.");
+ }
+
+ var buf = new byte[headerSize];
+ dataInput.readFully(buf);
+ var header = Fileformat.BlobHeader.parseFrom(buf);
+
+ var offset = position(input);
+
+ var data = new byte[header.getDatasize()];
+ dataInput.readFully(data);
+
+ return new Blob(header.getType(), offset, data);
+ }
+
+ private static long position(InputStream input) throws IOException {
+ if (input instanceof FileInputStream in) {
+ return in.getChannel().position();
+ }
+ return -1;
+ }
+
+ private final String type;
+ private final long offset;
+ private final byte[] data;
+
+ private Blob(String type, long offset, byte[] data) {
+ this.type = type;
+ this.offset = offset;
+ this.data = data;
+ }
+
+ @Override
+ public String toString() {
+ return "Blob{" +
+ "type='" + type + '\'' +
+ ", offset=" + offset +
+ ", data=" + data.length +
+ "bytes";
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public byte[] data() {
+ return data;
+ }
+
+ public boolean isHeader() {
+ return "OSMHeader".equals(type);
+ }
+
+ public Osmformat.HeaderBlock header() throws FileFormatException {
+ if (!isHeader()) {
+ throw new NoSuchElementException();
+ }
+
+ try {
+ return Osmformat.HeaderBlock.parseFrom(decompress());
+ } catch (InvalidProtocolBufferException e) {
+ throw new FileFormatException(e);
+ }
+ }
+
+ public boolean isData() {
+ return "OSMData".equals(type);
+ }
+
+ public Mono block() {
+ if (!isData()) {
+ return Mono.error(new NoSuchElementException());
+ }
+ return Mono.fromCallable(() -> Block.parse(this, decompress()));
+ }
+
+ private byte[] decompress() throws FileFormatException {
+ var blob = parseBlob();
+ if (blob.hasRaw()) {
+ return blob.getRaw().toByteArray();
+ }
+ if (blob.hasZlibData()) {
+ return decompress(blob);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ private static byte[] decompress(Fileformat.Blob blob) throws FileFormatException {
+ var buffer = new byte[blob.getRawSize()];
+ Inflater inflater = new Inflater();
+ try {
+ inflater.setInput(blob.getZlibData().toByteArray());
+ inflater.inflate(buffer);
+ assert (inflater.finished());
+ } catch (DataFormatException e) {
+ throw new FileFormatException(e);
+ } finally {
+ inflater.end();
+ }
+ return buffer;
+ }
+
+ private Fileformat.Blob parseBlob() throws FileFormatException {
+ try {
+ return Fileformat.Blob.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new FileFormatException(e);
+ }
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Block.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Block.java
new file mode 100644
index 000000000..bda1a8312
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/Block.java
@@ -0,0 +1,196 @@
+package org.heigit.ohsome.oshdb.source.pbf;
+
+import static java.lang.Math.toIntExact;
+import static java.util.Optional.ofNullable;
+import static java.util.Spliterator.ORDERED;
+import static java.util.Spliterators.spliterator;
+import static java.util.stream.StreamSupport.stream;
+import static org.heigit.ohsome.oshdb.osm.OSM.node;
+import static org.heigit.ohsome.oshdb.osm.OSM.relation;
+import static org.heigit.ohsome.oshdb.osm.OSM.way;
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+
+import com.google.common.collect.Streams;
+import com.google.protobuf.InvalidProtocolBufferException;
+import crosby.binary.Osmformat;
+import crosby.binary.Osmformat.PrimitiveBlock;
+import crosby.binary.Osmformat.PrimitiveGroup;
+import crosby.binary.file.FileFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.heigit.ohsome.oshdb.OSHDBRole;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+
+/**
+ * A decoded pbf PrimitiveBlock together with its string table and the
+ * block-local tag/role mappings collected while its entities are decoded.
+ */
+public class Block {
+
+
+ /**
+ * Parses a PrimitiveBlock from the given (already decompressed) bytes and
+ * materializes its string table. Blocks whose granularity/offset settings
+ * differ from the fixed values the oshdb assumes (granularity 100, date
+ * granularity 1000, zero lon/lat offsets) are rejected, because the decoding
+ * below does not apply these factors.
+ *
+ * @throws FileFormatException if the protobuf payload cannot be decoded
+ */
+ public static Block parse(Blob blob, byte[] data) throws FileFormatException {
+ try {
+ var block = Osmformat.PrimitiveBlock.parseFrom(data);
+
+ var granularity = block.getGranularity();
+ var latOffset = block.getLatOffset();
+ var lonOffset = block.getLonOffset();
+ var dateGranularity = block.getDateGranularity();
+
+ if (granularity != 100) {
+ throw new OSHDBException("expected granularity must be 100! But got " + granularity);
+ }
+ if (dateGranularity != 1000) {
+ throw new OSHDBException("expected date granularity must be 1000! But got " + dateGranularity);
+ }
+ if (lonOffset != 0 || latOffset != 0) {
+ throw new OSHDBException("expected lon/lat offset must be 0! But got " + lonOffset + "/" + latOffset);
+ }
+
+ // materialize the block string table once; tag keys/values and roles are
+ // indices into this array
+ var stringTable = block.getStringtable();
+ var strings = new String[stringTable.getSCount()];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = stringTable.getS(i).toStringUtf8();
+ }
+ return new Block(blob, block, strings);
+ } catch (InvalidProtocolBufferException e) {
+ throw new FileFormatException(e);
+ }
+ }
+
+ private final Blob blob;
+ private final PrimitiveBlock primitiveBlock;
+ private final String[] strings;
+
+ // block-local mapping of numeric tag/role ids to their string form, filled
+ // as a side effect while entities of this block are decoded
+ private final Map tags = new HashMap<>();
+ private final Map roles = new HashMap<>();
+
+ private Block(Blob blob, PrimitiveBlock block, String[] strings) {
+ this.blob = blob;
+ this.primitiveBlock = block;
+ this.strings = strings;
+ }
+
+ @Override
+ public String toString() {
+ return "Block{blob=" + blob +'}';
+ }
+
+ /** Streams all entities (dense nodes, nodes, ways, relations) of this block. */
+ public Stream entities() {
+ return primitiveBlock.getPrimitivegroupList().stream()
+ .flatMap(this::groupToEntities);
+ }
+
+ private Stream groupToEntities(Osmformat.PrimitiveGroup group) {
+ return Streams.concat(
+ denseToEntities(group),
+ group.getNodesList().stream().map(this::parse),
+ group.getWaysList().stream().map(this::parse),
+ group.getRelationsList().stream().map(this::parse));
+ }
+
+ private Stream denseToEntities(PrimitiveGroup group) {
+ if (!group.hasDense()) {
+ return Stream.empty();
+ }
+ var dense = group.getDense();
+ var itr = new DenseIterator(this, dense);
+ return stream(spliterator(itr, dense.getIdCount(), ORDERED), false);
+ }
+
+ private OSMEntity parse(Osmformat.Node entity) {
+ var id = entity.getId();
+ var lon = entity.getLon();
+ var lat = entity.getLat();
+
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ node(id, version, timestamp, changeset, user, tags, toIntExact(lon), toIntExact(lat)));
+ }
+
+ private OSMEntity parse(Osmformat.Way entity) {
+ var id = entity.getId();
+ var members = new OSMMember[entity.getRefsCount()];
+ // member node ids are delta-encoded: each ref is an offset to the previous id
+ var memId = 0L;
+ for (var i = 0; i < members.length; i++) {
+ memId += entity.getRefs(i);
+ members[i] = new OSMMember(memId, NODE, -1);
+ }
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ way(id, version, timestamp, changeset, user, tags, members));
+ }
+
+ // dedup cache so identical relation members share one OSMMember instance
+ private final Map memCache = new HashMap<>();
+
+ private OSMEntity parse(Osmformat.Relation entity) {
+ var id = entity.getId();
+ var members = new OSMMember[entity.getMemidsCount()];
+ // member ids are delta-encoded, same as way refs
+ var memId = 0L;
+ var relationRoles = new HashSet();
+ for (var i = 0; i < members.length; i++) {
+ memId += entity.getMemids(i);
+ var type = entity.getTypes(i);
+ var role = entity.getRolesSid(i);
+ var member = new OSMMember(memId, OSMType.fromInt(type.getNumber()), role);
+ relationRoles.add(member.getRole());
+ members[i] = ofNullable(memCache.putIfAbsent(member, member)).orElse(member);
+ }
+ addToBlockRoles(relationRoles);
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ relation(id, version, timestamp, changeset, user, tags, members));
+ }
+
+ // Decodes the common entity metadata and hands it to the entity factory.
+ // Deleted entities (visible == false) are encoded as a negative version.
+ private T withInfo(List keys, List values, Osmformat.Info info,
+ EntityInfo metadata) {
+ var timestamp = info.getTimestamp();
+ var changeset = info.getChangeset();
+ var user = info.getUid();
+
+ var visible = info.hasVisible() && !info.getVisible() ? -1 : 1;
+ var version = info.getVersion() * visible;
+
+ var entityTags = new ArrayList(keys.size());
+ for (var i = 0; i < keys.size(); i++) {
+ entityTags.add(new OSHDBTag(keys.get(i), values.get(i)));
+ }
+ addToBlockTags(entityTags);
+ return metadata.apply(timestamp, changeset, user, version, entityTags);
+ }
+
+ // factory callback receiving the decoded metadata for one entity
+ private interface EntityInfo {
+ T apply(long timestamp, long changeset, int user, int version, List tags);
+ }
+
+ // registers the string form of each tag seen in this block (ids are
+ // block-local string-table indices)
+ void addToBlockTags(List tags) {
+ tags.forEach(tag -> this.tags.computeIfAbsent(tag,this::osmTag));
+ }
+
+ // registers the string form of each relation member role seen in this block
+ void addToBlockRoles(Set roles) {
+ roles.forEach(role -> this.roles.computeIfAbsent(role.getId(), this::osmRole));
+ }
+
+ private OSMTag osmTag(OSHDBTag tag) {
+ return new OSMTag(strings[tag.getKey()], strings[tag.getValue()]);
+ }
+
+ private OSMRole osmRole(int role) {
+ return new OSMRole(strings[role]);
+ }
+
+ /** Block-local tag id to string-form mapping collected so far. */
+ public Map tags() {
+ return tags;
+ }
+
+ /** Block-local role id to string-form mapping collected so far. */
+ public Map roles() {
+ return roles;
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/DenseIterator.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/DenseIterator.java
new file mode 100644
index 000000000..4f2fcb47e
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/DenseIterator.java
@@ -0,0 +1,110 @@
+package org.heigit.ohsome.oshdb.source.pbf;
+
+import static java.lang.Boolean.TRUE;
+import static java.lang.Math.toIntExact;
+import static java.util.Collections.emptyList;
+import static org.heigit.ohsome.oshdb.osm.OSM.node;
+
+import crosby.binary.Osmformat;
+import crosby.binary.Osmformat.DenseNodes;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.IntFunction;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+
+/**
+ * Iterator over the entities of a pbf DenseNodes group.
+ *
+ * Dense nodes store id/timestamp/changeset/uid/lon/lat as delta-encoded
+ * parallel arrays; the running totals are kept in the mutable fields below,
+ * so entities MUST be consumed strictly in order. Tags are packed into one
+ * flat keysVals list as key/value index pairs, with 0 terminating each node's
+ * tag list. Not thread-safe.
+ */
+class DenseIterator implements Iterator {
+
+ private final Block block;
+ private final Osmformat.DenseNodes dense;
+
+ private final List versions;
+ private final List timestamps;
+ private final List changesets;
+ private final List users;
+ private final IntFunction visibilities;
+ private final IntFunction> keysVals;
+
+ // running delta-decoded values; updated as the iterator advances
+ private long id;
+ private long timestamp;
+ private long changeset;
+ private int user;
+ private long lon;
+ private long lat;
+
+ // index of the next entity to emit
+ private int next = 0;
+
+ public DenseIterator(Block block, Osmformat.DenseNodes dense) {
+ this.block = block;
+ this.dense = dense;
+ if (!dense.hasDenseinfo()) {
+ throw new OSHDBException("entity info is required for oshdb");
+ }
+
+ var info = dense.getDenseinfo();
+ versions = info.getVersionList();
+ timestamps = info.getTimestampList();
+ changesets = info.getChangesetList();
+ users = info.getUidList();
+ // an absent visibility list means all nodes are visible
+ if (!info.getVisibleList().isEmpty()) {
+ visibilities = info.getVisibleList()::get;
+ } else {
+ visibilities = x -> true;
+ }
+
+ // an absent keysVals list means no node in this group has tags
+ if (dense.getKeysValsList().isEmpty()) {
+ keysVals = x -> emptyList();
+ } else {
+ this.keysVals = buildKeyVals(dense)::get;
+ }
+ }
+
+ // Unpacks the flat keysVals array into one tag list per node; a 0 key marks
+ // the end of a node's tags. Also registers the tags with the owning block.
+ private List> buildKeyVals(DenseNodes dense) {
+ var list = new ArrayList>(dense.getIdCount());
+ var tags = new ArrayList();
+ var i = 0;
+ while (i < dense.getKeysValsCount()) {
+ var key = dense.getKeysVals(i++);
+ if (key == 0) {
+ block.addToBlockTags(tags);
+ list.add(List.copyOf(tags));
+ tags.clear();
+ } else {
+ var val = dense.getKeysVals(i++);
+ tags.add(new OSHDBTag(key, val));
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next < dense.getIdCount();
+ }
+
+ @Override
+ public OSMEntity next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return getNext(next++);
+ }
+
+ // Applies the deltas at the given index to the running totals and builds the
+ // node; invisible (deleted) nodes are encoded with a negative version.
+ private OSMEntity getNext(int index) {
+ id += dense.getId(index);
+ timestamp += timestamps.get(index);
+ changeset += changesets.get(index);
+ user += users.get(index);
+
+ var visible = TRUE.equals(visibilities.apply(index)) ? 1 : -1;
+ var version = versions.get(index) * visible;
+
+ var tags = keysVals.apply(index);
+ lon += dense.getLon(index);
+ lat += dense.getLat(index);
+ return node(id, version, timestamp, changeset, user, tags, toIntExact(lon), toIntExact(lat));
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSource.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSource.java
new file mode 100644
index 000000000..34ed9d050
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSource.java
@@ -0,0 +1,67 @@
+package org.heigit.ohsome.oshdb.source.pbf;
+
+import static java.util.stream.Collectors.groupingBy;
+import static reactor.core.publisher.Flux.using;
+import static reactor.core.scheduler.Schedulers.newParallel;
+
+import com.google.common.io.Closeables;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.source.OSMSource;
+import org.heigit.ohsome.oshdb.source.SourceUtil;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.SynchronousSink;
+import reactor.core.scheduler.Scheduler;
+import reactor.util.function.Tuple2;
+
+/**
+ * OSMSource reading entities from an osm pbf file.
+ *
+ * Blobs are read sequentially from the file, data blobs are decompressed and
+ * decoded in parallel on a dedicated scheduler, and flatMapSequential restores
+ * the original blob order in the output.
+ */
+public class OSMPbfSource implements OSMSource {
+
+ private final Path path;
+
+ public OSMPbfSource(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public Flux>> entities(TagTranslator tagTranslator) {
+ // Flux.using ties the InputStream lifetime to the subscription
+ //noinspection UnstableApiUsage
+ return using(this::openSource, source -> entities(source, tagTranslator), Closeables::closeQuietly);
+ }
+
+ private InputStream openSource() throws IOException {
+ return Files.newInputStream(path);
+ }
+
+ private Flux>> entities(InputStream source, TagTranslator tagTranslator) {
+ // the parallel scheduler is disposed when the flux terminates
+ return Flux.using(() -> newParallel("io"),
+ scheduler -> blobs(source)
+ .filter(Blob::isData)
+ .flatMapSequential(blob -> blob.block().flatMapMany(block -> entities(block, tagTranslator)).subscribeOn(scheduler)),
+ Scheduler::dispose);
+ }
+
+ // groups one block's entities by type and resolves their tags/roles via the
+ // tag translator
+ private Flux>> entities(Block block, TagTranslator tagTranslator) {
+ var entities = block.entities().collect(groupingBy(OSMEntity::getType));
+ return SourceUtil.entities(entities, block.tags(), block.roles(), tagTranslator);
+ }
+
+ // emits blobs one by one until EOF; Flux.generate pulls synchronously on demand
+ private Flux blobs(InputStream source) {
+ return Flux.generate(sink -> readBlob(source, sink));
+ }
+
+ private static void readBlob(InputStream source, SynchronousSink sink) {
+ try {
+ sink.next(Blob.read(source));
+ } catch (EOFException e) {
+ // regular end of file terminates the flux
+ sink.complete();
+ } catch (IOException e) {
+ sink.error(e);
+ }
+ }
+}
diff --git a/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/util/flux/FluxUtil.java b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/util/flux/FluxUtil.java
new file mode 100644
index 000000000..78b7f6ac0
--- /dev/null
+++ b/oshdb-source/src/main/java/org/heigit/ohsome/oshdb/util/flux/FluxUtil.java
@@ -0,0 +1,20 @@
+package org.heigit.ohsome.oshdb.util.flux;
+
+import java.util.Map.Entry;
+import java.util.function.Function;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+/** Small helpers for converting between Map entries and reactor tuples. */
+public class FluxUtil {
+
+ private FluxUtil() {}
+
+ /** Function converting a Map.Entry into a (key, value) Tuple2. */
+ public static Function, Tuple2> entryToTuple() {
+ return entry -> Tuples.of(entry.getKey(), entry.getValue());
+ }
+
+ /** Function applying fnt to the second element of a Tuple2, keeping the first. */
+ public static Function, Tuple2> mapT2(Function fnt) {
+ return tuple -> tuple.mapT2(fnt);
+ }
+
+}
diff --git a/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/OscParserTest.java b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/OscParserTest.java
new file mode 100644
index 000000000..a21626cff
--- /dev/null
+++ b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/OscParserTest.java
@@ -0,0 +1,100 @@
+package org.heigit.ohsome.oshdb.source.osc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collections;
+import java.util.TreeMap;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.util.tagtranslator.CachedTagTranslator;
+import org.heigit.ohsome.oshdb.util.tagtranslator.MemoryTagTranslator;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+import org.junit.jupiter.api.Test;
+import reactor.util.function.Tuple2;
+
+class OscParserTest {
+
+ private static final String osc = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """;
+
+
+ // Parses the embedded osc changeset and checks the expected entity count;
+ // the println output is for manual inspection of tag translation only.
+ @Test
+ void entities() throws Exception {
+ var tagTranslator = new CachedTagTranslator(new MemoryTagTranslator(), 1024);
+
+ try ( var input = new ByteArrayInputStream(osc.getBytes())){
+ var entities = OscParser.entities(input, tagTranslator);
+ var list = entities.flatMap(Tuple2::getT2)
+ .collectList().blockOptional().orElseGet(Collections::emptyList);
+ // NOTE(review): 7 is the number of entities in the osc fixture above — verify
+ // against the fixture if it changes
+ assertEquals(7, list.size());
+
+ var tagHighwayResidantial = tagTranslator.getOSHDBTagOf(new OSMTag("highway","residential"));
+ System.out.println("tagHighwayResidantial = " + tagHighwayResidantial);
+
+ var roleStreet = tagTranslator.getOSHDBRoleOf("street");
+ System.out.println("roleStreet = " + roleStreet);
+
+ list.forEach(osm -> System.out.printf("%s %s%n", osm, tagTranslator.lookupTag(osm.getTags())));
+ }
+
+ var sortedTags = new TreeMap<>(tagTranslator.getLookupOSHDBTag().asMap());
+ sortedTags.forEach((osm, oshdb) -> System.out.printf("%s -> %s%n", osm, oshdb));
+
+
+ }
+}
\ No newline at end of file
diff --git a/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/ReplicationStateTest.java b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/ReplicationStateTest.java
new file mode 100644
index 000000000..271cb0737
--- /dev/null
+++ b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/osc/ReplicationStateTest.java
@@ -0,0 +1,49 @@
+package org.heigit.ohsome.oshdb.source.osc;
+
+import static java.time.Duration.ZERO;
+import static java.time.Duration.ofSeconds;
+import static org.heigit.ohsome.oshdb.source.osc.ReplicationEndpoint.OSM_ORG_MINUTELY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.util.tagtranslator.MemoryTagTranslator;
+import org.junit.jupiter.api.Test;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+/**
+ * Tests for ReplicationState.
+ *
+ * NOTE(review): serverState() and state() fetch from the live OSM minutely
+ * replication endpoint and therefore require network access — consider
+ * tagging/excluding them for offline builds.
+ */
+class ReplicationStateTest {
+
+ @Test
+ void serverState() throws IOException {
+ var serverState = ReplicationState.getServerState(OSM_ORG_MINUTELY);
+ assertNotNull(serverState);
+ System.out.println("serverState = " + serverState);
+ }
+
+ @Test
+ void state() throws IOException {
+ // fetches a fixed historic minutely diff and counts entities per type
+ var state = ReplicationState.getState(OSM_ORG_MINUTELY, 5487541);
+ assertNotNull(state);
+ assertEquals(5487541, state.getSequenceNumber());
+ var tagTranslator = new MemoryTagTranslator();
+ var entities = state.entities(tagTranslator)
+ .concatMap(tuple -> tuple.getT2().count().map(count -> Tuples.of(tuple.getT1(), count)))
+ .collectMap(Tuple2::getT1, Tuple2::getT2)
+ .blockOptional().orElseGet(Collections::emptyMap);
+ assertEquals(30L, entities.getOrDefault(OSMType.NODE, -1L));
+ assertEquals(15L, entities.getOrDefault(OSMType.WAY, -1L));
+ }
+
+ @Test
+ void localState() throws IOException {
+ // reads the checked-in replication state via a file:// endpoint — no network
+ var localPath = Path.of("../data/replication/").toAbsolutePath();
+ var endpoint = new ReplicationEndpoint(localPath.toUri().toString(), ofSeconds(1), ZERO);
+ var state = endpoint.serverState();
+ assertNotNull(state);
+ }
+
+}
\ No newline at end of file
diff --git a/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSourceTest.java b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSourceTest.java
new file mode 100644
index 000000000..3e15ed4ff
--- /dev/null
+++ b/oshdb-source/src/test/java/org/heigit/ohsome/oshdb/source/pbf/OSMPbfSourceTest.java
@@ -0,0 +1,32 @@
+package org.heigit.ohsome.oshdb.source.pbf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.util.tagtranslator.MemoryTagTranslator;
+import org.junit.jupiter.api.Test;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+/** Tests OSMPbfSource against the checked-in sample pbf fixture. */
+class OSMPbfSourceTest {
+
+ // fixture path is relative to the module directory
+ private static final Path SAMPLE_PBF_PATH = Path.of("../data/sample.pbf");
+
+ @Test
+ void entities() {
+ var tagTranslator = new MemoryTagTranslator();
+ var source = new OSMPbfSource(SAMPLE_PBF_PATH);
+ // count entities per type; windowUntilChanged + reduce merges consecutive
+ // per-block counts of the same type before collecting into a map
+ var map = source.entities(tagTranslator)
+ .concatMap(tuple -> tuple.getT2().count().map(count -> Tuples.of(tuple.getT1(), count)))
+ .windowUntilChanged(Tuple2::getT1)
+ .concatMap(wnd -> wnd.reduce((t1, t2) -> Tuples.of(t1.getT1(), t1.getT2() + t2.getT2())))
+ .collectMap(Tuple2::getT1, Tuple2::getT2)
+ .blockOptional().orElseGet(Collections::emptyMap);
+ // expected counts for data/sample.pbf
+ assertEquals(290, map.getOrDefault(OSMType.NODE, 0L));
+ assertEquals(44, map.getOrDefault(OSMType.WAY, 0L));
+ assertEquals(5, map.getOrDefault(OSMType.RELATION, 0L));
+
+ }
+}
\ No newline at end of file
diff --git a/oshdb-source/src/test/resources/sample.osc b/oshdb-source/src/test/resources/sample.osc
new file mode 100644
index 000000000..ae6a0c2be
--- /dev/null
+++ b/oshdb-source/src/test/resources/sample.osc
@@ -0,0 +1,88 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/oshdb-store/pom.xml b/oshdb-store/pom.xml
new file mode 100644
index 000000000..d74125c1b
--- /dev/null
+++ b/oshdb-store/pom.xml
@@ -0,0 +1,29 @@
+
+
+ 4.0.0
+
+ org.heigit.ohsome
+ oshdb-parent
+ 1.2.0-SNAPSHOT
+
+
+ oshdb-store
+ OSHDB Store Module
+
+
+
+
+ ${project.groupId}
+ oshdb-util
+ ${project.version}
+
+
+
+ ${project.groupId}
+ oshdb-source
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRef.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRef.java
new file mode 100644
index 000000000..182770d45
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRef.java
@@ -0,0 +1,41 @@
+package org.heigit.ohsome.oshdb.store;
+
+import static java.util.Collections.emptySet;
+
+import java.util.Set;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+
+/**
+ * Reverse references for one OSM entity: the ids of the ways and relations
+ * that contain the entity identified by (type, id).
+ */
+public class BackRef {
+ private final OSMType type;
+ private final long id;
+
+ private final Set ways;
+ private final Set relations;
+
+ /** Convenience constructor for entity types that cannot be way members. */
+ public BackRef(OSMType type, long id, Set relations) {
+ this(type, id, emptySet(), relations);
+ }
+
+ public BackRef(OSMType type, long id, Set ways, Set relations) {
+ this.type = type;
+ this.id = id;
+ this.ways = ways;
+ this.relations = relations;
+ }
+
+ public OSMType getType() {
+ return type;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ /** Ids of ways referencing this entity. */
+ public Set ways(){
+ return ways;
+ }
+
+ /** Ids of relations referencing this entity. */
+ public Set relations(){
+ return relations;
+ }
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRefType.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRefType.java
new file mode 100644
index 000000000..a798b783f
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/BackRefType.java
@@ -0,0 +1,35 @@
+package org.heigit.ohsome.oshdb.store;
+
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+import static org.heigit.ohsome.oshdb.osm.OSMType.RELATION;
+import static org.heigit.ohsome.oshdb.osm.OSMType.WAY;
+
+import java.util.Locale;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+
+/**
+ * The possible containment relations between OSM types: which type (member)
+ * can be referenced by which other type (container).
+ */
+public enum BackRefType {
+ NODE_WAY(NODE, WAY),
+ NODE_RELATION(NODE, RELATION),
+ WAY_RELATION(WAY, RELATION),
+ RELATION_RELATION(RELATION, RELATION);
+
+ private final OSMType type;
+ private final OSMType backRef;
+
+ BackRefType(OSMType type, OSMType backRef) {
+ this.type = type;
+ this.backRef = backRef;
+ }
+
+ /** The referenced (member) type. */
+ public OSMType getType() {
+ return type;
+ }
+
+ /** The referencing (container) type. */
+ public OSMType getBackRef() {
+ return backRef;
+ }
+
+ @Override
+ public String toString() {
+ // Locale.ROOT keeps the lowercased name stable regardless of the default
+ // locale (e.g. the Turkish dotless-i casing rule would mangle "RELATION")
+ return name().toLowerCase(Locale.ROOT);
+ }
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHDBStore.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHDBStore.java
new file mode 100644
index 000000000..80c4d7e77
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHDBStore.java
@@ -0,0 +1,48 @@
+package org.heigit.ohsome.oshdb.store;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+
+/**
+ * Storage abstraction for OSH entities, their back references, and the
+ * replication state of the store.
+ */
+public interface OSHDBStore extends AutoCloseable {
+
+ /**
+ * Get current Replication Info from store.
+ * @return current state
+ */
+ ReplicationInfo state();
+
+ /**
+ * Update Replication Info.
+ * @param state new Replication Info
+ */
+ void state(ReplicationInfo state);
+
+ /** Tag translator backed by this store. */
+ TagTranslator getTagTranslator();
+
+ /** Fetches a single entity, or null if absent. */
+ default OSHData entity(OSMType type, long id) {
+ return entities(type, Set.of(id)).get(id);
+ }
+
+ /** Fetches the entities with the given ids; absent ids are omitted from the map. */
+ Map entities(OSMType type, Set ids);
+
+ /** Inserts or replaces the given entities. */
+ void entities(Set entities);
+
+ /** All entities stored in the given grid cell. */
+ List grid(OSMType type, Long cellId);
+
+ /** Fetches the back references of a single entity. */
+ default BackRef backRef(OSMType type, long id) {
+ return backRefs(type, Set.of(id)).get(id);
+ }
+
+ /** Fetches the back references for the given ids. */
+ Map backRefs(OSMType type, Set ids);
+
+ /** Adds backRef as a referencing container of every entity in ids. */
+ void backRefsMerge(BackRefType type, long backRef, Set ids);
+
+ /** Grid cells modified since the last reset. */
+ Collection dirtyGrids(OSMType type);
+
+ /** Clears the dirty-grid bookkeeping for the given type. */
+ void resetDirtyGrids(OSMType type);
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHData.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHData.java
new file mode 100644
index 000000000..974461b8f
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/OSHData.java
@@ -0,0 +1,69 @@
+package org.heigit.ohsome.oshdb.store;
+
+import java.io.Serializable;
+import org.heigit.ohsome.oshdb.impl.osh.OSHNodeImpl;
+import org.heigit.ohsome.oshdb.impl.osh.OSHRelationImpl;
+import org.heigit.ohsome.oshdb.impl.osh.OSHWayImpl;
+import org.heigit.ohsome.oshdb.osh.OSHEntity;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+
+public class OSHData implements Serializable {
+
+ private final OSMType type;
+ private final long id;
+
+ private final long gridId;
+ private final byte[] data;
+
+ private transient OSHEntity osh;
+
+ public OSHData(OSMType type, long id, long gridId, byte[] data) {
+ this.type = type;
+ this.id = id;
+ this.gridId = gridId;
+ this.data = data;
+ }
+
+ public OSMType getType() {
+ return type;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getGridId() {
+ return gridId;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T getOSHEntity() {
+ if (osh == null) {
+ osh = oshEntity();
+ }
+ return (T) osh;
+ }
+
+ private OSHEntity oshEntity() {
+ return switch (type) {
+ case NODE -> OSHNodeImpl.instance(data, 0, data.length);
+ case WAY -> OSHWayImpl.instance(data, 0, data.length);
+ case RELATION -> OSHRelationImpl.instance(data, 0, data.length);
+ };
+ }
+
+ @Override
+ public String toString() {
+ return "OSHData{" +
+ "type=" + type +
+ ", id=" + id +
+ ", gridId=" + gridId +
+ ", data=" + data.length +
+ ", osh=" + osh +
+ '}';
+ }
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/memory/MemoryStore.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/memory/MemoryStore.java
new file mode 100644
index 000000000..bf88f353d
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/memory/MemoryStore.java
@@ -0,0 +1,128 @@
+package org.heigit.ohsome.oshdb.store.memory;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.heigit.ohsome.oshdb.store.BackRefType.NODE_RELATION;
+import static org.heigit.ohsome.oshdb.store.BackRefType.NODE_WAY;
+import static org.heigit.ohsome.oshdb.store.BackRefType.RELATION_RELATION;
+import static org.heigit.ohsome.oshdb.store.BackRefType.WAY_RELATION;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+import org.heigit.ohsome.oshdb.store.BackRef;
+import org.heigit.ohsome.oshdb.store.BackRefType;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.heigit.ohsome.oshdb.util.tagtranslator.MemoryTagTranslator;
+import org.heigit.ohsome.oshdb.util.tagtranslator.TagTranslator;
+
+public class MemoryStore implements OSHDBStore {
+
+ private final MemoryTagTranslator tagTranslator = new MemoryTagTranslator();
+ private final Map> entityStore = new EnumMap<>(OSMType.class);
+ private final Map> dirtyGrids = new EnumMap<>(OSMType.class);
+ private final Map>> backRefStore = new EnumMap<>(BackRefType.class);
+
+ private ReplicationInfo state;
+
+ public MemoryStore(ReplicationInfo state) {
+ this.state = state;
+ }
+
+ @Override
+ public ReplicationInfo state() {
+ return state;
+ }
+
+ @Override
+ public void state(ReplicationInfo state) {
+ this.state = state;
+ }
+
+ @Override
+ public TagTranslator getTagTranslator() {
+ return tagTranslator;
+ }
+
+ @Override
+ public Map entities(OSMType type, Set ids) {
+ var entities = entityStore.getOrDefault(type, emptyMap());
+ return ids.stream().map(entities::get)
+ .filter(Objects::nonNull)
+ .collect(toMap(OSHData::getId, identity()));
+ }
+
+ @Override
+ public void entities(Set entities) {
+ entities.forEach(data -> entityStore
+ .computeIfAbsent(data.getType(), x -> new TreeMap<>())
+ .put(data.getId(), data));
+ entities.forEach(data -> dirtyGrids.computeIfAbsent(data.getType(), x -> new TreeSet<>())
+ .add(data.getGridId()));
+ }
+
+ @Override
+ public List grid(OSMType type, Long gridId) {
+ return entityStore.getOrDefault(type, emptyMap())
+ .values()
+ .stream()
+ .filter(data -> data.getGridId() == gridId)
+ .toList();
+ }
+
+ @Override
+ public Collection dirtyGrids(OSMType type) {
+ return dirtyGrids.getOrDefault(type, emptySet());
+ }
+
+ @Override
+ public void resetDirtyGrids(OSMType type) {
+ dirtyGrids.getOrDefault(type, emptySet()).clear();
+ }
+
+ @Override
+ public Map backRefs(OSMType type, Set ids) {
+ return ids.stream().map(id -> backRef(type, id))
+ .collect(toMap(BackRef::getId, identity()));
+ }
+
+ @Override
+ public BackRef backRef(OSMType type, long id) {
+ var ways = Collections.emptySet();
+ var relations = Collections.emptySet();
+ if (Objects.requireNonNull(type) == OSMType.NODE) {
+ ways = backRefStore.getOrDefault(NODE_WAY, emptyMap()).getOrDefault(id, emptySet());
+ relations = backRefStore.getOrDefault(NODE_RELATION, emptyMap())
+ .getOrDefault(id, emptySet());
+ } else if (type == OSMType.WAY) {
+ relations = backRefStore.getOrDefault(WAY_RELATION, emptyMap()).getOrDefault(id, emptySet());
+ } else if (type == OSMType.RELATION) {
+ relations = backRefStore.getOrDefault(RELATION_RELATION, emptyMap())
+ .getOrDefault(id, emptySet());
+ }
+ return new BackRef(type, id, ways, relations);
+ }
+
+ @Override
+ public void backRefsMerge(BackRefType type, long backRef, Set ids) {
+ ids.forEach(id -> backRefStore.computeIfAbsent(type, x -> new TreeMap<>())
+ .computeIfAbsent(id, x -> new TreeSet<>())
+ .add(backRef));
+ }
+
+ @Override
+ public void close() {
+ // no/op
+ }
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/GridUpdater.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/GridUpdater.java
new file mode 100644
index 000000000..6c7c7929e
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/GridUpdater.java
@@ -0,0 +1,12 @@
+package org.heigit.ohsome.oshdb.store.update;
+
+import org.heigit.ohsome.oshdb.grid.GridOSHEntity;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.util.CellId;
+
+/** Callback invoked with a freshly rebuilt grid cell during an update run. */
+@FunctionalInterface
+public interface GridUpdater {
+
+ void update(OSMType type, CellId cellId, GridOSHEntity grid);
+
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdater.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdater.java
new file mode 100644
index 000000000..e62a88849
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdater.java
@@ -0,0 +1,176 @@
+package org.heigit.ohsome.oshdb.store.update;
+
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+import static org.heigit.ohsome.oshdb.osm.OSMType.RELATION;
+import static org.heigit.ohsome.oshdb.osm.OSMType.WAY;
+import static reactor.core.publisher.Flux.concat;
+import static reactor.core.publisher.Flux.defer;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.heigit.ohsome.oshdb.grid.GridOSHEntity;
+import org.heigit.ohsome.oshdb.grid.GridOSHNodes;
+import org.heigit.ohsome.oshdb.grid.GridOSHRelations;
+import org.heigit.ohsome.oshdb.grid.GridOSHWays;
+import org.heigit.ohsome.oshdb.osh.OSHEntity;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.heigit.ohsome.oshdb.util.CellId;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.function.TupleUtils;
+import reactor.util.function.Tuple2;
+
+public class OSHDBUpdater {
+
+ private static final Logger log = LoggerFactory.getLogger(OSHDBUpdater.class);
+
+ private final OSHDBStore store;
+ private final GridUpdater gridUpdater;
+ private final boolean optimize;
+
+
+ private final Map> minorUpdates = new EnumMap<>(OSMType.class);
+ private final Map> updatedEntities = new EnumMap<>(OSMType.class);
+
+ public OSHDBUpdater(OSHDBStore store, GridUpdater gridUpdater, boolean optimize) {
+ this.store = store;
+ this.gridUpdater = gridUpdater;
+ this.optimize = optimize;
+ }
+
+ public Flux updateEntities(Flux>> entities) {
+ return concat(
+ entities.concatMap(TupleUtils.function(this::entities)),
+ defer(this::minorWays),
+ defer(this::minorRelations));
+ }
+
+ private Flux minorWays() {
+ var ids = minorUpdates.getOrDefault(WAY, emptySet());
+ return ways(ids.stream().collect(toMap(identity(), x -> emptyList())));
+ }
+
+ private Flux minorRelations() {
+ var ids = minorUpdates.getOrDefault(RELATION, emptySet());
+ return relations(ids.stream().collect(toMap(identity(), x -> emptyList())));
+ }
+
+ public Flux entities(OSMType type, Flux entities) {
+ return switch (type) {
+ case NODE -> nodes(entities.cast(OSMNode.class));
+ case WAY -> ways(entities.cast(OSMWay.class));
+ case RELATION -> relations(entities.cast(OSMRelation.class));
+ };
+ }
+
+ private Flux nodes(Flux entities) {
+ return entities.collectMultimap(OSMEntity::getId)
+ .flatMapMany(this::nodes);
+ }
+
+ private Flux nodes(Map> entities) {
+ return Flux.using(() -> new Updates(store, minorUpdates, updatedEntities),
+ updates -> updates.nodes(entities),
+ updates -> updateGrid(NODE));
+ }
+
+ private Flux ways(Flux entities) {
+ return entities.collectMultimap(OSMEntity::getId)
+ .flatMapMany(this::ways);
+ }
+
+ private Flux ways(Map> entities) {
+ return Flux.using(() -> new Updates(store, minorUpdates, updatedEntities),
+ updates -> updates.ways(mergeWithMinorUpdates(WAY, entities)),
+ updates -> updateGrid(WAY));
+ }
+
+ private Flux relations(Flux entities) {
+ return concat(
+ defer(this::minorWays), // minor ways could trigger minor relations
+ entities.collectMultimap(OSMEntity::getId).flatMapMany(this::relations));
+ }
+
+ private Flux relations(Map> entities) {
+ return Flux.using(() -> new Updates(store, minorUpdates, updatedEntities),
+ updates -> updates.relations(mergeWithMinorUpdates(RELATION, entities)),
+ updates -> updateGrid(RELATION));
+ }
+
+ private Map> mergeWithMinorUpdates(OSMType type, Map> entities) {
+ var minorIds = minorUpdates.getOrDefault(type, emptySet());
+ var result = Maps.>newHashMapWithExpectedSize(entities.size() + minorIds.size());
+ result.putAll(entities);
+ minorIds.forEach(id -> result.computeIfAbsent(id, x -> emptyList()));
+ return result;
+ }
+
+ private void updateGrid(OSMType type) {
+ var cellIds = store.dirtyGrids(type);
+
+ //TODO optimize grid!!!
+
+ log.debug("updateGrid {} cells:{}", type, cellIds.size());
+ for (var id : cellIds) {
+ var cellId = CellId.fromLevelId(id);
+ var entities = store.grid(type, id);
+ var grid = buildGrid(type, cellId, entities);
+ gridUpdater.update(type, cellId, grid);
+ }
+ store.resetDirtyGrids(type);
+ }
+
+ private GridOSHEntity buildGrid(OSMType type, CellId cellId, List entities) {
+ if (entities.isEmpty()) {
+ return null;
+ }
+ var index = new int[entities.size()];
+ var offset = 0;
+ var i = 0;
+ for (var data : entities) {
+ index[i++] = offset;
+ offset += data.getData().length;
+ }
+ i = 0;
+ var data = new byte[offset];
+ for (var oshData : entities) {
+ var len = oshData.getData().length;
+ System.arraycopy(oshData.getData(),0, data, index[i++], len);
+ }
+ return switch (type) {
+ case NODE -> grid(GridOSHNodes.class, cellId.getId(), cellId.getZoomLevel(), index, data);
+ case WAY -> grid(GridOSHWays.class, cellId.getId(), cellId.getZoomLevel(), index, data);
+ case RELATION -> grid(GridOSHRelations.class, cellId.getId(), cellId.getZoomLevel(), index, data);
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ private static T grid(Class clazz, long id, int zoom, int[] index, byte[] data) {
+ var constructor = clazz.getDeclaredConstructors()[0];
+ constructor.setAccessible(true);
+ try {
+ return (T) constructor.newInstance(id, zoom, 0L, 0L, 0, 0, index, data);
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+}
diff --git a/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/Updates.java b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/Updates.java
new file mode 100644
index 000000000..14caaf591
--- /dev/null
+++ b/oshdb-store/src/main/java/org/heigit/ohsome/oshdb/store/update/Updates.java
@@ -0,0 +1,273 @@
+package org.heigit.ohsome.oshdb.store.update;
+
+import static java.util.Collections.emptyList;
+import static java.util.function.Predicate.not;
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+import static org.heigit.ohsome.oshdb.osm.OSMType.RELATION;
+import static org.heigit.ohsome.oshdb.osm.OSMType.WAY;
+import static org.heigit.ohsome.oshdb.store.BackRefType.NODE_RELATION;
+import static org.heigit.ohsome.oshdb.store.BackRefType.NODE_WAY;
+import static org.heigit.ohsome.oshdb.store.BackRefType.RELATION_RELATION;
+import static org.heigit.ohsome.oshdb.store.BackRefType.WAY_RELATION;
+import static reactor.core.publisher.Mono.fromCallable;
+
+import com.google.common.collect.Streams;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Function;
+import org.heigit.ohsome.oshdb.OSHDB;
+import org.heigit.ohsome.oshdb.impl.osh.OSHEntityImpl;
+import org.heigit.ohsome.oshdb.impl.osh.OSHNodeImpl;
+import org.heigit.ohsome.oshdb.impl.osh.OSHRelationImpl;
+import org.heigit.ohsome.oshdb.impl.osh.OSHWayImpl;
+import org.heigit.ohsome.oshdb.index.XYGridTree;
+import org.heigit.ohsome.oshdb.osh.OSHEntity;
+import org.heigit.ohsome.oshdb.osh.OSHNode;
+import org.heigit.ohsome.oshdb.osh.OSHRelation;
+import org.heigit.ohsome.oshdb.osh.OSHWay;
+import org.heigit.ohsome.oshdb.osm.OSM;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.store.BackRef;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.OSHData;
+import org.heigit.ohsome.oshdb.util.CellId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class Updates {
+ private static final Logger log = LoggerFactory.getLogger(Updates.class);
+
+ private static final XYGridTree gridIndex = new XYGridTree(OSHDB.MAXZOOM);
+ private static final CellId ZERO = CellId.fromLevelId(0);
+
+ private final Set gridUpdates = new HashSet<>();
+
+ private final OSHDBStore store;
+ private final Map> minorUpdates;
+ private final Map> updatedEntities;
+
+ public Updates(OSHDBStore store, Map> minorUpdates,
+ Map> updatedEntities) {
+ this.store = store;
+ this.minorUpdates = minorUpdates;
+ this.updatedEntities = updatedEntities;
+ }
+
+ public Set getGridUpdates(){
+ return gridUpdates;
+ }
+
+ public Flux nodes(Map> entities){
+ var dataMap = store.entities(NODE, entities.keySet());
+ var backRefMap = store.backRefs(NODE, entities.keySet());
+ return Flux.fromIterable(entities.entrySet())
+ .concatMap(entry -> Mono.fromCallable(() -> node(dataMap, backRefMap, entry)));
+ }
+
+ private OSHEntity node(Map dataMap, Map backRefMap,
+ Entry> entry) {
+ var id = entry.getKey();
+ var versions = entry.getValue();
+ return node(id, versions, dataMap.get(id), backRefMap.get(id));
+ }
+
+ private OSHEntity node(long id, Collection newVersions, OSHData data, BackRef backRef) {
+ var versions = new HashSet();
+ if (data != null) {
+ OSHNode osh = data.getOSHEntity();
+ osh.getVersions().forEach(versions::add);
+ }
+ var isMajorUpdate = versions.addAll(newVersions);
+ if (!isMajorUpdate) {
+ return null; // no updates
+ }
+
+ var osh = OSHNodeImpl.build(new ArrayList<>(versions));
+ updateStore(id, data, backRef, osh);
+ return osh;
+ }
+
+ public Flux ways(Map> entities){
+ var dataMap = store.entities(WAY, entities.keySet());
+ var backRefMap = store.backRefs(WAY, entities.keySet());
+ return Flux.fromIterable(entities.entrySet())
+ .concatMap(entry -> fromCallable(() -> way(dataMap, backRefMap, entry)));
+ }
+
+ private OSHEntity way(Map dataMap, Map backRefMap,
+ Entry> entry) {
+ var id = entry.getKey();
+ var versions = entry.getValue();
+ return way(id, versions, dataMap.get(id), backRefMap.get(id));
+ }
+
+ private OSHEntity way(long id, Collection newVersions, OSHData data, BackRef backRef) {
+ var versions = new HashSet();
+ var members = new TreeMap();
+ if (data != null) {
+ OSHWay osh = data.getOSHEntity();
+ osh.getVersions().forEach(versions::add);
+ osh.getNodes().forEach(node -> members.put(node.getId(), node));
+ }
+ var updatedNodes = updatedEntities.get(NODE);
+ var updatedMembers = new TreeSet();
+ members.keySet().stream().filter(updatedNodes::contains).forEach(updatedMembers::add);
+
+ var isMajorUpdate = versions.addAll(newVersions);
+ var isMinorUpdate = !updatedMembers.isEmpty();
+ if (!isMinorUpdate && !isMajorUpdate) {
+ return null; // no updates
+ }
+
+ var newMembers = newMembers(newVersions, OSMWay::getMembers, NODE, members);
+ updatedMembers.addAll(newMembers);
+ updateMembers(NODE, updatedMembers, members);
+ store.backRefsMerge(NODE_WAY, id, newMembers);
+
+ var osh = OSHWayImpl.build(new ArrayList<>(versions), members.values());
+ updateStore(id, data, backRef, osh);
+ return osh;
+ }
+
+ public Flux relations(Map> entities){
+ var dataMap = store.entities(RELATION, entities.keySet());
+ var backRefMap = store.backRefs(RELATION, entities.keySet());
+ return Flux.fromIterable(entities.entrySet())
+ .concatMap(entry -> fromCallable(() -> relation(dataMap, backRefMap, entry)));
+ }
+
+ private OSHEntity relation(Map dataMap, Map backRefMap,
+ Entry> entry) {
+ var id = entry.getKey();
+ var versions = entry.getValue();
+ return relation(id, versions, dataMap.get(id), backRefMap.get(id));
+ }
+
+ private static final OSHRelation DUMMY = OSHRelationImpl.build(
+ new ArrayList<>(List.of(OSM.relation(0,0,0,0,0, new int[0], new OSMMember[0]))),
+ emptyList(), emptyList());
+
+ private OSHEntity relation(long id, Collection newVersions, OSHData data, BackRef backRef) {
+ var versions = new HashSet();
+ var nodeMembers = new TreeMap();
+ var wayMembers = new TreeMap();
+ var relationMembers = new TreeMap();
+
+ if (data != null) {
+ OSHRelation osh = data.getOSHEntity();
+ osh.getVersions().forEach(versions::add);
+ osh.getNodes().forEach(node -> nodeMembers.put(node.getId(), node));
+ osh.getWays().forEach(way -> wayMembers.put(way.getId(), way));
+
+ Streams.stream(osh.getVersions())
+ .flatMap(version -> Arrays.stream(version.getMembers()))
+ .filter(member -> member.getType() == RELATION)
+ .forEach(member -> relationMembers.put(member.getId(), DUMMY));
+ }
+ var updatedNodes = updatedEntities.get(NODE);
+ var updatedNodeMembers = new TreeSet();
+ nodeMembers.keySet().stream().filter(updatedNodes::contains).forEach(updatedNodeMembers::add);
+
+ var updatedWays = updatedEntities.get(WAY);
+ var updatedWayMembers = new TreeSet();
+ wayMembers.keySet().stream().filter(updatedWays::contains).forEach(updatedWayMembers::add);
+
+ var isMajorUpdate = versions.addAll(newVersions);
+ var isMinorUpdate = !updatedNodeMembers.isEmpty() || !updatedWays.isEmpty();
+ if (!isMinorUpdate && !isMajorUpdate) {
+ return null; // no updates
+ }
+
+ var newNodeMembers = newMembers(newVersions, OSMRelation::getMembers, NODE, nodeMembers);
+ store.backRefsMerge(NODE_RELATION, id, newNodeMembers);
+ updatedNodeMembers.addAll(newNodeMembers);
+ updateMembers(NODE,updatedNodeMembers, nodeMembers);
+
+ var newWayMembers = newMembers(newVersions, OSMRelation::getMembers, WAY, wayMembers);
+ store.backRefsMerge(WAY_RELATION, id, newWayMembers);
+ updatedWayMembers.addAll(newWayMembers);
+ updateMembers(WAY, updatedWayMembers, wayMembers);
+
+ var newRelationMembers = newMembers(newVersions, OSMRelation::getMembers, RELATION, relationMembers);
+ store.backRefsMerge(RELATION_RELATION, id, newRelationMembers);
+
+ var osh = OSHRelationImpl.build(new ArrayList<>(versions), nodeMembers.values(), wayMembers.values());
+ updateStore(id, data, backRef, osh);
+ return osh;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void updateMembers(OSMType type, Set membersToUpdate, Map members) {
+ store.entities(type, membersToUpdate).values().stream()
+ .map(member -> (T) member.getOSHEntity())
+ .filter(Objects::nonNull)
+ .forEach(way -> members.put(way.getId(), way));
+ }
+
+ private Set newMembers(Collection versions,Function fnt, OSMType type, Map members) {
+ var newMembers = new TreeSet();
+ versions.stream()
+ .map(fnt)
+ .flatMap(Arrays::stream)
+ .filter(member -> member.getType() == type)
+ .map(OSMMember::getId)
+ .filter(not(members::containsKey))
+ .forEach(newMembers::add);
+ return newMembers;
+ }
+
+ private void updateStore(long id, OSHData data, BackRef backRef, OSHEntityImpl osh) {
+ var cellId = gridIndex(osh);
+ if (data != null) {
+ var prevCellId = CellId.fromLevelId(data.getGridId());
+ if (prevCellId.getZoomLevel() > 0 && prevCellId.getZoomLevel() < cellId.getZoomLevel()) {
+        // keep entity in the lower zoom level it already occupies
+ cellId = prevCellId;
+ } else if (prevCellId.getZoomLevel() > 0) {
+ gridUpdates.add(prevCellId);
+ }
+ }
+
+ forwardBackRefs(backRef);
+ gridUpdates.add(cellId);
+ updatedEntities(osh);
+
+ var updatedData = new OSHData(osh.getType(), id, cellId.getLevelId(), osh.getData());
+ store.entities(Set.of(updatedData));
+ }
+
+ private void forwardBackRefs(BackRef backRefs) {
+ if (backRefs != null) {
+ backRefs.ways().forEach(backRef -> minorUpdates.computeIfAbsent(WAY, x -> new HashSet<>()).add(backRef));
+ backRefs.relations().forEach(backRef -> minorUpdates.computeIfAbsent(RELATION, x -> new HashSet<>()).add(backRef));
+ }
+ }
+
+ private CellId gridIndex(OSHEntity osh) {
+ var bbox = osh.getBoundable().getBoundingBox();
+ if (!bbox.isValid()) {
+ return ZERO;
+ }
+ return gridIndex.getInsertId(bbox);
+ }
+
+ private void updatedEntities(OSHEntity osh) {
+ updatedEntities.computeIfAbsent(osh.getType(), x -> new HashSet<>()).add(osh.getId());
+ }
+}
diff --git a/oshdb-store/src/test/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdaterTest.java b/oshdb-store/src/test/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdaterTest.java
new file mode 100644
index 000000000..2c06fa025
--- /dev/null
+++ b/oshdb-store/src/test/java/org/heigit/ohsome/oshdb/store/update/OSHDBUpdaterTest.java
@@ -0,0 +1,63 @@
+package org.heigit.ohsome.oshdb.store.update;
+
+import static org.heigit.ohsome.oshdb.osm.OSM.node;
+import static org.heigit.ohsome.oshdb.osm.OSM.way;
+import static org.heigit.ohsome.oshdb.osm.OSMType.NODE;
+import static org.heigit.ohsome.oshdb.osm.OSMType.WAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static reactor.core.publisher.Flux.just;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Set;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.source.ReplicationInfo;
+import org.heigit.ohsome.oshdb.store.memory.MemoryStore;
+import org.junit.jupiter.api.Test;
+import reactor.util.function.Tuples;
+
+class OSHDBUpdaterTest {
+
+ @Test
+ void entities() {
+ ReplicationInfo state = null;
+ try (var store = new MemoryStore(state)) {
+ var updater = new OSHDBUpdater(store, (type, cellId, grid) -> {}, false);
+
+ var count = updater.updateEntities(just(Tuples.of(NODE, just(
+ node(1L, 1, 1000, 100, 1, List.of(), 0, 0)))))
+ .count().block();
+ assertEquals(1L, count);
+ var nodes = store.entities(NODE, Set.of(1L));
+ assertEquals(1, nodes.size());
+ var node = nodes.get(1L);
+ assertNotNull(node);
+ System.out.println(node.getOSHEntity().toString());
+
+ updater.updateEntities(just(Tuples.of(WAY, just(
+ way(1,1,2000, 200, 1, List.of(), new OSMMember[]{
+ new OSMMember(1, NODE,-1)})))))
+ .count().block();
+ assertEquals(1L, count);
+
+ var ways = store.entities(WAY, Set.of(1L));
+ assertEquals(1, ways.size());
+ var way = ways.get(1L);
+ assertNotNull(way);
+
+ count = updater.updateEntities(just(Tuples.of(NODE, just(
+ node(1L, 2, 2000, 200, 2, List.of(), 10, 10)))))
+ .count().block();
+ assertEquals(2L, count); // major node, minor way
+
+ nodes = store.entities(NODE, Set.of(1L));
+ assertEquals(1, nodes.size());
+ node = nodes.get(1L);
+ assertNotNull(node);
+ assertEquals(2, Iterables.size(node.getOSHEntity().getVersions()));
+
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/oshdb-tool/pom.xml b/oshdb-tool/pom.xml
index 2d44fd00e..261c492be 100644
--- a/oshdb-tool/pom.xml
+++ b/oshdb-tool/pom.xml
@@ -11,6 +11,8 @@
Toolkit for the OSHDB
+ 4.7.1
+ 0.9.5
@@ -19,5 +21,30 @@
oshdb-api-ignite
${project.version}
+
+
+ ${project.groupId}
+ oshdb-source
+ ${project.version}
+
+
+
+ ${project.groupId}
+ oshdb-rocksdb
+ ${project.version}
+
+
+
+ info.picocli
+ picocli
+ ${picocli.version}
+
+
+
+ me.tongfei
+ progressbar
+ ${progressbar.version}
+
+
\ No newline at end of file
diff --git a/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/OSHDBTool.java b/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/OSHDBTool.java
new file mode 100644
index 000000000..317a29ece
--- /dev/null
+++ b/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/OSHDBTool.java
@@ -0,0 +1,26 @@
+package org.heigit.ohsome.oshdb.tools;
+
+import org.heigit.ohsome.oshdb.tools.update.UpdateCommand;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.ScopeType;
+
+@Command(name = "oshdb-tool", description = "oshdb-tool",
+ mixinStandardHelpOptions = true, scope = ScopeType.INHERIT,
+ subcommands = {
+ UpdateCommand.class
+ },
+ footerHeading = "Copyright%n", footer = "(c) Copyright by the authors")
+public class OSHDBTool {
+
+ @Option(names = {"-v"}, scope = ScopeType.INHERIT)
+ boolean[] verbose;
+
+ public static void main(String[] args) {
+ var main = new OSHDBTool();
+ var cli = new CommandLine(main);
+ var exit = cli.execute(args);
+ System.exit(exit);
+ }
+}
diff --git a/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/update/UpdateCommand.java b/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/update/UpdateCommand.java
new file mode 100644
index 000000000..2905d6644
--- /dev/null
+++ b/oshdb-tool/src/main/java/org/heigit/ohsome/oshdb/tools/update/UpdateCommand.java
@@ -0,0 +1,95 @@
+package org.heigit.ohsome.oshdb.tools.update;
+
+import static reactor.core.publisher.Flux.concat;
+import static reactor.core.publisher.Flux.just;
+import static reactor.core.publisher.Flux.range;
+import static reactor.core.publisher.Mono.fromCallable;
+
+import java.nio.file.Path;
+import java.util.concurrent.Callable;
+import org.heigit.ohsome.oshdb.source.osc.ReplicationEndpoint;
+import org.heigit.ohsome.oshdb.source.osc.ReplicationState;
+import org.heigit.ohsome.oshdb.store.OSHDBStore;
+import org.heigit.ohsome.oshdb.store.update.OSHDBUpdater;
+import org.heigit.ohsome.oshdb.util.tagtranslator.CachedTagTranslator;
+import org.reactivestreams.Publisher;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import reactor.core.publisher.Flux;
+
+@Command(name = "update")
+public class UpdateCommand implements Callable {
+ private static final Logger log = LoggerFactory.getLogger(UpdateCommand.class);
+
+ @Option(names = {"--path"}, required = true)
+ Path path;
+
+ @Override
+ public Integer call() throws Exception {
+ try (var store = openStore()) {
+ var tagTranslator = new CachedTagTranslator(store.getTagTranslator(), 10 * SizeUnit.MB);
+
+ Flux.range(0, Integer.MAX_VALUE)
+ .concatMap(x -> states(store))
+ .concatMap(state -> update(store, tagTranslator, state));
+
+ var currentState = ReplicationEndpoint.stateFromInfo(store.state());
+ var serverState = currentState.serverState();
+
+ var startSequence = currentState.getSequenceNumber() + 1;
+ var endSequence = serverState.getSequenceNumber();
+ var states = concat(range(startSequence, endSequence - startSequence)
+ .concatMap(sequence -> fromCallable(() -> serverState.state(sequence))),
+ just(serverState));
+ states.concatMap(state ->
+ state.entities(tagTranslator)
+
+ );
+
+ return 0;
+ }
+ }
+
+ private Publisher> update(OSHDBStore store, CachedTagTranslator tagTranslator, ReplicationState state) {
+ var updater = new OSHDBUpdater(store, (type, id, grid) -> {}, true);
+ updater.updateEntities(state.entities(tagTranslator));
+ store.state(state);
+ throw new UnsupportedOperationException();
+ }
+
+
+ private OSHDBStore openStore() {
+ throw new UnsupportedOperationException();
+ }
+
+ private Flux states(OSHDBStore store) {
+ try {
+ var currentState = ReplicationEndpoint.stateFromInfo(store.state());
+ var serverState = currentState.serverState();
+ var startSequence = currentState.getSequenceNumber() + 1;
+ var endSequence = serverState.getSequenceNumber();
+ log.info("currentState: {}", currentState);
+ log.info("serverState: {}", serverState);
+ log.info("states to {} - {} ({})", startSequence, endSequence, endSequence - startSequence + 1);
+ var states = range(startSequence, endSequence - startSequence)
+ .concatMap(sequence -> fromCallable(() -> serverState.state(sequence)));
+ return concat(states, just(serverState));
+ } catch(Exception e) {
+ return Flux.error(e);
+ }
+ }
+
+// private Mono wait(ReplicationState state) {
+// var wait = Duration.between(Instant.now(), state.nextTimestamp());
+// log.info("wait {}m{}s {}", wait.toMinutesPart(), wait.toSecondsPart(), state);
+// return Flux.interval(wait, Duration.ofSeconds(2))
+// .concatMap(x -> fromCallable(state::serverState))
+// .doOnNext(newState -> log.info("check {}", state))
+// .filter(newState -> newState.getSequenceNumber() > state.getSequenceNumber())
+// .next();
+// }
+
+}
diff --git a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/KeyTables.java b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/KeyTables.java
new file mode 100644
index 000000000..f028434fb
--- /dev/null
+++ b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/KeyTables.java
@@ -0,0 +1,32 @@
+package org.heigit.ohsome.oshdb.util;
+
+import static java.lang.String.format;
+import static org.heigit.ohsome.oshdb.util.TableNames.E_KEY;
+import static org.heigit.ohsome.oshdb.util.TableNames.E_KEYVALUE;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class KeyTables {
+
+ private KeyTables(){
+    // utility class
+ }
+
+ /**
+   * Initializes the keytables database tables.
+ * @param conn connection to keytables database
+ */
+ public static void init(Connection conn) throws SQLException {
+ try (var stmt = conn.createStatement()) {
+ stmt.execute("create table if not exists tag_key (id int primary key, txt varchar, values int)");
+ stmt.execute("create table if not exists tag_value (keyid int, valueid int, txt varchar, primary key (keyId,valueId))");
+ stmt.execute("create table if not exists role (id int primary key, txt varchar)");
+ stmt.execute("create table if not exists metadata (key varchar primary key, value varchar)");
+
+ // view for backward compatibility
+ stmt.execute(format("create or replace view %s as select id, txt, values from tag_key", E_KEY));
+ stmt.execute(format("create or replace view %s as select keyid, valueId, txt from tag_value", E_KEYVALUE));
+ }
+ }
+}
diff --git a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/CachedTagTranslator.java b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/CachedTagTranslator.java
index 75dbd7da5..84a29836e 100644
--- a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/CachedTagTranslator.java
+++ b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/CachedTagTranslator.java
@@ -17,6 +17,10 @@ public class CachedTagTranslator implements TagTranslator {
private final Cache lookupOSHDBTag;
private final Cache lookupOSHDBRole;
+ public CachedTagTranslator(TagTranslator source, long maxBytesValues) {
+ this(source, maxBytesValues, Integer.MAX_VALUE);
+ }
+
public CachedTagTranslator(TagTranslator source, long maxBytesValues, int maxNumRoles) {
this.source = source;
this.lookupOSHDBTag = Caffeine.newBuilder()
@@ -49,8 +53,8 @@ public Optional getOSHDBTagOf(OSMTag osm) {
}
@Override
- public Map getOSHDBTagOf(Collection tags) {
- var oshdb = source.getOSHDBTagOf(tags);
+ public Map getOSHDBTagOf(Collection values, TranslationOption option) {
+ var oshdb = source.getOSHDBTagOf(values, option);
oshdb.forEach((key, value) -> lookupOSHDBTag.put(value, key));
return oshdb;
}
@@ -60,14 +64,15 @@ public Optional getOSHDBRoleOf(OSMRole role) {
return source.getOSHDBRoleOf(role);
}
- @Override
- public Map getOSHDBRoleOf(Collection roles) {
- return source.getOSHDBRoleOf(roles);
+ @Override
+ public Map getOSHDBRoleOf(Collection values,
+ TranslationOption option) {
+ return source.getOSHDBRoleOf(values, option);
}
@Override
- public OSMTag lookupTag(OSHDBTag tag) {
- return lookupOSHDBTag.get(tag, source::lookupTag);
+ public Map lookupKey(Set extends OSHDBTagKey> keys) {
+ return source.lookupKey(keys);
}
@Override
diff --git a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/ClosableSqlArray.java b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/ClosableSqlArray.java
index 492d67301..31bcc9b73 100644
--- a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/ClosableSqlArray.java
+++ b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/ClosableSqlArray.java
@@ -12,7 +12,7 @@ public static ClosableSqlArray createArray(Connection conn, String typeName,
return new ClosableSqlArray(array);
}
- private Array array;
+ private final Array array;
public ClosableSqlArray(Array array) {
this.array = array;
@@ -23,7 +23,7 @@ public Array get() {
}
@Override
- public void close() throws Exception {
+ public void close() throws SQLException {
array.free();
}
}
diff --git a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/JdbcTagTranslator.java b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/JdbcTagTranslator.java
index 4da700e7f..803b23fdc 100644
--- a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/JdbcTagTranslator.java
+++ b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/tagtranslator/JdbcTagTranslator.java
@@ -1,71 +1,108 @@
package org.heigit.ohsome.oshdb.util.tagtranslator;
-import static java.util.stream.Collectors.toList;
+import static java.lang.String.format;
+import static java.util.function.Function.identity;
+import static java.util.function.Predicate.not;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.heigit.ohsome.oshdb.util.TableNames.E_ROLE;
import static org.heigit.ohsome.oshdb.util.tagtranslator.ClosableSqlArray.createArray;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Maps;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import javax.sql.DataSource;
import org.heigit.ohsome.oshdb.OSHDBRole;
import org.heigit.ohsome.oshdb.OSHDBTag;
import org.heigit.ohsome.oshdb.util.OSHDBTagKey;
-import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
@SuppressWarnings("java:S1192")
public class JdbcTagTranslator implements TagTranslator {
- private static final String OSM_OSHDB_KEY = String.format("SELECT id, txt"
- + " from %s k"
- + " where k.txt = ?", TableNames.E_KEY);
+ private static final String OSM_OSHDB_KEY = """
+ SELECT id, txt, values
+ FROM tag_key k
+ WHERE k.txt = any (?)""";
- private static final String OSM_OSHDB_TAG = String.format("SELECT keyid, valueid, kv.txt"
- + " from %s k"
- + " left join %s kv on k.id = kv.keyid"
- + " where k.txt = ? and kv.txt = any (?)", TableNames.E_KEY, TableNames.E_KEYVALUE);
+ private static final String MAX_KEY = "SELECT count(id) FROM tag_key";
- private static final String OSHDB_OSM_KEY = String.format("SELECT txt, id"
- + " from %s"
- + " where id = any(?)", TableNames.E_KEY);
+ private static final String OSM_OSHDB_TAG = """
+ SELECT keyid, valueid, kv.txt
+ FROM tag_key k
+ LEFT JOIN tag_value kv on k.id = kv.keyid
+ WHERE k.txt = ? AND kv.txt = any (?)""";
- private static final String OSHDB_OSM_TAG = String.format("SELECT txt, valueid"
- + " from %s"
- + " where keyid = ? and valueid = any (?)", TableNames.E_KEYVALUE);
+ private static final String OSHDB_OSM_KEY = """
+ SELECT txt, id
+ FROM tag_key
+ WHERE id = any(?)""";
- private static final String OSM_OSHDB_ROLE = String.format("SELECT id, txt"
- + " from %s"
- + " where txt = any (?)", TableNames.E_ROLE);
+ private static final String OSHDB_OSM_TAG = """
+ SELECT txt, valueid
+ FROM tag_value
+ WHERE keyid = ? and valueid = any (?)""";
+
+ private static final String ADD_OSHDB_KEY = "INSERT INTO tag_key values(?,?,?)";
+
+ private static final String UPDATE_OSHDB_KEY = "UPDATE tag_key SET values= ? WHERE id = ?";
+
+ private static final String ADD_OSHDB_TAG = "INSERT INTO tag_value values(?,?,?)";
+
+ private static final String OSM_OSHDB_ROLE = """
+ SELECT id, txt
+ FROM role
+ WHERE txt = any (?)""";
+
+ private static final String OSHDB_OSM_ROLE = """
+ SELECT txt, id
+ FROM role
+ WHERE id = any (?)""";
+
+ private static final String ADD_OSHDB_ROLE = "INSERT INTO role values(?,?) ";
- private static final String OSHDB_OSM_ROLE = String.format("SELECT txt, id"
- + " from %s"
- + " where id = any (?)", TableNames.E_ROLE);
private final DataSource source;
private final Cache cacheKeys;
+ private final boolean readonly;
+
+ /**
+ * Attention: This tag translator relies on a pooled datasource for thread-safety.
+ *
+ * @param source the (pooled) datasource
+ * @param readonly marks this TagTranslator to not adding tags to the database.
+ */
+ public JdbcTagTranslator(DataSource source, boolean readonly) {
+ this.source = source;
+ cacheKeys = Caffeine.newBuilder().build();
+ this.readonly = readonly;
+ }
/**
- * Attention:
- * This tag translator relies on a pooled datasource for thread-safety.
+ * Attention: This tag translator relies on a pooled datasource for thread-safety.
*
* @param source the (pooled) datasource
*/
public JdbcTagTranslator(DataSource source) {
- this.source = source;
- cacheKeys = Caffeine.newBuilder()
- .build();
+ this(source, true);
}
@Override
public Optional getOSHDBTagKeyOf(OSMTagKey key) {
try (var conn = source.getConnection();
- var pstmt = conn.prepareStatement(OSM_OSHDB_KEY)) {
- pstmt.setString(1, key.toString());
+ var sqlArray = createArray(conn, "text", Set.of(key.toString()));
+ var pstmt = conn.prepareStatement(OSM_OSHDB_KEY)) {
+ pstmt.setArray(1, sqlArray.get());
try (var rst = pstmt.executeQuery()) {
if (rst.next()) {
return Optional.of(new OSHDBTagKey(rst.getInt(1)));
@@ -78,28 +115,83 @@ public Optional getOSHDBTagKeyOf(OSMTagKey key) {
}
@Override
- public Optional getOSHDBTagOf(OSMTag tag) {
- return Optional.ofNullable(loadTags(tag.getKey(), Map.of(tag.getValue(), tag)).get(tag));
- }
+ public Map getOSHDBTagOf(Collection tags, TranslationOption option) {
+ if (option != TranslationOption.READONLY && !readonly) {
+ return getOrAddOSHDBTagsOf(tags);
+ }
- @Override
- public Map getOSHDBTagOf(Collection tags) {
var keyTags = Maps.>newHashMapWithExpectedSize(tags.size());
tags.forEach(tag -> keyTags.computeIfAbsent(tag.getKey(), x -> new HashMap<>())
.put(tag.getValue(), tag));
var result = Maps.newConcurrentMap();
keyTags.entrySet().parallelStream()
- .map(entry -> loadTags(entry.getKey(), entry.getValue()))
- .forEach(result::putAll);
+ .map(entry -> loadTags(entry.getKey(), entry.getValue()))
+ .forEach(result::putAll);
return result;
}
+ private synchronized Map getOrAddOSHDBTagsOf(Collection osmTags) {
+ try {
+ var keyTags = osmTags.stream()
+ .collect(groupingBy(OSMTag::getKey, toMap(OSMTag::getValue, identity())));
+ var keys = loadKeys(keyTags);
+
+ var existing = keyTags.entrySet().parallelStream()
+ .filter(entry -> keys.get(entry.getKey()).values() > 0)
+ .flatMap(entry -> loadTags(entry.getKey(), entry.getValue()).entrySet().stream())
+ .collect(toMap(Entry::getKey, Entry::getValue));
+
+ var missingKeyTags = osmTags.stream().filter(not(existing::containsKey))
+ .collect(groupingBy(OSMTag::getKey, toMap(OSMTag::getValue, identity())));
+ var newTags = missingKeyTags.entrySet().parallelStream()
+ .flatMap(entry -> addTags(keys.get(entry.getKey()), entry.getValue()).entrySet().stream())
+ .collect(toMap(Entry::getKey, Entry::getValue));
+ existing.putAll(newTags);
+ return existing;
+ } catch (SQLException e) {
+ throw new OSHDBException(e);
+ }
+ }
+
+ private Map loadKeys(Map> keyTags)
+ throws SQLException {
+ try (var conn = source.getConnection()) {
+ var keys = Maps.newHashMapWithExpectedSize(keyTags.size());
+ try (var sqlArray = createArray(conn, "text", keyTags.keySet());
+ var pstmt = conn.prepareStatement(OSM_OSHDB_KEY)) {
+ pstmt.setArray(1, sqlArray.get());
+ try (var rst = pstmt.executeQuery()) {
+ while (rst.next()) {
+ var keyValues = new KeyValues(rst.getInt(1), rst.getString(2), rst.getInt(3));
+ keys.put(keyValues.txt(), keyValues);
+ }
+ }
+ }
+ var newKeys = keyTags.keySet().stream().filter(not(keys::containsKey)).collect(toSet());
+ if (!newKeys.isEmpty()) {
+ var nextKeyId = nextKeyId(conn);
+ for (var newKey : newKeys) {
+ keys.put(newKey, new KeyValues(nextKeyId++, newKey, 0));
+ }
+ }
+ return keys;
+ }
+ }
+ private int nextKeyId(Connection conn) throws SQLException {
+ try (var stmt = conn.createStatement();
+ var rst = stmt.executeQuery(MAX_KEY)) {
+ if (!rst.next()) {
+ throw new NoSuchElementException();
+ }
+ return rst.getInt(1);
+ }
+ }
private Map loadTags(String key, Map values) {
try (var conn = source.getConnection();
- var sqlArray = createArray(conn, "text", values.keySet());
- var pstmt = conn.prepareStatement(OSM_OSHDB_TAG)) {
+ var sqlArray = createArray(conn, "text", values.keySet());
+ var pstmt = conn.prepareStatement(OSM_OSHDB_TAG)) {
pstmt.setString(1, key);
pstmt.setArray(2, sqlArray.get());
try (var rst = pstmt.executeQuery()) {
@@ -117,20 +209,91 @@ private Map loadTags(String key, Map values) {
}
}
- @Override
- public Optional getOSHDBRoleOf(OSMRole role) {
- return Optional.ofNullable(loadRoles(Set.of(role)).get(role));
+ private Map addTags(KeyValues keyValues, Map tags) {
+ var map = Maps.newHashMapWithExpectedSize(tags.size());
+ try (var conn = source.getConnection();
+ var addKey = keyValues.values() == 0 ? conn.prepareStatement(ADD_OSHDB_KEY)
+ : conn.prepareStatement(UPDATE_OSHDB_KEY);
+ var addTag = conn.prepareStatement(ADD_OSHDB_TAG)) {
+
+ var keyId = keyValues.id();
+ var keyTxt = keyValues.txt();
+ var nextValueId = keyValues.values();
+
+ var batchSize = 0;
+ for (var entry : tags.entrySet()) {
+ var txt = entry.getKey();
+ var osm = entry.getValue();
+ var oshdb = new OSHDBTag(keyId, nextValueId++);
+ addTag.setInt(1, oshdb.getKey());
+ addTag.setInt(2, oshdb.getValue());
+ addTag.setString(3, txt);
+ addTag.addBatch();
+ batchSize++;
+ if (batchSize >= 1000) {
+ addTag.executeBatch();
+ batchSize = 0;
+ }
+ map.put(osm, oshdb);
+ }
+ addTag.executeBatch();
+ if (keyValues.values() == 0) {
+ addKey.setInt(1, keyId);
+ addKey.setString(2, keyTxt);
+ addKey.setInt(3, nextValueId);
+ } else {
+ addKey.setInt(1, nextValueId);
+ addKey.setInt(2, keyId);
+ }
+ addKey.executeUpdate();
+ return map;
+ } catch (SQLException e) {
+ throw new OSHDBException(e);
+ }
}
@Override
- public Map getOSHDBRoleOf(Collection roles) {
+ public Map getOSHDBRoleOf(Collection roles,
+ TranslationOption option) {
+ if (option != TranslationOption.READONLY && !readonly) {
+ return getOrAddOSHDBRoleOf(roles);
+ }
return loadRoles(roles);
}
+ private synchronized Map getOrAddOSHDBRoleOf(Collection roles) {
+ var existing = loadRoles(roles);
+ var missing = roles.stream().filter(not(existing::containsKey)).collect(toSet());
+ try (var conn = source.getConnection();
+ var pstmt = conn.prepareStatement(ADD_OSHDB_ROLE)) {
+ var nextRoleId = nextRoleId(conn);
+ for (var osm : missing) {
+ var oshdb = OSHDBRole.of(nextRoleId++);
+ pstmt.setInt(1, oshdb.getId());
+ pstmt.setString(2, osm.toString());
+ pstmt.addBatch();
+ existing.put(osm, oshdb);
+ }
+ pstmt.executeBatch();
+ } catch (SQLException e) {
+ throw new OSHDBException(e);
+ }
+ return existing;
+ }
+
+ private int nextRoleId(Connection conn) throws SQLException {
+ try (var stmt = conn.createStatement();
+ var rst = stmt.executeQuery(format("select count(*) from %s", E_ROLE))) {
+ if (!rst.next()) {
+ throw new NoSuchElementException();
+ }
+ return rst.getInt(1);
+ }
+ }
+
private Map loadRoles(Collection roles) {
try (var conn = source.getConnection();
- var sqlArray =
- createArray(conn, "text", roles.stream().map(OSMRole::toString).collect(toList()));
+ var sqlArray = createArray(conn, "text", roles.stream().map(OSMRole::toString).toList());
var pstmt = conn.prepareStatement(OSM_OSHDB_ROLE)) {
pstmt.setArray(1, sqlArray.get());
try (var rst = pstmt.executeQuery()) {
@@ -147,14 +310,21 @@ private Map loadRoles(Collection roles) {
}
}
+ @Override
+ public Map lookupKey(Set extends OSHDBTagKey> oshdbTagKeys) {
+ var keys = oshdbTagKeys.stream().map(OSHDBTagKey::toInt).collect(toSet());
+ return cacheKeys.getAll(keys, this::lookupKeys).entrySet()
+ .stream()
+ .collect(toMap(entry -> new OSHDBTagKey(entry.getKey()),
+ entry -> new OSMTagKey(entry.getValue())));
+ }
+
@Override
public OSMTag lookupTag(OSHDBTag tag) {
var keyTxt = cacheKeys.getAll(Set.of(tag.getKey()), this::lookupKeys).get(tag.getKey());
return lookupTags(tag.getKey(), keyTxt, Map.of(tag.getValue(), tag)).get(tag);
}
-
-
@Override
public Map lookupTag(Set extends OSHDBTag> tags) {
var keyTags = Maps.>newHashMapWithExpectedSize(tags.size());
@@ -163,15 +333,15 @@ public Map lookupTag(Set extends OSHDBTag> tags) {
var keys = cacheKeys.getAll(keyTags.keySet(), this::lookupKeys);
var result = Maps.newConcurrentMap();
keyTags.entrySet().parallelStream()
- .map(entry -> lookupTags(entry.getKey(), keys.get(entry.getKey()), entry.getValue()))
- .forEach(result::putAll);
+ .map(entry -> lookupTags(entry.getKey(), keys.get(entry.getKey()), entry.getValue()))
+ .forEach(result::putAll);
return result;
}
private Map lookupKeys(Set extends Integer> osm) {
try (var conn = source.getConnection();
- var sqlArray = createArray(conn, "int", osm);
- var pstmt = conn.prepareStatement(OSHDB_OSM_KEY)) {
+ var sqlArray = createArray(conn, "int", osm);
+ var pstmt = conn.prepareStatement(OSHDB_OSM_KEY)) {
pstmt.setArray(1, sqlArray.get());
try (var rst = pstmt.executeQuery()) {
var map = Maps.newHashMapWithExpectedSize(osm.size());
@@ -187,8 +357,8 @@ private Map