diff --git a/table/src/main/java/tech/ydb/table/impl/ImplicitSession.java b/table/src/main/java/tech/ydb/table/impl/ImplicitSession.java new file mode 100644 index 000000000..986e1c2d9 --- /dev/null +++ b/table/src/main/java/tech/ydb/table/impl/ImplicitSession.java @@ -0,0 +1,325 @@ +package tech.ydb.table.impl; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.impl.call.ProxyReadStream; +import tech.ydb.core.operation.Operation; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.ValueProtos; +import tech.ydb.proto.table.YdbTable; +import tech.ydb.table.Session; +import tech.ydb.table.SessionSupplier; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.query.DataQuery; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.ExplainDataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.query.ReadRowsResult; +import tech.ydb.table.query.ReadTablePart; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; +import tech.ydb.table.rpc.TableRpc; +import tech.ydb.table.rpc.grpc.GrpcTableRpc; +import tech.ydb.table.settings.AlterTableSettings; +import tech.ydb.table.settings.BeginTxSettings; +import tech.ydb.table.settings.BulkUpsertSettings; +import tech.ydb.table.settings.CommitTxSettings; +import tech.ydb.table.settings.CopyTableSettings; +import tech.ydb.table.settings.CopyTablesSettings; +import tech.ydb.table.settings.CreateTableSettings; +import tech.ydb.table.settings.DescribeTableSettings; +import tech.ydb.table.settings.DropTableSettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.table.settings.ExecuteScanQuerySettings; +import tech.ydb.table.settings.ExecuteSchemeQuerySettings; +import tech.ydb.table.settings.ExplainDataQuerySettings; +import tech.ydb.table.settings.KeepAliveSessionSettings; +import tech.ydb.table.settings.PrepareDataQuerySettings; +import tech.ydb.table.settings.ReadRowsSettings; +import tech.ydb.table.settings.ReadTableSettings; +import tech.ydb.table.settings.RenameTablesSettings; +import tech.ydb.table.settings.RollbackTxSettings; +import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.table.transaction.Transaction; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.StructValue; +import tech.ydb.table.values.Value; +import tech.ydb.table.values.proto.ProtoValue; + +/** + * + * @author Aleksandr Gorshenin + */ +public class ImplicitSession implements Session, SessionSupplier { + private final TableRpc rpc; + + private ImplicitSession(GrpcTransport transport) { + this.rpc = GrpcTableRpc.useTransport(transport); + } + + @Override + public CompletableFuture> createSession(Duration duration) { + return CompletableFuture.completedFuture(Result.success(this)); + } + + @Override + public ScheduledExecutorService getScheduler() { + return rpc.getScheduler(); + } + + private GrpcRequestSettings makeGrpcRequestSettings(Duration timeout, String traceId) { + return GrpcRequestSettings.newBuilder() + .withDeadline(timeout) + .withTraceId(traceId == null ? UUID.randomUUID().toString() : traceId) + .build(); + } + + @Override + public CompletableFuture> readRows(String pathToTable, ReadRowsSettings settings) { + ValueProtos.TypedValue keys = ValueProtos.TypedValue.newBuilder().build(); + if (!settings.getKeys().isEmpty()) { + ValueProtos.Type type = ListType.of(settings.getKeys().get(0).getType()).toPb(); + List values = settings.getKeys().stream() + .map(StructValue::toPb) + .collect(Collectors.toList()); + keys = ValueProtos.TypedValue.newBuilder() + .setType(type) + .setValue(ValueProtos.Value.newBuilder().addAllItems(values)) + .build(); + } + + YdbTable.ReadRowsRequest request = YdbTable.ReadRowsRequest.newBuilder() + .setPath(pathToTable) + .addAllColumns(settings.getColumns()) + .setKeys(keys) + .build(); + + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings.getRequestTimeout(), settings.getTraceId()); + return rpc.readRows(request, grpcSettings).thenApply(result -> result.map(ReadRowsResult::new)); + } + + @Override + public GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings) { + YdbTable.ReadTableRequest.Builder request = YdbTable.ReadTableRequest.newBuilder() + .setPath(tablePath) + .setOrdered(settings.isOrdered()) + .setRowLimit(settings.getRowLimit()) + .setBatchLimitBytes(settings.batchLimitBytes()) + .setBatchLimitRows(settings.batchLimitRows()); + + Value fromKey = settings.getFromKey(); + if (fromKey != null) { + YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder(); + if (settings.isFromInclusive()) { + range.setGreaterOrEqual(ProtoValue.toTypedValue(fromKey)); + } else { + range.setGreater(ProtoValue.toTypedValue(fromKey)); + } + } + + Value toKey = settings.getToKey(); + if (toKey != null) { + YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder(); + if (settings.isToInclusive()) { + range.setLessOrEqual(ProtoValue.toTypedValue(toKey)); + } else { + range.setLess(ProtoValue.toTypedValue(toKey)); + } + } + + if (!settings.getColumns().isEmpty()) { + request.addAllColumns(settings.getColumns()); + } + + GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings.getRequestTimeout(), settings.getTraceId()); + GrpcReadStream origin = rpc.streamReadTable(request.build(), grpcSettings); + + return new ProxyReadStream<>(origin, (response, future, observer) -> { + StatusCodesProtos.StatusIds.StatusCode statusCode = response.getStatus(); + if (statusCode == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { + try { + observer.onNext(new ReadTablePart(response.getResult(), response.getSnapshot())); + } catch (Throwable t) { + future.completeExceptionally(t); + origin.cancel(); + } + } else { + Issue[] issues = Issue.fromPb(response.getIssuesList()); + StatusCode code = StatusCode.fromProto(statusCode); + future.complete(Status.of(code, issues)); + origin.cancel(); + } + }); + } + + @Override + public GrpcReadStream executeScanQuery( + String query, Params params, ExecuteScanQuerySettings settings + ) { + YdbTable.ExecuteScanQueryRequest request = YdbTable.ExecuteScanQueryRequest.newBuilder() + .setQuery(YdbTable.Query.newBuilder().setYqlText(query)) + .setMode(settings.getMode().toPb()) + .putAllParameters(params.toPb()) + .setCollectStats(settings.getCollectStats().toPb()) + .build(); + + GrpcReadStream origin = rpc.streamExecuteScanQuery( + request, makeGrpcRequestSettings(settings.getRequestTimeout(), settings.getTraceId()) + ); + + return new ProxyReadStream<>(origin, (response, future, observer) -> { + StatusCodesProtos.StatusIds.StatusCode statusCode = response.getStatus(); + if (statusCode == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { + try { + observer.onNext(ProtoValueReaders.forResultSet(response.getResult().getResultSet())); + } catch (Throwable t) { + future.completeExceptionally(t); + origin.cancel(); + } + } else { + Issue[] issues = Issue.fromPb(response.getIssuesList()); + StatusCode code = StatusCode.fromProto(statusCode); + future.complete(Status.of(code, issues)); + origin.cancel(); + } + }); + } + + @Override + public CompletableFuture executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) { + ValueProtos.TypedValue typedRows = ValueProtos.TypedValue.newBuilder() + .setType(rows.getType().toPb()) + .setValue(rows.toPb()) + .build(); + + YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder() + .setTable(tablePath) + .setRows(typedRows) + .setOperationParams(Operation.buildParams(settings.toOperationSettings())) + .build(); + + return rpc.bulkUpsert(request, makeGrpcRequestSettings(settings.getTimeoutDuration(), settings.getTraceId())); + } + + public static ImplicitSession of(GrpcTransport transport) { + return new ImplicitSession(transport); + } + + @Override + public String getId() { + return "Implicit YDB session"; + } + + @Override + public void close() { + // NOTHING + } + + @Override + public CompletableFuture createTable( + String path, TableDescription tableDescriptions, CreateTableSettings settings + ) { + throw new UnsupportedOperationException("Implicit session doesn't support createTable"); + } + + @Override + public CompletableFuture dropTable(String path, DropTableSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support dropTable"); + } + + @Override + public CompletableFuture alterTable(String path, AlterTableSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support alterTable"); + } + + @Override + public CompletableFuture copyTable(String src, String dst, CopyTableSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support copyTable"); + } + + @Override + public CompletableFuture copyTables(CopyTablesSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support copyTables"); + } + + @Override + public CompletableFuture renameTables(RenameTablesSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support renameTables"); + } + + @Override + public CompletableFuture> describeTable(String path, DescribeTableSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support describeTable"); + } + + @Override + public CompletableFuture> prepareDataQuery(String query, PrepareDataQuerySettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support prepareDataQuery"); + } + + @Override + public CompletableFuture> executeDataQuery( + String query, TxControl txControl, Params params, ExecuteDataQuerySettings settings + ) { + throw new UnsupportedOperationException("Implicit session doesn't support executeDataQuery"); + } + + @Override + public CompletableFuture executeSchemeQuery(String query, ExecuteSchemeQuerySettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support executeSchemeQuery"); + } + + @Override + public CompletableFuture> explainDataQuery( + String query, ExplainDataQuerySettings settings + ) { + throw new UnsupportedOperationException("Implicit session doesn't support explainDataQuery"); + } + + @Override + public CompletableFuture> beginTransaction( + Transaction.Mode transactionMode, BeginTxSettings settings + ) { + throw new UnsupportedOperationException("Implicit session doesn't support beginTransaction"); + } + + @Override + public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support beginTransaction"); + } + + @Override + public TableTransaction createNewTransaction(TxMode txMode) { + throw new UnsupportedOperationException("Implicit session doesn't support createNewTransaction"); + } + + @Override + public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support commitTransaction"); + } + + @Override + public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support rollbackTransaction"); + } + + @Override + public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) { + throw new UnsupportedOperationException("Implicit session doesn't support keepAlive"); + } +} diff --git a/table/src/test/java/tech/ydb/table/integration/ImplicitSessionTest.java b/table/src/test/java/tech/ydb/table/integration/ImplicitSessionTest.java new file mode 100644 index 000000000..83d7c4371 --- /dev/null +++ b/table/src/test/java/tech/ydb/table/integration/ImplicitSessionTest.java @@ -0,0 +1,217 @@ +package tech.ydb.table.integration; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.hash.Hashing; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.table.SessionRetryContext; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.impl.ImplicitSession; +import tech.ydb.table.impl.SimpleTableClient; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.rpc.grpc.GrpcTableRpc; +import tech.ydb.table.settings.ExecuteScanQuerySettings; +import tech.ydb.table.settings.ReadRowsSettings; +import tech.ydb.table.settings.ReadTableSettings; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.PrimitiveValue; +import tech.ydb.table.values.StructType; +import tech.ydb.table.values.StructValue; +import tech.ydb.table.values.TupleValue; +import tech.ydb.table.values.VoidValue; +import tech.ydb.test.junit4.GrpcTransportRule; + +/** + * + * @author Aleksandr Gorshenin + */ +public class ImplicitSessionTest { + @ClassRule + public final static GrpcTransportRule YDB = new GrpcTransportRule(); + + private final static String TABLE_NAME = "implicit_session_test"; + private final static Instant INSTANT = Instant.ofEpochMilli(1585932011123l); // Friday, April 3, 2020 4:40:11.123 PM + private final static StructType TYPE = StructType.of( + "id1", PrimitiveType.Text, + "id2", PrimitiveType.Uint64, + "payload", PrimitiveType.Bytes, + "created", PrimitiveType.Timestamp + ); + + private final SimpleTableClient tableClient = SimpleTableClient.newClient(GrpcTableRpc.useTransport(YDB)).build(); + + private final SessionRetryContext implicitCtx = SessionRetryContext.create(ImplicitSession.of(YDB)).build(); + + @Before + public void createTable() { + String tablePath = YDB.getDatabase() + "/" + TABLE_NAME; + + TableDescription tableDescription = TableDescription.newBuilder() + .addNonnullColumn("id1", PrimitiveType.Text) + .addNonnullColumn("id2", PrimitiveType.Uint64) + .addNullableColumn("payload", PrimitiveType.Bytes) + .addNullableColumn("created", PrimitiveType.Timestamp) + .setPrimaryKeys("id1", "id2") + .build(); + + SessionRetryContext.create(tableClient).build() + .supplyStatus(session -> session.createTable(tablePath, tableDescription)) + .join().expectSuccess("create table error"); + } + + @After + public void dropTable() { + String tablePath = YDB.getDatabase() + "/" + TABLE_NAME; + SessionRetryContext.create(tableClient).build() + .supplyStatus(session -> session.dropTable(tablePath)).join().expectSuccess("drop table error"); + } + + private String hash(String text) { + return Hashing.sha256().hashBytes(text.getBytes()).toString(); + } + + private byte[] createPayload(int idx, int type) { + if (type == 0) { + return null; + } + + int v = (31 * idx) % type; + byte[] bytes = new byte[v]; + for (int i = 0; i < bytes.length; i += 1) { + v = 31 * v + 7; + bytes[i] = (byte) v; + } + return bytes; + } + + private ListValue generateBulk(int startInclusive, int endExclusive) { + return ListType.of(TYPE).newValue(IntStream.range(startInclusive, endExclusive).mapToObj(idx -> { + byte[] payload = createPayload(idx, idx % 7); + return TYPE.newValue( + "id1", PrimitiveValue.newText(hash("hashed_id_" + idx)), + "id2", PrimitiveValue.newUint64(idx), + "payload", payload == null ? VoidValue.of() : PrimitiveValue.newBytes(payload), + "created", PrimitiveValue.newTimestamp(INSTANT.plusSeconds(idx)) + ); + }).collect(Collectors.toList())); + } + + private StructValue keyStruct(int idx) { + return StructValue.of( + "id2", PrimitiveValue.newUint64(idx), + "id1", PrimitiveValue.newText(hash("hashed_id_" + idx)) + ); + } + + private TupleValue keyTuple(int idx) { + return TupleValue.of( + PrimitiveValue.newText(hash("hashed_id_" + idx)).makeOptional(), + PrimitiveValue.newUint64(idx).makeOptional() + ); + } + + @Test + public void implicitSessionTest() { + String tablePath = YDB.getDatabase() + "/" + TABLE_NAME; + + // Create type for struct of series + // Create and fill list of series + ListValue bulk1 = generateBulk(0, 1000); + ListValue bulk2 = generateBulk(1000, 2000); + ListValue bulk3 = generateBulk(2000, 3000); + + // Bulk Upsert + implicitCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, bulk1)) + .join().expectSuccess("Cannot execute bulk upsert"); + implicitCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, bulk2)) + .join().expectSuccess("Cannot execute bulk upsert"); + implicitCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, bulk3)) + .join().expectSuccess("Cannot execute bulk upsert"); + + // Read Rows + ResultSetReader readedRows = implicitCtx.supplyResult( + session -> session.readRows(tablePath, ReadRowsSettings.newBuilder() + .addKeys(keyStruct(546), keyStruct(1123), keyStruct(2299)) + .addColumns("id2", "payload") + .build()) + ).join().getValue().getResultSetReader(); + + Assert.assertTrue(readedRows.next()); + Assert.assertEquals(546, readedRows.getColumn("id2").getUint64()); + // TODO: check readRows method with nullable results + // Assert.assertFalse(readedRows.getColumn("payload").isOptionalItemPresent()); + + Assert.assertTrue(readedRows.next()); + Assert.assertEquals(1123, readedRows.getColumn("id2").getUint64()); + Assert.assertArrayEquals(createPayload(1123, 1123 % 7), readedRows.getColumn("payload").getBytes()); + + Assert.assertTrue(readedRows.next()); + Assert.assertEquals(2299, readedRows.getColumn("id2").getUint64()); + Assert.assertArrayEquals(createPayload(2299, 2299 % 7), readedRows.getColumn("payload").getBytes()); + + // Read Table + final List readedTable = new ArrayList<>(); + implicitCtx.supplyStatus(session -> { + readedTable.clear(); + ReadTableSettings settings = ReadTableSettings.newBuilder() + .batchLimitRows(100) + .fromKeyInclusive(keyTuple(234)) + .toKeyExclusive(keyTuple(2300)) + .orderedRead(true) + .columns("id1", "id2", "created") + .build(); + return session.executeReadTable(tablePath, settings).start(part -> { + ResultSetReader rs = part.getResultSetReader(); + while (rs.next()) { + long idx = rs.getColumn("id2").getUint64(); + readedTable.add(idx); + Assert.assertEquals(INSTANT.plusSeconds(idx), rs.getColumn("created").getTimestamp()); + } + }); + }).join().expectSuccess("cannot read table"); + + Assert.assertFalse(readedTable.isEmpty()); + Assert.assertEquals(Long.valueOf(234l), readedTable.get(0)); + + // Scan Query + final List scanedTable = new ArrayList<>(); + implicitCtx.supplyStatus(session -> { + scanedTable.clear(); + return session.executeScanQuery( + "DECLARE $p1 AS UInt64;\n" + + "DECLARE $p2 AS UInt64;\n" + + "SELECT id1, id2, payload FROM " + TABLE_NAME + " WHERE id2 >= $p1 AND id2 < $p2 ORDER BY id2", + Params.of("$p1", PrimitiveValue.newUint64(234), "$p2", PrimitiveValue.newUint64(2300)), + ExecuteScanQuerySettings.newBuilder().build() + ).start(rs -> { + while (rs.next()) { + int idx = (int) rs.getColumn("id2").getUint64(); + scanedTable.add(idx); + + byte[] expected = createPayload(idx, idx % 7); + if (expected != null) { + Assert.assertArrayEquals(expected, rs.getColumn("payload").getBytes()); + } else { + Assert.assertFalse(rs.getColumn("payload").isOptionalItemPresent()); + } + } + }); + }).join().expectSuccess("cannot scan table"); + + Assert.assertEquals(2066, scanedTable.size()); + Assert.assertEquals(Integer.valueOf(234), scanedTable.get(0)); + Assert.assertEquals(Integer.valueOf(2299), scanedTable.get(2065)); + } +}