diff --git a/castor-common/src/main/java/io/carbynestack/castor/common/entities/ArrayBackedTuple.java b/castor-common/src/main/java/io/carbynestack/castor/common/entities/ArrayBackedTuple.java index 587b37a..221bc0c 100644 --- a/castor-common/src/main/java/io/carbynestack/castor/common/entities/ArrayBackedTuple.java +++ b/castor-common/src/main/java/io/carbynestack/castor/common/entities/ArrayBackedTuple.java @@ -8,6 +8,8 @@ package io.carbynestack.castor.common.entities; import io.carbynestack.castor.common.exceptions.CastorClientException; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; diff --git a/castor-java-client/3RD-PARTY-LICENSES/sbom.xml b/castor-java-client/3RD-PARTY-LICENSES/sbom.xml index 591c68a..4969ec8 100644 --- a/castor-java-client/3RD-PARTY-LICENSES/sbom.xml +++ b/castor-java-client/3RD-PARTY-LICENSES/sbom.xml @@ -108,7 +108,7 @@ Carbyne Stack Java HTTP Client io.carbynestack java-http-client - 0.1-SNAPSHOT-4075044562-9-8ffab9c + 0.1.2 Apache-2.0 @@ -132,7 +132,7 @@ castor-common io.carbynestack castor-common - 0.1-SNAPSHOT-4321261594-23-40a9faa + 0.1.1 Apache-2.0 diff --git a/castor-java-client/pom.xml b/castor-java-client/pom.xml index b9f647c..e9d2bc4 100644 --- a/castor-java-client/pom.xml +++ b/castor-java-client/pom.xml @@ -41,6 +41,7 @@ io.carbynestack java-http-client + 0.1.2 diff --git a/castor-java-client/src/main/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClient.java b/castor-java-client/src/main/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClient.java index b15da66..6a68ba9 100644 --- a/castor-java-client/src/main/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClient.java +++ b/castor-java-client/src/main/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClient.java @@ -20,6 +20,10 @@ import io.carbynestack.httpclient.CsHttpClientException; import io.vavr.control.Option; import io.vavr.control.Try; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.UUID; import lombok.Value; @@ -91,12 +95,17 @@ private DefaultCastorIntraVcpClient(Builder builder) { @Override public TupleList downloadTupleShares(UUID requestId, TupleType tupleType, long count) { try { - return csHttpClient - .getForEntity( - serviceUri.getIntraVcpRequestTuplesUri(requestId, tupleType, count), - getHeaders(serviceUri), - TupleList.class) - .get(); + byte[] shares = csHttpClient + .getForEntity( + serviceUri.getIntraVcpRequestTuplesUri(requestId, tupleType, count), + getHeaders(serviceUri), + byte[].class) + .get(); + long length = tupleType.getTupleSize() * count; + return TupleList.fromStream(tupleType.getTupleCls(), + tupleType.getField(), + new ByteArrayInputStream(shares), + length); } catch (CsHttpClientException chce) { throw new CastorClientException( String.format( @@ -104,6 +113,8 @@ public TupleList downloadTupleShares(UUID requestId, TupleType tupleType, long c serviceUri.getRestServiceUri(), chce.getMessage()), chce); + } catch (IOException e) { + throw new RuntimeException(e); } } diff --git a/castor-java-client/src/test/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClientTest.java b/castor-java-client/src/test/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClientTest.java index 9c36674..bc2bff2 100644 --- a/castor-java-client/src/test/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClientTest.java +++ b/castor-java-client/src/test/java/io/carbynestack/castor/client/download/DefaultCastorIntraVcpClientTest.java @@ -15,8 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import io.carbynestack.castor.common.BearerTokenProvider; import io.carbynestack.castor.common.CastorServiceUri; @@ -25,7 +24,9 @@ import io.carbynestack.httpclient.CsHttpClient; import io.carbynestack.httpclient.CsHttpClientException; import io.carbynestack.httpclient.CsResponseEntity; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.util.ArrayList; @@ -99,23 +100,35 @@ void givenSslConfiguration_whenBuildClient_thenInitializeCsHttpClientAccordingly @SneakyThrows @Test - void givenSuccessfulRequest_whenDownloadTripleShares_thenReturnExpectedContent() { + void givenSuccessfulRequest_whenDownloadTripleSharesAsBytes_thenReturnExpectedContent() { UUID requestId = UUID.fromString("3dc08ff2-5eed-49a9-979e-3a3ac0e4a2cf"); int expectedCount = 2; TupleList, Field.Gfp> expectedTripleList = new TupleList(MultiplicationTriple.class, GFP); + expectedTripleList.add(new MultiplicationTriple(GFP, testShare, testShare, testShare)); expectedTripleList.add(new MultiplicationTriple(GFP, testShare, testShare, testShare)); - CsResponseEntity givenResponseEntity = - CsResponseEntity.success(HttpStatus.SC_OK, expectedTripleList); + ByteArrayOutputStream tripeListAsBytes = new ByteArrayOutputStream(); + expectedTripleList.forEach( + tuple -> { + try { + tuple.writeTo(tripeListAsBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + CsResponseEntity givenResponseEntity = + CsResponseEntity.success(HttpStatus.SC_OK, tripeListAsBytes.toByteArray()); + CastorServiceUri serviceUri = new CastorServiceUri(serviceAddress); - when(csHttpClientMock.getForEntity( + doReturn(givenResponseEntity) + .when(csHttpClientMock) + .getForEntity( serviceUri.getIntraVcpRequestTuplesUri( requestId, TupleType.MULTIPLICATION_TRIPLE_GFP, expectedCount), Collections.emptyList(), - TupleList.class)) - .thenReturn(givenResponseEntity); + byte[].class); TupleList actualTripleList = castorIntraVcpClient.downloadTupleShares( requestId, TupleType.MULTIPLICATION_TRIPLE_GFP, expectedCount); @@ -129,7 +142,7 @@ void givenRequestEmitsError_whenDownloadTripleShares_thenThrowCastorClientExcept UUID requestId = UUID.fromString("3dc08ff2-5eed-49a9-979e-3a3ac0e4a2cf"); CsHttpClientException expectedCause = new CsHttpClientException("totally expected"); URI expectedUri = new CastorServiceUri(serviceAddress).getRestServiceUri(); - when(csHttpClientMock.getForEntity(any(), eq(Collections.emptyList()), eq(TupleList.class))) + when(csHttpClientMock.getForEntity(any(), eq(Collections.emptyList()), eq(byte[].class))) .thenThrow(expectedCause); CastorClientException actualCce = assertThrows( diff --git a/castor-service/3RD-PARTY-LICENSES/sbom.xml b/castor-service/3RD-PARTY-LICENSES/sbom.xml index 9b0f6ce..bdf398c 100644 --- a/castor-service/3RD-PARTY-LICENSES/sbom.xml +++ b/castor-service/3RD-PARTY-LICENSES/sbom.xml @@ -1,5 +1,18 @@ + + Animal Sniffer Annotations + org.codehaus.mojo + animal-sniffer-annotations + 1.18 + http://www.mojohaus.org/animal-sniffer/animal-sniffer-annotations + + + MIT license + http://www.opensource.org/licenses/mit-license.php + + + AntLR Parser Generator antlr @@ -262,7 +275,7 @@ castor-common io.carbynestack castor-common - 0.1-SNAPSHOT-4321261594-23-40a9faa + 0.1.1 Apache-2.0 @@ -274,7 +287,7 @@ castor-java-client io.carbynestack castor-java-client - 0.1-SNAPSHOT-4321261594-23-40a9faa + 0.1.1 Apache-2.0 @@ -325,8 +338,8 @@ error-prone annotations com.google.errorprone error_prone_annotations - 2.3.4 - http://nexus.sonatype.org/oss-repository-hosting.html/error_prone_parent/error_prone_annotations + 2.10.0 + https://errorprone.info/error_prone_annotations Apache 2.0 @@ -347,6 +360,32 @@ + + Google Android Annotations Library + com.google.android + annotations + 4.1.1.4 + http://source.android.com/ + + + Apache 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + + + + + Gson + com.google.code.gson + gson + 2.8.9 + https://github.com/google/gson/gson + + + Apache-2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + + + Guava InternalFutureFailureAccess and InternalFutures com.google.guava @@ -468,6 +507,97 @@ + + io.grpc:grpc-api + io.grpc + grpc-api + 1.34.1 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-context + io.grpc + grpc-context + 1.34.1 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-core + io.grpc + grpc-core + 1.34.1 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-netty + io.grpc + grpc-netty + 1.45.0 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-protobuf + io.grpc + grpc-protobuf + 1.45.0 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-protobuf-lite + io.grpc + grpc-protobuf-lite + 1.34.1 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + + + io.grpc:grpc-stub + io.grpc + grpc-stub + 1.45.0 + https://github.com/grpc/grpc-java + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + istack common utility code runtime com.sun.istack @@ -865,6 +995,136 @@ + + Netty/Buffer + io.netty + netty-buffer + 4.1.77.Final + https://netty.io/netty-buffer/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Codec + io.netty + netty-codec + 4.1.77.Final + https://netty.io/netty-codec/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Codec/HTTP + io.netty + netty-codec-http + 4.1.77.Final + https://netty.io/netty-codec-http/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Codec/HTTP2 + io.netty + netty-codec-http2 + 4.1.77.Final + https://netty.io/netty-codec-http2/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Codec/Socks + io.netty + netty-codec-socks + 4.1.77.Final + https://netty.io/netty-codec-socks/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Common + io.netty + netty-common + 4.1.77.Final + https://netty.io/netty-common/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Handler + io.netty + netty-handler + 4.1.77.Final + https://netty.io/netty-handler/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Handler/Proxy + io.netty + netty-handler-proxy + 4.1.77.Final + https://netty.io/netty-handler-proxy/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Resolver + io.netty + netty-resolver + 4.1.77.Final + https://netty.io/netty-resolver/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + + + Netty/Transport + io.netty + netty-transport + 4.1.77.Final + https://netty.io/netty-transport/ + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + + + okhttp com.squareup.okhttp3 @@ -891,6 +1151,19 @@ + + perfmark:perfmark-api + io.perfmark + perfmark-api + 0.23.0 + https://github.com/perfmark/perfmark + + + Apache 2.0 + https://opensource.org/licenses/Apache-2.0 + + + PostgreSQL JDBC Driver org.postgresql @@ -943,6 +1216,32 @@ + + proto-google-common-protos + com.google.api.grpc + proto-google-common-protos + 2.0.1 + https://github.com/googleapis/java-iam/proto-google-common-protos + + + Apache-2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + + + + + Protocol Buffers [Core] + com.google.protobuf + protobuf-java + 3.19.4 + https://developers.google.com/protocol-buffers/protobuf-java/ + + + 3-Clause BSD License + https://opensource.org/licenses/BSD-3-Clause + + + reload4j ch.qos.reload4j diff --git a/castor-service/pom.xml b/castor-service/pom.xml index bdf205c..48aec74 100644 --- a/castor-service/pom.xml +++ b/castor-service/pom.xml @@ -49,6 +49,8 @@ Hoxton.SR12 2.7.8 1.3.5.RELEASE + 1.45.0 + 3.19.4 1.17.6 @@ -90,6 +92,27 @@ + + io.grpc + grpc-netty + 1.45.0 + + + io.grpc + grpc-protobuf + 1.45.0 + + + io.grpc + grpc-stub + 1.45.0 + + + com.google.protobuf + protobuf-java + 3.19.4 + + com.fasterxml.jackson.core @@ -290,6 +313,13 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + ${project.basedir}/.. @@ -315,6 +345,24 @@ + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.19.4:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.45.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + org.codehaus.mojo license-maven-plugin diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/config/CastorServiceProperties.java b/castor-service/src/main/java/io/carbynestack/castor/service/config/CastorServiceProperties.java index 1edba00..9894c1d 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/config/CastorServiceProperties.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/config/CastorServiceProperties.java @@ -29,6 +29,7 @@ public class CastorServiceProperties { private boolean noSslValidation = false; private List trustedCertificates = new ArrayList<>(); private int initialFragmentSize; + private String podHash; private CastorCacheProperties cache; private CastorSlaveServiceProperties slave; diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadService.java b/castor-service/src/main/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadService.java index 9dea76d..1b3e421 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadService.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadService.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -14,9 +14,11 @@ import io.carbynestack.castor.service.persistence.cache.ReservationCachingService; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService; import io.carbynestack.castor.service.persistence.tuplestore.TupleStore; +import java.io.IOException; +import java.io.InputStream; import java.util.UUID; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -42,6 +44,7 @@ public DefaultTuplesDownloadService( this.fragmentStorageService = fragmentStorageService; this.reservationCachingService = reservationCachingService; this.castorServiceProperties = castorServiceProperties; + // this.fragmentStorageService.setUserLevelLockTimeout(3000); } /** @@ -57,14 +60,14 @@ public DefaultTuplesDownloadService( */ @Transactional @Override - public , F extends Field> TupleList getTupleList( + public , F extends Field> byte[] getTupleList( Class tupleCls, F field, long count, UUID requestId) { TupleType tupleType = TupleType.findTupleType(tupleCls, field); String reservationId = requestId + "_" + tupleType; Reservation reservation; if (castorServiceProperties.isMaster()) { reservation = - reservationCachingService.getUnlockedReservation(reservationId, tupleType, count); + reservationCachingService.lockAndRetrieveReservation(reservationId, tupleType, count); if (reservation == null) { reservation = reservationCachingService.createReservation(reservationId, tupleType, count); log.debug("Reservation successfully activated on all slaves."); @@ -72,42 +75,63 @@ public , F extends Field> TupleList getTupleList( } else { reservation = reservationCachingService.getReservationWithRetry(reservationId, tupleType, count); + ReservationElement firstElement = reservation.getReservations().get(0); + int reservedFragments = + fragmentStorageService.lockReservedFragmentsWithoutRetrieving( + firstElement.getTupleChunkId(), firstElement.getStartIndex(), reservationId); + if (reservedFragments < reservation.getReservations().size()) { + System.err.println( + "Expected: " + reservation.getReservations().size() + " actual: " + reservedFragments); + throw new CastorServiceException(FAILED_RETRIEVING_TUPLES_EXCEPTION_MSG); + } } - TupleList result = consumeReservation(tupleCls, field, reservation); + byte[] result = consumeReservation(tupleCls, field, reservation); deleteReservedFragments(reservation); reservationCachingService.forgetReservation(reservationId); return result; } - /** @throws CastorServiceException if tuples cannot be retrieved from database */ - private , F extends Field> TupleList consumeReservation( + /** + * @throws CastorServiceException if tuples cannot be retrieved from database + */ + private , F extends Field> byte[] consumeReservation( Class tupleCls, F field, Reservation reservation) { - TupleList tuples = new TupleList<>(tupleCls, field); - tuples.addAll( - reservation.getReservations().stream() - .flatMap( - reservationElement -> downloadTuples(tupleCls, field, reservationElement).stream()) - .collect(Collectors.toList())); - return tuples; + + try (ByteArrayOutputStream tupleBytes = new ByteArrayOutputStream()) { + + for (ReservationElement reservationElement : reservation.getReservations()) { + try (InputStream tplData = downloadTuples(tupleCls, field, reservationElement)) { + tupleBytes.write(tplData); + } + } + + return tupleBytes.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - /** @throws CastorServiceException if tuples cannot be retrieved from database */ - private , F extends Field> TupleList downloadTuples( + /** + * @throws CastorServiceException if tuples cannot be retrieved from database + */ + private , F extends Field> InputStream downloadTuples( Class tupleCls, F field, ReservationElement reservationElement) { UUID tupleChunkId = reservationElement.getTupleChunkId(); TupleType tupleType = TupleType.findTupleType(tupleCls, field); final long offset = reservationElement.getStartIndex() * tupleType.getTupleSize(); final long length = reservationElement.getReservedTuples() * tupleType.getTupleSize(); try { - return tupleStore.downloadTuples(tupleCls, field, tupleChunkId, offset, length); + return tupleStore.downloadTuplesAsBytes(tupleCls, field, tupleChunkId, offset, length); } catch (Exception e) { throw new CastorServiceException(FAILED_RETRIEVING_TUPLES_EXCEPTION_MSG, e); } } - /** @throws CastorServiceException if metadata cannot be updated */ + /** + * @throws CastorServiceException if metadata cannot be updated + */ private void deleteReservedFragments(Reservation reservation) { - fragmentStorageService.deleteAllForReservationId(reservation.getReservationId()); + // fragmentStorageService.deleteAllForReservationId(reservation.getReservationId()); for (ReservationElement reservationElement : reservation.getReservations()) { if (!fragmentStorageService.isChunkReferencedByFragments( reservationElement.getTupleChunkId())) { diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/download/TuplesDownloadService.java b/castor-service/src/main/java/io/carbynestack/castor/service/download/TuplesDownloadService.java index 57a4239..70f0a29 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/download/TuplesDownloadService.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/download/TuplesDownloadService.java @@ -10,7 +10,6 @@ import io.carbynestack.castor.common.entities.Field; import io.carbynestack.castor.common.entities.Reservation; import io.carbynestack.castor.common.entities.Tuple; -import io.carbynestack.castor.common.entities.TupleList; import io.carbynestack.castor.common.exceptions.CastorServiceException; import io.carbynestack.castor.service.config.CastorSlaveServiceProperties; import java.util.UUID; @@ -73,11 +72,11 @@ public interface TuplesDownloadService { * @throws CastorServiceException if communication with slaves failed * @throws CastorServiceException if no reservation could be made for the given configuration * @throws CastorServiceException if no {@link Reservation} with the given id could be obtained - * within a defined timout (see {@link - * CastorSlaveServiceProperties#getWaitForReservationTimeout()}). + * within a defined timout (see {@link + * CastorSlaveServiceProperties#getWaitForReservationTimeout()}). * @throws CastorServiceException if tuples cannot be retrieved from database */ @Transactional - , F extends Field> TupleList getTupleList( + , F extends Field> byte[] getTupleList( Class tupleCls, F field, long count, UUID requestId); } diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplier.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplier.java index 40e8628..d5826d1 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplier.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplier.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -7,17 +7,18 @@ package io.carbynestack.castor.service.persistence.cache; -import io.carbynestack.castor.client.download.CastorInterVcpClient; import io.carbynestack.castor.common.entities.Reservation; import io.carbynestack.castor.common.entities.ReservationElement; import io.carbynestack.castor.common.entities.TupleType; import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; +import io.carbynestack.castor.service.config.CastorServiceProperties; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentEntity; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -25,18 +26,14 @@ @Slf4j @AllArgsConstructor public final class CreateReservationSupplier implements Supplier { - public static final String INSUFFICIENT_TUPLES_EXCEPTION_MSG = - "Insufficient Tuples of type %s available (%s out of %s)."; public static final String SHARING_RESERVATION_FAILED_EXCEPTION_MSG = "Sharing reservation with slave services failed."; - public static final String FAILED_RESERVE_TUPLES_EXCEPTION_MSG = "Failed to reserve the tuples."; public static final String FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG = "Unable to locate available tuples."; - final CastorInterVcpClient castorInterVcpClient; - final ReservationCachingService reservationCache; final TupleChunkFragmentStorageService fragmentStorageService; final String reservationId; final TupleType tupleType; + final CastorServiceProperties castorServiceProperties; final long count; /** @@ -51,55 +48,75 @@ public Reservation get() { List reservationElements = composeElements(tupleType, count, reservationId); log.debug("Reservation composed."); Reservation reservation = new Reservation(reservationId, tupleType, reservationElements); - if (!castorInterVcpClient.shareReservation(reservation)) { - throw new CastorServiceException(SHARING_RESERVATION_FAILED_EXCEPTION_MSG); - } log.debug("Reservation successfully shared with all slaves."); return reservation; } /** + * Composes elements of the reservation by first trying to get as many fragments with the 'initial + * fragment size' as possible and then with fragments that deviate from this. The fragments with + * 'round' size are stored after the fragments with 'non-round' fragments size' + * * @throws CastorServiceException if not enough tuples of the given type are available * @throws CastorServiceException if reserving the requested amount of tuples failed, although * there are enough tuples available */ private List composeElements( TupleType tupleType, long numberOfTuples, String reservationId) { - long availableTuples = fragmentStorageService.getAvailableTuples(tupleType); - if (availableTuples < numberOfTuples) { - throw new CastorServiceException( - String.format( - INSUFFICIENT_TUPLES_EXCEPTION_MSG, tupleType, availableTuples, numberOfTuples)); - } List reservationElements = new ArrayList<>(); - long stillToReserve = numberOfTuples; - while (stillToReserve > 0) { - try { - TupleChunkFragmentEntity availableFragment = - fragmentStorageService - .findAvailableFragmentWithTupleType(tupleType) - .orElseThrow( - () -> - new CastorServiceException(FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG)); - long tuplesInFragment = availableFragment.getEndIndex() - availableFragment.getStartIndex(); - if (tuplesInFragment > stillToReserve) { - availableFragment = - fragmentStorageService.splitAt( - availableFragment, availableFragment.getStartIndex() + stillToReserve); - } - availableFragment.setReservationId(reservationId); - fragmentStorageService.update(availableFragment); - long tuplesTaken = Math.min(tuplesInFragment, stillToReserve); - stillToReserve -= tuplesTaken; - reservationElements.add( - new ReservationElement( - availableFragment.getTupleChunkId(), - tuplesTaken, - availableFragment.getStartIndex())); - } catch (Exception e) { - throw new CastorServiceException(FAILED_RESERVE_TUPLES_EXCEPTION_MSG, e); + long oddToReserve = numberOfTuples % castorServiceProperties.getInitialFragmentSize(); + long roundToReserve = numberOfTuples - oddToReserve; + List roundElements = null; + if (roundToReserve > 0) { + ArrayList roundFragments = + fragmentStorageService.retrieveAndReserveRoundFragments( + (int) roundToReserve, tupleType, reservationId); + + // if the fragments are split up there might be enough tuples in the non-round fragments. + // -- especially with inputmasks + if (roundFragments.size() * (long) castorServiceProperties.getInitialFragmentSize() + < roundToReserve) + oddToReserve += + roundToReserve + - roundFragments.size() * (long) castorServiceProperties.getInitialFragmentSize(); + + // maps all fragments returned to ReservationElements using their 'tupleChunnkId' attribute, + // the common fragment size and their 'startIndex' attribute + roundElements = + roundFragments.stream() + .map( + frag -> + new ReservationElement( + frag.getTupleChunkId(), + castorServiceProperties.getInitialFragmentSize(), + frag.getStartIndex())) + .collect(Collectors.toList()); + } + while (oddToReserve > 0) { + TupleChunkFragmentEntity availableFragment = + fragmentStorageService + .retrieveSinglePartialFragment( + tupleType, oddToReserve < castorServiceProperties.getInitialFragmentSize()) + .orElseThrow( + () -> new CastorServiceException(FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG)); + long tuplesInFragment = availableFragment.getEndIndex() - availableFragment.getStartIndex(); + if (tuplesInFragment > oddToReserve) { + availableFragment = + fragmentStorageService.splitAt( + availableFragment, availableFragment.getStartIndex() + oddToReserve); } + availableFragment.setReservationId(reservationId); + availableFragment.setRound(false); + fragmentStorageService.update(availableFragment); + long tuplesTaken = Math.min(tuplesInFragment, oddToReserve); + oddToReserve -= tuplesTaken; + ReservationElement tempResElement = + new ReservationElement( + availableFragment.getTupleChunkId(), tuplesTaken, availableFragment.getStartIndex()); + reservationElements.add(tempResElement); } + if (roundElements != null) reservationElements.addAll(roundElements); + log.debug("Composed reservation of {} {}: {}.", numberOfTuples, tupleType, reservationElements); return reservationElements; } diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingService.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingService.java index 2697c24..1e422cf 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingService.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingService.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -20,12 +20,13 @@ import io.carbynestack.castor.service.download.DedicatedTransactionService; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentEntity; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.cache.CacheKeyPrefix; import org.springframework.data.redis.core.RedisTemplate; @@ -52,6 +53,12 @@ public class ReservationCachingService { public static final String FAILED_CREATE_RESERVATION_EXCEPTION_MSG = "Creating the reservation failed."; + public static final String FAILED_LOCKING_RESERVATION_EXCEPTION_FORMAT = + "Acquiring an exclusive lock on reservation %s failed."; + + public static final String SHARING_RESERVATION_FAILED_EXCEPTION_MSG = + "Sharing reservation with slave services failed."; + private final ConsumptionCachingService consumptionCachingService; private final RedisTemplate redisTemplate; private final TupleChunkFragmentStorageService tupleChunkFragmentStorageService; @@ -59,6 +66,7 @@ public class ReservationCachingService { private final CastorSlaveServiceProperties slaveServiceProperties; private final Optional castorInterVcpClientOptional; private final Optional dedicatedTransactionServiceOptional; + private final CastorServiceProperties castorServiceProperties; protected ExecutorService executorService = Executors.newCachedThreadPool(); @@ -69,6 +77,7 @@ public ReservationCachingService( RedisTemplate redisTemplate, TupleChunkFragmentStorageService tupleChunkFragmentStorageService, CastorSlaveServiceProperties slaveServiceProperties, + CastorServiceProperties castorServiceProperties, Optional castorInterVcpClientOptional, Optional dedicatedTransactionServiceOptional) { this.consumptionCachingService = consumptionCachingService; @@ -78,6 +87,7 @@ public ReservationCachingService( this.slaveServiceProperties = slaveServiceProperties; this.castorInterVcpClientOptional = castorInterVcpClientOptional; this.dedicatedTransactionServiceOptional = dedicatedTransactionServiceOptional; + this.castorServiceProperties = castorServiceProperties; } /** @@ -117,26 +127,62 @@ public void keepReservation(Reservation reservation) { * @throws CastorServiceException if the tuples could not be reserved as requested. * @throws CastorServiceException if the cache already holds a reservation with the given ID */ - public void keepAndApplyReservation(Reservation reservation) { + @Transactional + public void keepAndApplyReservation(Reservation reservation) throws InterruptedException { log.debug("persisting reservation {}", reservation); ValueOperations ops = redisTemplate.opsForValue(); if (ops.get(cachePrefix + reservation.getReservationId()) == null) { - ops.set(cachePrefix + reservation.getReservationId(), reservation); - log.debug("put in database at {}", cachePrefix + reservation.getReservationId()); log.debug("Apply reservation {}", reservation); - for (ReservationElement re : reservation.getReservations()) { - log.debug("Processing reservation element {}", re); - long startIndex = re.getStartIndex(); - long endIndex = startIndex + re.getReservedTuples(); - while (startIndex < endIndex) { - TupleChunkFragmentEntity fragment = - tupleChunkFragmentStorageService - .findAvailableFragmentForChunkContainingIndex(re.getTupleChunkId(), startIndex) - .orElseThrow( - () -> - new CastorServiceException( - String.format( - RESERVATION_CANNOT_BE_SATISFIED_EXCEPTION_FORMAT, reservation))); + storeReservationInDB(reservation); + ops.set(cachePrefix + reservation.getReservationId(), reservation); + applyConsumption(reservation); + log.debug("consumption emitted"); + } else { + System.err.println("reservation conflict"); + throw new CastorServiceException( + String.format(RESERVATION_CONFLICT_EXCEPTION_MSG, reservation.getReservationId())); + } + } + + /** + * Maps the reservation elements to tuplefragments and saves them accordingly in batches to the + * RDBMS. This function assumes that all 'non-round' fragments are saved before 'round' fragments + * for faster processing. + * + * @param reservation The reserved fragments + * @throws InterruptedException : If the fragments can not be reserved + */ + @Transactional() + public void storeReservationInDB(@NotNull Reservation reservation) throws InterruptedException { + int firstRoundFragment = 0; + for (ReservationElement re : reservation.getReservations()) { + if (re.getReservedTuples() == castorServiceProperties.getInitialFragmentSize()) break; + firstRoundFragment++; + log.debug("Processing fragmented reservation element {}", re); + long startIndex = re.getStartIndex(); + long endIndex = startIndex + re.getReservedTuples(); + while (startIndex < endIndex) { + try { + + TupleChunkFragmentEntity fragment = null; + // the timeout functionality is needed because the fragments might not be unlocked at that + // point in time + int singleRetryMicrosecs = 6000; + while (fragment == null) { + fragment = + tupleChunkFragmentStorageService + .findAvailableFragmentForChunkContainingIndex(re.getTupleChunkId(), startIndex) + .orElse(null); + if (fragment == null) { + singleRetryMicrosecs -= 2000; + TimeUnit.MICROSECONDS.sleep(2000); + if (singleRetryMicrosecs <= 0) { + System.out.println("Tuples not yet activated"); + throw new CastorServiceException( + String.format(RESERVATION_CANNOT_BE_SATISFIED_EXCEPTION_FORMAT, reservation)); + } + } + } if (fragment.getStartIndex() < startIndex) { fragment = tupleChunkFragmentStorageService.splitBefore(fragment, startIndex); } @@ -144,15 +190,73 @@ public void keepAndApplyReservation(Reservation reservation) { fragment = tupleChunkFragmentStorageService.splitAt(fragment, endIndex); } fragment.setReservationId(reservation.getReservationId()); + fragment.setRound(false); tupleChunkFragmentStorageService.update(fragment); startIndex = fragment.getEndIndex(); + + } catch (Exception e) { + System.err.println( + "Reservation of single fragment failed; probably locking issue; res:" + + reservation.getReservationId() + + " tchunk:" + + re.getTupleChunkId()); + throw e; + } + } + } + if (firstRoundFragment < reservation.getReservations().size()) { + + HashMap> mappedReservations = new HashMap<>(); + List roundReservationElements = + reservation + .getReservations() + .subList(firstRoundFragment, reservation.getReservations().size()); + roundReservationElements.forEach( + resElem -> { + mappedReservations.computeIfAbsent(resElem.getTupleChunkId(), k -> new ArrayList<>()); + mappedReservations.get(resElem.getTupleChunkId()).add(resElem.getStartIndex()); + }); + + for (UUID tChunkId : mappedReservations.keySet()) { + int retryMillisecs = 1500; + while (true) { + try { + int actuallyReservedFragments = + tupleChunkFragmentStorageService.reserveRoundFragmentsByIndices( + mappedReservations.get(tChunkId), reservation.getReservationId(), tChunkId); + if (actuallyReservedFragments == 0) { + retryMillisecs -= 30; + if (retryMillisecs <= 0) { + System.err.println( + "Reservation returned false result. expected: " + + roundReservationElements.size() + + "; actual: " + + actuallyReservedFragments); + throw new CastorServiceException( + String.format(RESERVATION_CANNOT_BE_SATISFIED_EXCEPTION_FORMAT, reservation)); + } + TimeUnit.MILLISECONDS.sleep(30); + } else if (actuallyReservedFragments != mappedReservations.get(tChunkId).size()) { + System.err.println( + "Fragments locked up in an unexpected way in the fragmentstore; expected:" + + mappedReservations.get(tChunkId) + + "; actual: " + + actuallyReservedFragments + + "; tchunkId: " + + tChunkId.toString()); + throw new CastorServiceException( + "Reserving fragments failed due to unexpected locks in fragment table."); + } else { + break; + } + } catch (Exception e) { + System.err.println( + "Reserving round fragments failed; probably locking issue; tchunk: " + + tChunkId.toString()); + throw new CastorServiceException("SHARING FAILED", e.getCause()); + } } } - applyConsumption(reservation); - log.debug("consumption emitted"); - } else { - throw new CastorServiceException( - String.format(RESERVATION_CONFLICT_EXCEPTION_MSG, reservation.getReservationId())); } } @@ -211,19 +315,24 @@ public Reservation createReservation(String reservationId, TupleType tupleType, .get() .runAsNewTransaction( new CreateReservationSupplier( - castorInterVcpClientOptional.get(), - this, tupleChunkFragmentStorageService, reservationId, tupleType, + castorServiceProperties, count)); keepReservation(reservation); - reservation = - dedicatedTransactionServiceOptional - .get() - .runAsNewTransaction( - new UnlockReservationSupplier( - castorInterVcpClientOptional.get(), this, reservation)); + if (!castorInterVcpClientOptional.get().shareReservation(reservation)) { + throw new CastorServiceException(SHARING_RESERVATION_FAILED_EXCEPTION_MSG); + } + ReservationElement firstElement = reservation.getReservations().get(0); + int retrievedFragmentsReservation = + tupleChunkFragmentStorageService.lockReservedFragmentsWithoutRetrieving( + firstElement.getTupleChunkId(), firstElement.getStartIndex(), reservationId); + if (retrievedFragmentsReservation != reservation.getReservations().size()) { + throw new CastorServiceException( + String.format( + FAILED_LOCKING_RESERVATION_EXCEPTION_FORMAT, reservation.getReservationId())); + } } catch (CastorClientException cce) { throw new CastorServiceException(FAILED_CREATE_RESERVATION_EXCEPTION_MSG, cce); } @@ -302,6 +411,60 @@ public Reservation getUnlockedReservation(String reservationId, TupleType tupleT return reservation; } + @Transactional + public Reservation lockAndRetrieveReservation( + String reservationId, TupleType tupleType, long count) { + ValueOperations ops = redisTemplate.opsForValue(); + Reservation reservation = (Reservation) ops.get(cachePrefix + reservationId); + if (reservation != null) { + long expectedLockedFragments = + count % castorServiceProperties.getInitialFragmentSize() == 0 + ? count / castorServiceProperties.getInitialFragmentSize() + : count / castorServiceProperties.getInitialFragmentSize() + 1; + ReservationElement firstElement = reservation.getReservations().get(0); + int lockedFragments = + tupleChunkFragmentStorageService.lockReservedFragmentsWithoutRetrieving( + firstElement.getTupleChunkId(), firstElement.getStartIndex(), reservationId); + if (lockedFragments != expectedLockedFragments) return null; + if (reservation.getTupleType() != tupleType + || reservation.getReservations().stream() + .mapToLong(ReservationElement::getReservedTuples) + .sum() + != count) { + throw new CastorServiceException( + String.format( + RESERVATION_DOES_NOT_MATCH_SPECIFICATION_EXCEPTION_MSG, + reservationId, + tupleType, + count, + reservation)); + } + } + return reservation; + } + + @Transactional + public Reservation retrieveReservation(String reservationId, TupleType tupleType, long count) { + ValueOperations ops = redisTemplate.opsForValue(); + Reservation reservation = (Reservation) ops.get(cachePrefix + reservationId); + if (reservation != null) { + if (reservation.getTupleType() != tupleType + || reservation.getReservations().stream() + .mapToLong(ReservationElement::getReservedTuples) + .sum() + != count) { + throw new CastorServiceException( + String.format( + RESERVATION_DOES_NOT_MATCH_SPECIFICATION_EXCEPTION_MSG, + reservationId, + tupleType, + count, + reservation)); + } + } + return reservation; + } + @Transactional public void forgetReservation(String reservationId) { redisTemplate.delete(cachePrefix + reservationId); diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/UnlockReservationSupplier.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/UnlockReservationSupplier.java index 085eebf..29ef0e2 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/UnlockReservationSupplier.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/UnlockReservationSupplier.java @@ -13,6 +13,8 @@ import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; import java.util.function.Supplier; + +import io.micrometer.core.annotation.Timed; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallable.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallable.java index 78b7218..dc98b36 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallable.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallable.java @@ -39,7 +39,7 @@ public Reservation call() { while (!this.stop || Thread.currentThread().isInterrupted()) { try { Reservation reservation = - reservationCachingService.getUnlockedReservation( + reservationCachingService.retrieveReservation( this.reservationId, tupleType, numberOfTuples); if (reservation != null) { return reservation; diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntity.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntity.java index 42050d9..bc2613b 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntity.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntity.java @@ -62,6 +62,11 @@ public class TupleChunkFragmentEntity implements Serializable { static final String FRAGMENT_LENGTH_COLUMN = "fragment_length"; static final String ACTIVATION_STATUS_COLUMN = "activation_status"; static final String RESERVATION_ID_COLUMN = "reservation_id"; + static final String VIEW_NAME = "distributed_fragments"; + static final String POD_HASH_FIELD = "pod_hash"; + static final String POD_HASH = System.getenv("HOSTNAME"); + static final String TUPLECHUNK_STARTINDEX_INDEX_NAME = "chunk_startidx_idx"; + static final String IS_ROUND_COLUMN = "is_round"; @Transient public static final String STATUS_COLUMN = "status"; @@ -118,6 +123,10 @@ public class TupleChunkFragmentEntity implements Serializable { @Column(name = RESERVATION_ID_COLUMN) private String reservationId; + @Setter + @Column(name = IS_ROUND_COLUMN, columnDefinition = "BOOLEAN DEFAULT true") + private boolean isRound; + /** To be used by deserialization only */ protected TupleChunkFragmentEntity() { this.tupleChunkId = null; @@ -132,13 +141,15 @@ private TupleChunkFragmentEntity( long startIndex, long endIndex, ActivationStatus activationStatus, - String reservationId) { + String reservationId, + boolean isRound) { this.tupleChunkId = tupleChunkId; this.tupleType = tupleType; this.startIndex = startIndex; this.endIndex = endIndex; this.activationStatus = activationStatus; this.reservationId = reservationId; + this.isRound = isRound; } /** @@ -173,7 +184,7 @@ public TupleChunkFragmentEntity setEndIndex(long endIndex) { */ public static TupleChunkFragmentEntity of( UUID tupleChunkId, TupleType tupleType, long startIndex, long endIndex) { - return of(tupleChunkId, tupleType, startIndex, endIndex, ActivationStatus.LOCKED, null); + return of(tupleChunkId, tupleType, startIndex, endIndex, ActivationStatus.LOCKED, null, true); } /** @@ -192,6 +203,7 @@ public static TupleChunkFragmentEntity of( * consumption ({@link ActivationStatus#UNLOCKED}) or not ({@link ActivationStatus#LOCKED}). * @param reservationId The unique reservation identifier for the operation the new {@link * TupleChunkFragmentEntity} is assigned to. Setting to null indi + * @param isRound * @return a new {@link TupleChunkFragmentEntity} created with the given configuration */ public static TupleChunkFragmentEntity of( @@ -200,16 +212,19 @@ public static TupleChunkFragmentEntity of( long startIndex, long endIndex, ActivationStatus activationStatus, - String reservationId) { + String reservationId, + boolean isRound) { Assert.notNull(tupleChunkId, ID_MUST_NOT_BE_NULL_EXCEPTION_MSG); Assert.notNull(tupleType, TUPLE_TYPE_MUST_NOT_BE_NULL_EXCEPTION_MSG); verifyStartIndex(startIndex); verifyEndIndex(startIndex, endIndex); return new TupleChunkFragmentEntity( - tupleChunkId, tupleType, startIndex, endIndex, activationStatus, reservationId); + tupleChunkId, tupleType, startIndex, endIndex, activationStatus, reservationId, isRound); } - /** @throws IllegalArgumentException if startIndex < 0. */ + /** + * @throws IllegalArgumentException if startIndex < 0. + */ private static void verifyStartIndex(long startIndex) { if (startIndex < 0) { throw new IllegalArgumentException( @@ -217,7 +232,9 @@ private static void verifyStartIndex(long startIndex) { } } - /** @throws IllegalArgumentException if endIndex < startIndex. */ + /** + * @throws IllegalArgumentException if endIndex < startIndex. + */ private static void verifyEndIndex(long startIndex, long endIndex) { if (endIndex < startIndex) { throw new IllegalArgumentException( diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentRepository.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentRepository.java index 9f07123..4aff537 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentRepository.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentRepository.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -11,6 +11,7 @@ import io.carbynestack.castor.common.entities.ActivationStatus; import io.carbynestack.castor.common.entities.TupleType; +import java.util.ArrayList; import java.util.Optional; import java.util.UUID; import javax.persistence.LockModeType; @@ -42,9 +43,6 @@ public interface TupleChunkFragmentRepository + TUPLE_CHUNK_ID_FIELD + "=:tupleChunkId " + "AND " - + ACTIVATION_STATUS_FIELD - + "='UNLOCKED' " - + "AND " + RESERVATION_ID_FIELD + " IS NULL " + "AND " @@ -53,6 +51,9 @@ public interface TupleChunkFragmentRepository + "AND " + END_INDEX_FIELD + ">:startIndex" + + " AND " + + ACTIVATION_STATUS_FIELD + + "='UNLOCKED' " + " ORDER BY " + START_INDEX_FIELD + " DESC") @@ -90,15 +91,13 @@ Optional findFirstFragmentContainingAnyTupleOfSequence + " FROM " + CLASS_NAME + " WHERE " - + ACTIVATION_STATUS_FIELD - + "=io.carbynestack.castor.common.entities.ActivationStatus.UNLOCKED " - + "AND " + RESERVATION_ID_FIELD - + " IS NULL " - + "AND " + + " IS NULL AND " + + ACTIVATION_STATUS_FIELD + + "=io.carbynestack.castor.common.entities.ActivationStatus.UNLOCKED AND " + TUPLE_TYPE_FIELD + "=:tupleType") - long getAvailableTupleByType(@Param("tupleType") TupleType type); + long getAvailableTuplesByType(@Param("tupleType") TupleType type); @Transactional @Modifying @@ -114,6 +113,155 @@ Optional findFirstFragmentContainingAnyTupleOfSequence nativeQuery = true) int unlockAllForTupleChunk(@Param("tupleChunkId") UUID tupleChunkId); + @Transactional + @Query( + value = + " UPDATE " + + TABLE_NAME + + " SET " + + RESERVATION_ID_COLUMN + + " = :reservationId" + + " WHERE " + + FRAGMENT_ID_COLUMN + + " IN (SELECT " + + FRAGMENT_ID_COLUMN + + " FROM " + + TABLE_NAME + + " WHERE " + + TUPLE_TYPE_COLUMN + + " = :tupleType AND " + + RESERVATION_ID_COLUMN + + " is NULL AND " + + ACTIVATION_STATUS_COLUMN + + " = 'UNLOCKED' AND " + + IS_ROUND_COLUMN + + " FOR UPDATE SKIP LOCKED LIMIT :amount) RETURNING *", + nativeQuery = true) + ArrayList retrieveAndReserveRoundFragmentsByType( + @Param("tupleType") String tupleType, + @Param("amount") int amount, + @Param("reservationId") String reservationId); + + @Transactional + @Modifying + @Query( + value = + "UPDATE " + + TABLE_NAME + + " SET " + + RESERVATION_ID_COLUMN + + " = :reservationId WHERE " + + START_INDEX_COLUMN + + " IN :indices AND " + + IS_ROUND_COLUMN + + " AND " + + RESERVATION_ID_COLUMN + + " is NULL AND " + + ACTIVATION_STATUS_COLUMN + + " = 'UNLOCKED' AND " + + TUPLE_CHUNK_ID_COLUMN + + " = :tupleChunkId", + nativeQuery = true) + int reserveRoundFragmentsByIndices( + @Param("indices") ArrayList indices, + @Param("reservationId") String reservationId, + @Param("tupleChunkId") UUID tupleChunkId); + + @Transactional + @Query( + value = + "SELECT * FROM " + + TABLE_NAME + + " WHERE " + + TUPLE_TYPE_COLUMN + + " = :tupleType AND " + + ACTIVATION_STATUS_COLUMN + + " = 'UNLOCKED' AND " + + RESERVATION_ID_COLUMN + + " is NULL ORDER BY " + + IS_ROUND_COLUMN + + ", " + + START_INDEX_COLUMN + + " - " + + END_INDEX_COLUMN + + " ASC FOR UPDATE SKIP LOCKED LIMIT 1", + nativeQuery = true) + Optional retrieveSinglePartialFragmentPreferSmall( + @Param("tupleType") String tupleType); + + @Transactional + @Query( + value = + "SELECT * FROM " + + TABLE_NAME + + " WHERE " + + TUPLE_TYPE_COLUMN + + " = :tupleType AND " + + ACTIVATION_STATUS_COLUMN + + " = 'UNLOCKED' AND " + + RESERVATION_ID_COLUMN + + " is NULL ORDER BY " + + IS_ROUND_COLUMN + + ", " + + START_INDEX_COLUMN + + " - " + + END_INDEX_COLUMN + + " DESC FOR UPDATE SKIP LOCKED LIMIT 1", + nativeQuery = true) + Optional retrieveSinglePartialFragmentPreferBig( + @Param("tupleType") String tupleType); + + @Transactional + @Query( + value = + " UPDATE " + + TABLE_NAME + + " SET " + + RESERVATION_ID_COLUMN + + " = :reservationId" + + " WHERE " + + FRAGMENT_ID_COLUMN + + " IN (SELECT " + + FRAGMENT_ID_COLUMN + + " FROM " + + TABLE_NAME + + " WHERE " + + TUPLE_TYPE_COLUMN + + " = :tupleType AND " + + IS_ROUND_COLUMN + + " AND " + + RESERVATION_ID_COLUMN + + " is NULL FOR UPDATE SKIP LOCKED LIMIT :amount) RETURNING *", + nativeQuery = true) + ArrayList test( + @Param("tupleType") String tupleType, + @Param("amount") int amount, + @Param("reservationId") String reservationId); + + @Transactional + @Query( + value = + "DELETE FROM " + + TABLE_NAME + + " WHERE " + + TUPLE_CHUNK_ID_COLUMN + + " = :tupleChunkId AND " + + START_INDEX_COLUMN + + " <= :startIdx AND " + + END_INDEX_COLUMN + + " > :startIdx RETURNING " + + RESERVATION_ID_COLUMN, + nativeQuery = true) + String lockFirstFragmentReturningReservationId( + @Param("tupleChunkId") UUID tupleChunkId, @Param("startIdx") long startIdx); + + @Transactional + @Modifying + @Query( + value = "DELETE FROM " + TABLE_NAME + " WHERE " + RESERVATION_ID_COLUMN + " = :reservationId", + nativeQuery = true) + int lockRemainingTuplesWithoutRetrieving(@Param("reservationId") String reservationId); + @Transactional void deleteAllByReservationId(String reservationId); diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageService.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageService.java index 0d8f66c..2267da3 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageService.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageService.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -10,9 +10,8 @@ import io.carbynestack.castor.common.entities.*; import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import io.carbynestack.castor.service.config.CastorServiceProperties; +import java.util.*; import javax.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -32,6 +31,8 @@ public class TupleChunkFragmentStorageService { private final TupleChunkFragmentRepository fragmentRepository; + private final CastorServiceProperties castorServiceProperties; + /** * Creates and stores an {@link TupleChunkFragmentEntity} object with the given information in the * database @@ -90,6 +91,38 @@ public Optional findAvailableFragmentForChunkContainin tupleChunkId, startIndex); } + @Transactional + public ArrayList retrieveAndReserveRoundFragments( + int amount, TupleType ttype, String reservationId) { + return fragmentRepository.retrieveAndReserveRoundFragmentsByType( + ttype.name(), amount / castorServiceProperties.getInitialFragmentSize(), reservationId); + } + + public int reserveRoundFragmentsByIndices( + ArrayList indices, String reservationId, UUID tupleChunkId) { + return fragmentRepository.reserveRoundFragmentsByIndices(indices, reservationId, tupleChunkId); + } + + @Transactional + public int lockReservedFragmentsWithoutRetrieving( + UUID tupleChunkId, long startIdx, String reservationId) { + String deletedId = + fragmentRepository.lockFirstFragmentReturningReservationId(tupleChunkId, startIdx); + if (deletedId != null && deletedId.equals(reservationId)) + return 1 + fragmentRepository.lockRemainingTuplesWithoutRetrieving(reservationId); + else + throw new CastorServiceException( + "Reserved Tuples got contended. Conflicting reservation: " + deletedId); + } + + @Transactional + public Optional retrieveSinglePartialFragment( + TupleType tupleType, boolean preferSmall) { + if (preferSmall) + return fragmentRepository.retrieveSinglePartialFragmentPreferSmall(tupleType.name()); + else return fragmentRepository.retrieveSinglePartialFragmentPreferBig(tupleType.name()); + } + /** * Gets a {@link TupleChunkFragmentEntity} that meets the following criteria: * @@ -139,7 +172,8 @@ public TupleChunkFragmentEntity splitAt(TupleChunkFragmentEntity fragment, long index, fragment.getEndIndex(), fragment.getActivationStatus(), - fragment.getReservationId()); + fragment.getReservationId(), + false); fragmentRepository.save(nf); fragment.setEndIndex(index); log.debug( @@ -181,7 +215,8 @@ public TupleChunkFragmentEntity splitBefore(TupleChunkFragmentEntity fragment, l index, fragment.getEndIndex(), fragment.getActivationStatus(), - fragment.getReservationId()); + fragment.getReservationId(), + false); fragment.setEndIndex(index); fragmentRepository.save(fragment); log.debug( @@ -224,7 +259,7 @@ public void checkNoConflict(UUID chunkId, long startIndex, long endIndex) { */ public long getAvailableTuples(TupleType type) { try { - return fragmentRepository.getAvailableTupleByType(type); + return fragmentRepository.getAvailableTuplesByType(type); } catch (Exception e) { log.debug( String.format( @@ -249,16 +284,6 @@ public void activateFragmentsForTupleChunk(UUID chunkId) { } } - /** - * Removes all {@link TupleChunkFragmentEntity fragments} associated with the given reservation - * id. - * - * @param reservationId the unique identifier of the reservation. - */ - public void deleteAllForReservationId(String reservationId) { - fragmentRepository.deleteAllByReservationId(reservationId); - } - /** * Indicates whether any {@link TupleChunkFragmentEntity fragment} is associated with the given * tuple chunk id. diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/MinioTupleStore.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/MinioTupleStore.java index a405442..30e421e 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/MinioTupleStore.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/MinioTupleStore.java @@ -12,6 +12,7 @@ import io.carbynestack.castor.common.entities.TupleList; import io.carbynestack.castor.common.exceptions.CastorServiceException; import io.carbynestack.castor.service.config.MinioProperties; +import io.micrometer.core.annotation.Timed; import io.minio.*; import io.minio.errors.*; import java.io.ByteArrayInputStream; @@ -22,6 +23,8 @@ import java.util.UUID; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.Assert; @@ -118,6 +121,35 @@ public , F extends Field> TupleList downloadTuples( } } + @Override + public , F extends Field> InputStream downloadTuplesAsBytes( + @NonNull Class tupleCls, + @NonNull F fieldType, + @NonNull UUID tupleChunkId, + long startIndex, + long lengthToRead) { + Assert.isTrue(startIndex >= 0, INVALID_INDEX_EXCEPTION_MSG); + Assert.isTrue(lengthToRead >= 1, INVALID_LENGTH_EXCEPTION_MSG); + log.debug( + "Starting download from S3 for key {} from byte {} to byte {} into byte array", + tupleChunkId, + startIndex, + startIndex + lengthToRead); + try{ + + return minioClient.getObject( + GetObjectArgs.builder() + .bucket(minioProperties.getBucket()) + .object(tupleChunkId.toString()) + .offset(startIndex) + .length(lengthToRead) + .build()); + } catch (Exception e) { + log.error("Exception occurred while reading tuple data from Minio.", e); + throw new CastorServiceException(ERROR_WHILE_READING_TUPLES_EXCEPTION_MSG, e); + } + } + @Override public void deleteTupleChunk(UUID id) { try { diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/TupleStore.java b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/TupleStore.java index 61c4217..2a29604 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/TupleStore.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/persistence/tuplestore/TupleStore.java @@ -12,6 +12,8 @@ import io.carbynestack.castor.common.entities.TupleChunk; import io.carbynestack.castor.common.entities.TupleList; import io.carbynestack.castor.common.exceptions.CastorServiceException; + +import java.io.InputStream; import java.util.UUID; public interface TupleStore { @@ -43,6 +45,22 @@ public interface TupleStore { , F extends Field> TupleList downloadTuples( Class tupleCls, F fieldType, UUID tupleChunkId, long startIndex, long lengthToRead); + /** + * Downloads Tuples of Type T and Field F from a chunk in the store and returns them as a byte Array. + * + * @param tupleCls Class of tuples that sould be downloaded + * @param fieldType The filedType of the requested tuples + * @param tupleChunkId Identifies, from which chunk to download + * @param startIndex the index of the first byte to read (inclusive) + * @param lengthToRead the number of bytes to read from the referenced chunk + * @return all multiplication triples in the specified range + * @throws CastorServiceException if some error occurs while reading data stream from store + * @throws IllegalArgumentException if given index of first requested tuple is negative. + * @throws IllegalArgumentException if less than one tuple is requested. + */ + , F extends Field> InputStream downloadTuplesAsBytes( + Class tupleCls, F fieldType, UUID tupleChunkId, long startIndex, long lengthToRead); + /** * Deletes an object from the store * diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/rest/ReservationRestController.java b/castor-service/src/main/java/io/carbynestack/castor/service/rest/ReservationRestController.java index 28cf465..220c18e 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/rest/ReservationRestController.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/rest/ReservationRestController.java @@ -27,7 +27,8 @@ public class ReservationRestController { private final ReservationCachingService reservationCachingService; @PostMapping - public ResponseEntity reserveTuples(@RequestBody Reservation reservation) { + public ResponseEntity reserveTuples(@RequestBody Reservation reservation) + throws InterruptedException { reservation.setStatus(ActivationStatus.LOCKED); reservationCachingService.keepAndApplyReservation(reservation); return new ResponseEntity<>(HttpStatus.CREATED); diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/rest/TuplesController.java b/castor-service/src/main/java/io/carbynestack/castor/service/rest/TuplesController.java index ab8b334..c632212 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/rest/TuplesController.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/rest/TuplesController.java @@ -37,7 +37,7 @@ public class TuplesController { * same tuples. */ @GetMapping - public ResponseEntity getTuples( + public ResponseEntity getTuples( @RequestParam(value = DOWNLOAD_TUPLE_TYPE_PARAMETER) String type, @RequestParam(value = DOWNLOAD_COUNT_PARAMETER) long count, @RequestParam(value = DOWNLOAD_REQUEST_ID_PARAMETER) UUID requestId) { diff --git a/castor-service/src/main/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketService.java b/castor-service/src/main/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketService.java index d6fa5d7..b97a80a 100644 --- a/castor-service/src/main/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketService.java +++ b/castor-service/src/main/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketService.java @@ -8,6 +8,7 @@ import static io.carbynestack.castor.common.rest.CastorRestApiEndpoints.UPLOAD_TUPLES_ENDPOINT; +import io.carbynestack.castor.common.entities.ActivationStatus; import io.carbynestack.castor.common.entities.TupleChunk; import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; @@ -45,8 +46,6 @@ public class DefaultCastorWebSocketService implements CastorWebSocketService { private final TupleChunkFragmentStorageService fragmentStorageService; private final CastorServiceProperties castorServiceProperties; - @Override - @MessageMapping(UPLOAD_TUPLES_ENDPOINT) /** * @throws CastorClientException if the given payload cannot be cast deserialized as {@link * TupleChunk} @@ -54,6 +53,8 @@ public class DefaultCastorWebSocketService implements CastorWebSocketService { * @throws CastorServiceException if writing the received {@link TupleChunk} to the database * failed */ + @Override + @MessageMapping(UPLOAD_TUPLES_ENDPOINT) public void uploadTupleChunk(SimpMessageHeaderAccessor headerAccessor, byte[] payload) { log.debug("Received payload..."); TupleChunk tupleChunk; @@ -95,18 +96,29 @@ public void uploadTupleChunk(SimpMessageHeaderAccessor headerAccessor, byte[] pa */ protected List generateFragmentsForChunk(TupleChunk tupleChunk) { List fragments = new ArrayList<>(); - for (long i = 0; - i * castorServiceProperties.getInitialFragmentSize() < tupleChunk.getNumberOfTuples(); + if (tupleChunk.getNumberOfTuples() <= 0) return fragments; + long i = 0; + for (; + i * castorServiceProperties.getInitialFragmentSize() + < tupleChunk.getNumberOfTuples() - castorServiceProperties.getInitialFragmentSize(); i++) { fragments.add( TupleChunkFragmentEntity.of( tupleChunk.getChunkId(), tupleChunk.getTupleType(), i * castorServiceProperties.getInitialFragmentSize(), - Math.min( - (i + 1) * castorServiceProperties.getInitialFragmentSize(), - tupleChunk.getNumberOfTuples()))); + (i + 1) * castorServiceProperties.getInitialFragmentSize())); } + fragments.add( + TupleChunkFragmentEntity.of( + tupleChunk.getChunkId(), + tupleChunk.getTupleType(), + i * castorServiceProperties.getInitialFragmentSize(), + tupleChunk.getNumberOfTuples(), + ActivationStatus.LOCKED, + null, + tupleChunk.getNumberOfTuples() % castorServiceProperties.getInitialFragmentSize() + == 0)); return fragments; } diff --git a/castor-service/src/main/resources/application.properties b/castor-service/src/main/resources/application.properties index f9faa76..a3ebd6b 100644 --- a/castor-service/src/main/resources/application.properties +++ b/castor-service/src/main/resources/application.properties @@ -25,6 +25,7 @@ carbynestack.castor.clientHeartbeat=10000 carbynestack.castor.messageBuffer=10485760 carbynestack.castor.master=${IS_MASTER:true} carbynestack.castor.noSslValidation=${NO_SSL_VALIDATION:false} +carbynestack.castor.podHash=${HOSTNAME:1ae21f5847-35dv3} # list of trusted certificates comma separated (absolute path) carbynestack.castor.trustedCertificates=${TRUSTED_CERTIFICATES:} # Default size of the fragments an uploaded tuple chunk is split into. diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceAsMasterIT.java b/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceAsMasterIT.java index 6aca1de..7f095d0 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceAsMasterIT.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceAsMasterIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -35,9 +35,14 @@ import io.minio.MinioClient; import io.minio.PutObjectArgs; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; @@ -113,7 +118,7 @@ public void setUp() throws NoSuchFieldException, IllegalAccessException { } @Test - void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { + void givenSharingReservationFails_whenGetTuples_thenDoNotRollbackReservation() { TupleType requestedTupleType = MULTIPLICATION_TRIPLE_GFP; long requestedNoTuples = 12; UUID requestId = UUID.fromString("a345f933-bf70-4c7a-b6cd-312b55a6ff9c"); @@ -127,8 +132,21 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { fragmentStartIndex, fragmentLength, ActivationStatus.UNLOCKED, - null); + null, + false); + String expectedReservationId = requestId + "_" + requestedTupleType; + + TupleChunkFragmentEntity resultingSplitFragment = + spy( + TupleChunkFragmentEntity.of( + chunkId, + requestedTupleType, + requestedNoTuples, + fragmentLength, + ActivationStatus.UNLOCKED, + null, + false)); ReservationElement expectedReservationElement = new ReservationElement(chunkId, requestedNoTuples, fragmentStartIndex); Reservation expectedReservation = @@ -136,7 +154,10 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { expectedReservationId, requestedTupleType, singletonList(expectedReservationElement)); tupleChunkFragmentRepository.save(existingFragment); + // needs to be set like this because 'id' is a generated sequential value + when(resultingSplitFragment.getId()).thenReturn(existingFragment.getId() + 1); doReturn(false).when(interVcpClientMock).shareReservation(expectedReservation); + existingFragment.setReservationId(expectedReservationId); CastorServiceException actualCSE = assertThrows( @@ -147,17 +168,20 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { requestedTupleType.getField(), requestedNoTuples, requestId)); - + existingFragment.setEndIndex(requestedNoTuples); assertEquals(SHARING_RESERVATION_FAILED_EXCEPTION_MSG, actualCSE.getMessage()); - assertEquals(singletonList(existingFragment), tupleChunkFragmentRepository.findAll()); - assertEquals(0, consumptionCachingService.getConsumptionForTupleType(0, requestedTupleType)); - assertNull(reservationCache.get(expectedReservationId)); + assertTrue( + Arrays.asList(resultingSplitFragment, existingFragment) + .containsAll( + StreamSupport.stream(tupleChunkFragmentRepository.findAll().spliterator(), false) + .collect(Collectors.toList()))); + assertEquals(12, consumptionCachingService.getConsumptionForTupleType(0, requestedTupleType)); + assertEquals(reservationCache.get(expectedReservationId).get(), expectedReservation); } @Test - void - givenRetrievingTuplesFails_whenGetTuples_thenKeepReservationAndReservationMarkerButConsumptionMarkerRemainsUntouched() { + void givenRetrievingTuplesFails_whenGetTuples_thenKeepReservationAndReservationMarker() { UUID requestId = UUID.fromString("a345f933-bf70-4c7a-b6cd-312b55a6ff9c"); UUID chunkId = UUID.fromString("80fbba1b-3da8-4b1e-8a2c-cebd65229fad"); TupleType tupleType = MULTIPLICATION_TRIPLE_GFP; @@ -171,7 +195,8 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { fragmentStartIndex, fragmentEndIndex, ActivationStatus.UNLOCKED, - null); + null, + true); String expectedReservationId = requestId + "_" + tupleType; ReservationElement expectedReservationElement = new ReservationElement(chunkId, count, fragmentStartIndex); @@ -185,7 +210,8 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { fragmentStartIndex, fragmentStartIndex + count, ActivationStatus.UNLOCKED, - expectedReservationId); + expectedReservationId, + true); TupleChunkFragmentEntity expectedNewFragment = TupleChunkFragmentEntity.of( chunkId, @@ -193,7 +219,8 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { fragmentStartIndex + count, fragmentEndIndex, ActivationStatus.UNLOCKED, - null); + null, + true); tupleChunkFragmentRepository.save(existingFragment); @@ -215,7 +242,7 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { TupleChunkFragmentEntityListMatcher.containsAll( expectedNewFragment, expectedReservedFragment)); assertEquals( - expectedReservation.setStatus(ActivationStatus.UNLOCKED), + expectedReservation.setStatus(ActivationStatus.LOCKED), reservationCache.get(expectedReservationId).get()); assertEquals(count, consumptionCachingService.getConsumptionForTupleType(0, tupleType)); verify(tupleStoreSpy, never()).deleteTupleChunk(any(UUID.class)); @@ -238,7 +265,8 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { fragmentStartIndex, fragmentEndIndex, ActivationStatus.UNLOCKED, - null); + null, + true); String expectedReservationId = requestId + "_" + tupleType; ReservationElement expectedReservationElement = new ReservationElement(chunkId, count, fragmentStartIndex); @@ -262,11 +290,11 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { doReturn(true).when(interVcpClientMock).shareReservation(expectedReservation); - TupleList tupleList = + byte[] tupleList = tuplesDownloadService.getTupleList( tupleType.getTupleCls(), tupleType.getField(), count, requestId); - assertEquals( + TupleList actualTupleList = TupleList.fromStream( tupleType.getTupleCls(), tupleType.getField(), @@ -274,8 +302,17 @@ void givenSharingReservationFails_whenGetTuples_thenRollbackReservation() { tupleData, (int) (fragmentStartIndex * tupleType.getTupleSize()), (int) ((fragmentStartIndex + count) * tupleType.getTupleSize())), - count * tupleType.getTupleSize()), - tupleList); + count * tupleType.getTupleSize()); + ByteArrayOutputStream actualTupleData = new ByteArrayOutputStream(); + actualTupleList.forEach( + tuple -> { + try { + ((Tuple) tuple).writeTo(actualTupleData); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + assertArrayEquals(actualTupleData.toByteArray(), tupleList); // no fragments stored -> existing fragment was reserved, consumed and then deleted assertFalse(tupleChunkFragmentRepository.findAll().iterator().hasNext()); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceTest.java index 0dddc93..d137ee6 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/download/DefaultTuplesDownloadServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -10,8 +10,7 @@ import static io.carbynestack.castor.common.entities.TupleType.INPUT_MASK_GFP; import static io.carbynestack.castor.service.download.DefaultTuplesDownloadService.FAILED_RETRIEVING_TUPLES_EXCEPTION_MSG; import static java.util.Collections.singletonList; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; import io.carbynestack.castor.common.entities.*; @@ -21,6 +20,7 @@ import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService; import io.carbynestack.castor.service.persistence.tuplestore.TupleStore; import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.UUID; import lombok.SneakyThrows; import org.apache.commons.lang3.RandomUtils; @@ -71,13 +71,17 @@ void givenTuplesCannotBeRetrieved_whenGetTuplesAsSlave_thenThrowCastorServiceExc when(reservationCachingServiceMock.getReservationWithRetry( resultingReservationId, tupleType, count)) .thenReturn(reservationMock); - when(tupleStoreMock.downloadTuples( + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(isA(UUID.class), anyLong(), isA(String.class)); + doThrow(expectedCause) + .when(tupleStoreMock) + .downloadTuplesAsBytes( tupleType.getTupleCls(), tupleType.getField(), chunkId, 0, - count * tupleType.getTupleSize())) - .thenThrow(expectedCause); + count * tupleType.getTupleSize()); CastorServiceException actualCse = assertThrows( @@ -107,24 +111,21 @@ resultingReservationId, tupleType, singletonList(availableReservationElement)) .setStatus(ActivationStatus.UNLOCKED); long expectedTupleDownloadLength = tupleType.getTupleSize() * count; byte[] tupleData = RandomUtils.nextBytes((int) expectedTupleDownloadLength); - TupleList expectedTupleList = - TupleList.fromStream( - tupleType.getTupleCls(), - tupleType.getField(), - new ByteArrayInputStream(tupleData), - tupleData.length); + + InputStream expectedTupleData = new ByteArrayInputStream(tupleData); when(castorServicePropertiesMock.isMaster()).thenReturn(true); when(reservationCachingServiceMock.createReservation(resultingReservationId, tupleType, count)) .thenReturn(availableReservation); - when(tupleStoreMock.downloadTuples( - tupleType.getTupleCls(), tupleType.getField(), chunkId, 0, expectedTupleDownloadLength)) - .thenReturn(expectedTupleList); + doReturn(expectedTupleData) + .when(tupleStoreMock) + .downloadTuplesAsBytes( + tupleType.getTupleCls(), tupleType.getField(), chunkId, 0, expectedTupleDownloadLength); - assertEquals( - expectedTupleList, + byte[] actualTupleData = tuplesDownloadService.getTupleList( - tupleType.getTupleCls(), tupleType.getField(), count, requestId)); + tupleType.getTupleCls(), tupleType.getField(), count, requestId); + assertArrayEquals(tupleData, actualTupleData); verify(tupleStoreMock).deleteTupleChunk(chunkId); verify(reservationCachingServiceMock).forgetReservation(resultingReservationId); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/ConcurrencyIT.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/ConcurrencyIT.java index b302ef2..7265065 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/ConcurrencyIT.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/ConcurrencyIT.java @@ -64,7 +64,7 @@ public void setUp() { long tuplesInChunk = 100; fragmentStorageService.keep( TupleChunkFragmentEntity.of( - testChunkId, testTupleType, 0, tuplesInChunk, ActivationStatus.UNLOCKED, null)); + testChunkId, testTupleType, 0, tuplesInChunk, ActivationStatus.UNLOCKED, null, true)); fragmentStorageService.activateFragmentsForTupleChunk(testChunkId); } diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplierTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplierTest.java index 42dc313..099aaba 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplierTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/CreateReservationSupplierTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -19,12 +19,11 @@ import io.carbynestack.castor.common.entities.ReservationElement; import io.carbynestack.castor.common.entities.TupleType; import io.carbynestack.castor.common.exceptions.CastorServiceException; +import io.carbynestack.castor.service.config.CastorServiceProperties; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentEntity; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService; import java.util.Optional; import java.util.UUID; -import org.hamcrest.CoreMatchers; -import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -37,6 +36,8 @@ class CreateReservationSupplierTest { @Mock private ReservationCachingService reservationCachingServiceMock; @Mock private TupleChunkFragmentStorageService tupleChunkFragmentStorageServiceMock; + @Mock private CastorServiceProperties castorServicePropertiesMock; + private final TupleType tupleType = INPUT_MASK_GFP; private final String reservationId = "80fbba1b-3da8-4b1e-8a2c-cebd65229fad" + tupleType; private final long count = 42; @@ -45,59 +46,43 @@ class CreateReservationSupplierTest { @BeforeEach public void setUp() { + lenient().doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); createReservationSupplier = new CreateReservationSupplier( - castorInterVcpClientMock, - reservationCachingServiceMock, tupleChunkFragmentStorageServiceMock, reservationId, tupleType, + castorServicePropertiesMock, count); } @Test void givenInsufficientTuples_whenGet_thenThrowCastorServiceException() { - when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count - 1); - + // when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count - + // 1); + doReturn(Optional.empty()) + .when(tupleChunkFragmentStorageServiceMock) + .retrieveSinglePartialFragment(isA(TupleType.class), isA(boolean.class)); + doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); CastorServiceException actualCse = assertThrows(CastorServiceException.class, () -> createReservationSupplier.get()); - assertEquals( - String.format(INSUFFICIENT_TUPLES_EXCEPTION_MSG, tupleType, count - 1, count), - actualCse.getMessage()); + assertEquals(FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG, actualCse.getMessage()); } @Test void givenProvidedChunksDoNotHaveEnoughTuplesAvailable_whenGet_thenThrowCastorServiceException() { - when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count); - when(tupleChunkFragmentStorageServiceMock.findAvailableFragmentWithTupleType(tupleType)) + doReturn(Optional.empty()) + .when(tupleChunkFragmentStorageServiceMock) + .retrieveSinglePartialFragment(tupleType, true); + doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); + when(tupleChunkFragmentStorageServiceMock.retrieveSinglePartialFragment(tupleType, true)) .thenReturn(Optional.empty()); CastorServiceException actualCse = assertThrows(CastorServiceException.class, () -> createReservationSupplier.get()); - assertEquals(FAILED_RESERVE_TUPLES_EXCEPTION_MSG, actualCse.getMessage()); - MatcherAssert.assertThat( - actualCse.getCause(), CoreMatchers.instanceOf(CastorServiceException.class)); - assertEquals(FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG, actualCse.getCause().getMessage()); - } - - @Test - void givenSharingReservationFails_whenGet_thenThrowCastorServiceException() { - UUID chunkId = UUID.fromString("c8a0a467-16b0-4f03-b7d7-07cbe1b0e7e8"); - long startIndex = 0; - TupleChunkFragmentEntity fragmentEntity = - TupleChunkFragmentEntity.of(chunkId, tupleType, startIndex, count); - - when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count); - when(tupleChunkFragmentStorageServiceMock.findAvailableFragmentWithTupleType(tupleType)) - .thenReturn(Optional.of(fragmentEntity)); - - CastorServiceException actualCse = - assertThrows(CastorServiceException.class, () -> createReservationSupplier.get()); - - assertEquals(SHARING_RESERVATION_FAILED_EXCEPTION_MSG, actualCse.getMessage()); - verify(tupleChunkFragmentStorageServiceMock, times(1)).update(fragmentEntity); + assertEquals(FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG, actualCse.getMessage()); } @Test @@ -111,17 +96,16 @@ void givenSuccessfulRequest_whenGet_thenReturnExpectedReservation() { Reservation expectedReservation = new Reservation(reservationId, tupleType, singletonList(expectedReservationElement)); - when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count); - when(tupleChunkFragmentStorageServiceMock.findAvailableFragmentWithTupleType(tupleType)) - .thenReturn(Optional.of(fragmentEntity)); + doReturn(Optional.of(fragmentEntity)) + .when(tupleChunkFragmentStorageServiceMock) + .retrieveSinglePartialFragment(tupleType, true); when(tupleChunkFragmentStorageServiceMock.splitAt(fragmentEntity, count)) .thenReturn(fragmentEntity); - when(castorInterVcpClientMock.shareReservation(expectedReservation)).thenReturn(true); assertEquals(expectedReservation, createReservationSupplier.get()); verify(tupleChunkFragmentStorageServiceMock, times(1)).splitAt(fragmentEntity, count); verify(tupleChunkFragmentStorageServiceMock, times(1)).update(fragmentEntity); - verify(castorInterVcpClientMock, times(1)).shareReservation(expectedReservation); + // verify(castorInterVcpClientMock, times(1)).shareReservation(expectedReservation); } } diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceIT.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceIT.java index 57c88c9..762ea81 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceIT.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -11,8 +11,7 @@ import static io.carbynestack.castor.common.entities.TupleType.MULTIPLICATION_TRIPLE_GFP; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import com.google.common.collect.Lists; @@ -23,6 +22,7 @@ import io.carbynestack.castor.common.exceptions.CastorServiceException; import io.carbynestack.castor.service.CastorServiceApplication; import io.carbynestack.castor.service.config.CastorCacheProperties; +import io.carbynestack.castor.service.config.CastorServiceProperties; import io.carbynestack.castor.service.config.CastorSlaveServiceProperties; import io.carbynestack.castor.service.download.DedicatedTransactionService; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentEntity; @@ -32,14 +32,14 @@ import io.carbynestack.castor.service.testconfig.ReusableMinioContainer; import io.carbynestack.castor.service.testconfig.ReusablePostgreSQLContainer; import io.carbynestack.castor.service.testconfig.ReusableRedisContainer; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.context.ApplicationContext; @@ -82,7 +82,7 @@ public class ReservationCachingServiceIT { @Autowired private TupleChunkFragmentStorageService tupleChunkFragmentStorageService; - @Autowired private TupleChunkFragmentRepository tupleChunkFragmentRepository; + @Autowired @SpyBean private TupleChunkFragmentRepository tupleChunkFragmentRepository; @Autowired private CastorSlaveServiceProperties castorSlaveServiceProperties; @@ -90,9 +90,12 @@ public class ReservationCachingServiceIT { @Autowired private ApplicationContext applicationContext; + @Autowired private CastorServiceProperties castorServiceProperties; + private ReservationCachingService reservationCachingService; private Cache reservationCache; private CastorInterVcpClient interVcpClientSpy; + private TupleChunkFragmentRepository fragmentRepositorySpy; private final UUID testRequestId = UUID.fromString("c8a0a467-16b0-4f03-b7d7-07cbe1b0e7e8"); private final UUID testChunkId = UUID.fromString("80fbba1b-3da8-4b1e-8a2c-cebd65229fad"); @@ -120,6 +123,7 @@ public void setUp() { redisTemplate, tupleChunkFragmentStorageService, castorSlaveServiceProperties, + castorServiceProperties, Optional.of(interVcpClientSpy), Optional.of(dedicatedTransactionService)); testEnvironment.clearAllData(); @@ -146,7 +150,7 @@ void givenReservationInCache_whenGetReservation_thenKeepReservationUntouchedInCa } @Test - void givenSharingReservationFails_whenCreateReservation_thenRollbackFragmentation() { + void givenSharingReservationFails_whenCreateReservation_thenKeepFragmentation() { TupleType requestedTupleType = MULTIPLICATION_TRIPLE_GFP; long requestedNoTuples = 12; UUID requestId = UUID.fromString("a345f933-bf70-4c7a-b6cd-312b55a6ff9c"); @@ -156,7 +160,7 @@ void givenSharingReservationFails_whenCreateReservation_thenRollbackFragmentatio long fragmentLength = 2 * requestedNoTuples; TupleChunkFragmentEntity existingFragment = TupleChunkFragmentEntity.of( - chunkId, requestedTupleType, fragmentStartIndex, fragmentLength, UNLOCKED, null); + chunkId, requestedTupleType, fragmentStartIndex, fragmentLength, UNLOCKED, null, false); String expectedReservationId = requestId + "_" + requestedTupleType; ReservationElement expectedReservationElement = new ReservationElement(chunkId, requestedNoTuples, fragmentStartIndex); @@ -166,6 +170,20 @@ void givenSharingReservationFails_whenCreateReservation_thenRollbackFragmentatio CastorServiceException expectedException = new CastorServiceException("sharing Reservation failed"); + TupleChunkFragmentEntity resultingReservedFragment = + TupleChunkFragmentEntity.of( + chunkId, + requestedTupleType, + fragmentStartIndex, + requestedNoTuples, + UNLOCKED, + resultingReservationId, + false); + + TupleChunkFragmentEntity resultingUnreservedFragment = + TupleChunkFragmentEntity.of( + chunkId, requestedTupleType, requestedNoTuples, fragmentLength, UNLOCKED, null, false); + tupleChunkFragmentRepository.save(existingFragment); doThrow(expectedException).when(interVcpClientSpy).shareReservation(expectedReservation); @@ -178,8 +196,29 @@ void givenSharingReservationFails_whenCreateReservation_thenRollbackFragmentatio resultingReservationId, testTupleType, requestedNoTuples)); assertEquals(expectedException, actualCSE); - - assertEquals(singletonList(existingFragment), tupleChunkFragmentRepository.findAll()); + AtomicLong firstId = new AtomicLong(); + tupleChunkFragmentRepository + .findAll() + .forEach( + x -> { + if (x != null + && x.getReservationId() != null + && resultingReservationId.equals(x.getReservationId())) { + if (x.getId() != firstId.get()) { + firstId.set(x.getId()); + assertEquals(resultingReservedFragment.getReservationId(), x.getReservationId()); + assertEquals(resultingReservedFragment.getStartIndex(), x.getStartIndex()); + assertEquals(resultingReservedFragment.getEndIndex(), x.getEndIndex()); + } else if (x.getId() == firstId.get() + 1) { + assertEquals( + resultingUnreservedFragment.getReservationId(), x.getReservationId()); + assertEquals(resultingUnreservedFragment.getStartIndex(), x.getStartIndex()); + assertEquals(resultingUnreservedFragment.getEndIndex(), x.getEndIndex()); + } else { + fail(); + } + } + }); } @Test @@ -193,6 +232,7 @@ void givenSuccessfulRequest_whenForgetReservation_thenRemoveFromCache() { } @Test + @SneakyThrows void whenReferencedSequenceIsSplitInFragments_whenApplyReservation_thenApplyAccordingly() { UUID tupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); long requestedStartIndex = 42; @@ -205,7 +245,7 @@ void whenReferencedSequenceIsSplitInFragments_whenApplyReservation_thenApplyAcco TupleChunkFragmentEntity fragmentBefore = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, 0, requestedStartIndex, UNLOCKED, null); + tupleChunkId, tupleType, 0, requestedStartIndex, UNLOCKED, null, false); TupleChunkFragmentEntity fragmentPart1 = TupleChunkFragmentEntity.of( tupleChunkId, @@ -213,7 +253,8 @@ void whenReferencedSequenceIsSplitInFragments_whenApplyReservation_thenApplyAcco requestedStartIndex, requestedStartIndex + requestedLength - 5, UNLOCKED, - null); + null, + false); TupleChunkFragmentEntity fragmentContainingRest = TupleChunkFragmentEntity.of( tupleChunkId, @@ -221,7 +262,8 @@ void whenReferencedSequenceIsSplitInFragments_whenApplyReservation_thenApplyAcco requestedStartIndex + requestedLength - 5, Long.MAX_VALUE, UNLOCKED, - null); + null, + false); fragmentBefore = fragmentRepository.save(fragmentBefore); fragmentPart1 = fragmentRepository.save(fragmentPart1); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceTest.java index 304c639..e44becd 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/ReservationCachingServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -9,6 +9,7 @@ import static io.carbynestack.castor.common.entities.TupleType.INPUT_MASK_GFP; import static io.carbynestack.castor.common.entities.TupleType.MULTIPLICATION_TRIPLE_GFP; +import static io.carbynestack.castor.service.persistence.cache.CreateReservationSupplier.SHARING_RESERVATION_FAILED_EXCEPTION_MSG; import static io.carbynestack.castor.service.persistence.cache.ReservationCachingService.*; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -23,6 +24,7 @@ import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; import io.carbynestack.castor.service.config.CastorCacheProperties; +import io.carbynestack.castor.service.config.CastorServiceProperties; import io.carbynestack.castor.service.config.CastorSlaveServiceProperties; import io.carbynestack.castor.service.download.DedicatedTransactionService; import io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentEntity; @@ -30,10 +32,14 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.*; +import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.data.redis.cache.CacheKeyPrefix; import org.springframework.data.redis.core.RedisTemplate; @@ -59,6 +65,10 @@ class ReservationCachingServiceTest { @Mock private ExecutorService executorServiceMock; + @Mock private CastorServiceProperties castorServicePropertiesMock; + + @Spy private CastorInterVcpClient castorInterVcpClientSpy; + private final String testCacheName = "testCache"; private final String testCachePrefix = CacheKeyPrefix.simple().compute(testCacheName); @@ -67,6 +77,7 @@ class ReservationCachingServiceTest { @BeforeEach public void setUp() { when(castorCachePropertiesMock.getReservationStore()).thenReturn(testCacheName); + Mockito.lenient().doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); if (reservationCachingService == null) { this.reservationCachingService = new ReservationCachingService( @@ -75,6 +86,7 @@ public void setUp() { redisTemplateMock, tupleChunkFragmentStorageServiceMock, castorSlaveServicePropertiesMock, + castorServicePropertiesMock, castorInterVcpClientOptionalMock, dedicatedTransactionServiceOptionalMock); this.reservationCachingService.executorService = executorServiceMock; @@ -159,7 +171,8 @@ void givenSuccessfulRequest_whenUpdateReservation_thenUpdateEntityInCache() { } @Test - void givenReservationWithIdInCache_whenGetUnlockedReservation_thenReturnExpectedReservation() { + void + givenReservationWithIdInCache_whenLockAndRetrieveReservation_thenReturnExpectedReservation() { UUID chunkId = UUID.fromString("b7b010e0-362b-401c-9560-4cf4b2a68139"); TupleType tupleType = MULTIPLICATION_TRIPLE_GFP; long tupleCount = 42; @@ -169,12 +182,15 @@ void givenReservationWithIdInCache_whenGetUnlockedReservation_thenReturnExpected new Reservation(reservationId, tupleType, singletonList(existingReservationElement)) .setStatus(ActivationStatus.UNLOCKED); + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(isA(UUID.class), anyLong(), isA(String.class)); when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(valueOperationsMock.get(testCachePrefix + reservationId)).thenReturn(existingReservation); assertEquals( existingReservation, - reservationCachingService.getUnlockedReservation(reservationId, tupleType, tupleCount)); + reservationCachingService.lockAndRetrieveReservation(reservationId, tupleType, tupleCount)); } @Test @@ -231,6 +247,7 @@ void givenNoFragmentWithReferencedStartIndex_whenKeepAndApplyReservation_thenThr } @Test + @SneakyThrows void givenReferencedSequenceLiesWithin_whenKeepAndApplyReservation_thenSplitFragmentAccordingly() { UUID tupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); @@ -250,7 +267,8 @@ void givenNoFragmentWithReferencedStartIndex_whenKeepAndApplyReservation_thenThr existingFragmentStartIndex, existingFragmentEndIndex, ActivationStatus.UNLOCKED, - null); + null, + true); when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(tupleChunkFragmentStorageServiceMock.splitAt( @@ -369,20 +387,24 @@ void givenSuccessfulRequest_whenGetReservationWithRetry_thenReturnExpectedReserv when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(valueOperationsMock.get(testCachePrefix + reservationId)) .thenReturn(expectedReservationMock); - when(expectedReservationMock.getStatus()).thenReturn(ActivationStatus.UNLOCKED); + // when(expectedReservationMock.getStatus()).thenReturn(ActivationStatus.UNLOCKED); when(expectedReservationMock.getTupleType()).thenReturn(tupleType); when(reservationElementMock.getReservedTuples()).thenReturn(count); + when(reservationElementMock.getTupleChunkId()).thenReturn(UUID.randomUUID()); when(expectedReservationMock.getReservations()) .thenReturn(singletonList(reservationElementMock)); + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(isA(UUID.class), anyLong(), isA(String.class)); assertEquals( expectedReservationMock, - reservationCachingService.getUnlockedReservation(reservationId, tupleType, count)); + reservationCachingService.lockAndRetrieveReservation(reservationId, tupleType, count)); } @Test void - givenCachedReservationForIdMismatchType_whenGetUnlockedReservation_thenThrowCastorServiceException() { + givenCachedReservationForIdMismatchType_whenLockAndRetrieveReservation_thenThrowCastorServiceException() { UUID chunkId = UUID.fromString("b7b010e0-362b-401c-9560-4cf4b2a68139"); String reservationId = "testReservationId"; TupleType tupleType = INPUT_MASK_GFP; @@ -396,12 +418,16 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(valueOperationsMock.get(testCachePrefix + reservationId)).thenReturn(existingReservation); + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(isA(UUID.class), anyLong(), isA(String.class)); CastorServiceException actualCse = assertThrows( CastorServiceException.class, () -> - reservationCachingService.getUnlockedReservation(reservationId, tupleType, count)); + reservationCachingService.lockAndRetrieveReservation( + reservationId, tupleType, count)); assertEquals( String.format( @@ -415,7 +441,7 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) @Test void - givenCachedReservationForIdMismatchReservedCount_whenGetUnlockedReservation_thenThrowCastorServiceException() { + givenCachedReservationForIdMismatchReservedCount_whenLockAndRetrieveReservation_thenThrowCastorServiceException() { UUID chunkId = UUID.fromString("b7b010e0-362b-401c-9560-4cf4b2a68139"); String reservationId = "testReservationId"; TupleType tupleType = INPUT_MASK_GFP; @@ -427,12 +453,15 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(valueOperationsMock.get(testCachePrefix + reservationId)).thenReturn(existingReservation); - + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(isA(UUID.class), anyLong(), isA(String.class)); CastorServiceException actualCse = assertThrows( CastorServiceException.class, () -> - reservationCachingService.getUnlockedReservation(reservationId, tupleType, count)); + reservationCachingService.lockAndRetrieveReservation( + reservationId, tupleType, count)); assertEquals( String.format( @@ -467,6 +496,7 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) assertEquals(expectedException, actualCse.getCause()); } + @Disabled // Disabled as unlocking is at least temporarily not implemented @Test void givenNoCachedReservationActivationThrows_whenCreateReservation_thenThrowCastorServiceException() { @@ -484,6 +514,8 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) .thenReturn(reservationMock); when(dedicatedTransactionServiceMock.runAsNewTransaction(any(UnlockReservationSupplier.class))) .thenThrow(expectedException); + when(castorInterVcpClientOptionalMock.get()).thenReturn(castorInterVcpClientSpy); + when(castorInterVcpClientSpy.shareReservation(any())).thenReturn(true); CastorServiceException actualCse = assertThrows( @@ -501,6 +533,7 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) DedicatedTransactionService dedicatedTransactionServiceMock = mock(DedicatedTransactionService.class); Reservation expectedReservationMock = mock(Reservation.class); + ReservationElement firstElementMock = mock(ReservationElement.class); when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); when(castorInterVcpClientOptionalMock.isPresent()).thenReturn(true); @@ -508,11 +541,52 @@ reservationId, mismatchedTupleType, singletonList(existingReservationElement)) when(dedicatedTransactionServiceOptionalMock.get()).thenReturn(dedicatedTransactionServiceMock); when(dedicatedTransactionServiceMock.runAsNewTransaction(any(CreateReservationSupplier.class))) .thenReturn(expectedReservationMock); - when(dedicatedTransactionServiceMock.runAsNewTransaction(any(UnlockReservationSupplier.class))) - .thenReturn(expectedReservationMock); + when(castorInterVcpClientOptionalMock.get()).thenReturn(castorInterVcpClientSpy); + when(castorInterVcpClientSpy.shareReservation(isA(Reservation.class))).thenReturn(true); + when(firstElementMock.getStartIndex()).thenReturn(0L); + when(firstElementMock.getReservedTuples()).thenReturn(42L); + when(expectedReservationMock.getReservations()).thenReturn(singletonList(firstElementMock)); + + doReturn(1) + .when(tupleChunkFragmentStorageServiceMock) + .lockReservedFragmentsWithoutRetrieving(any(), any(Long.class), any()); assertEquals( expectedReservationMock, reservationCachingService.createReservation(reservationId, INPUT_MASK_GFP, 42)); } + + @Test + void givenSharingReservationFails_whenGet_thenThrowCastorServiceException() { + UUID chunkId = UUID.fromString("c8a0a467-16b0-4f03-b7d7-07cbe1b0e7e8"); + long startIndex = 0; + TupleType tupleType = MULTIPLICATION_TRIPLE_GFP; + TupleChunkFragmentEntity fragmentEntity = + TupleChunkFragmentEntity.of(chunkId, tupleType, startIndex, 42); + DedicatedTransactionService dedicatedTransactionServiceSpy = + spy(DedicatedTransactionService.class); + Reservation reservationMock = mock(Reservation.class); + + // when(tupleChunkFragmentStorageServiceMock.getAvailableTuples(tupleType)).thenReturn(count); + when(castorInterVcpClientOptionalMock.isPresent()).thenReturn(true); + when(dedicatedTransactionServiceOptionalMock.isPresent()).thenReturn(true); + when(tupleChunkFragmentStorageServiceMock.retrieveSinglePartialFragment(tupleType, true)) + .thenReturn(Optional.of(fragmentEntity)); + // when(castorInterVcpClientSpy.shareReservation(any(Reservation.class))).thenReturn(false); + when(dedicatedTransactionServiceOptionalMock.get()).thenReturn(dedicatedTransactionServiceSpy); + // when(dedicatedTransactionServiceMock.runAsNewTransaction(isA(CreateReservationSupplier.class))) + // .thenReturn(reservationMock); + when(redisTemplateMock.opsForValue()).thenReturn(valueOperationsMock); + when(castorInterVcpClientOptionalMock.get()).thenReturn(castorInterVcpClientSpy); + when(castorInterVcpClientSpy.shareReservation(isA(Reservation.class))).thenReturn(false); + when(valueOperationsMock.get(any())).thenReturn(null); + + CastorServiceException actualCse = + assertThrows( + CastorServiceException.class, + () -> reservationCachingService.createReservation("testReservation", tupleType, 42)); + + assertEquals(SHARING_RESERVATION_FAILED_EXCEPTION_MSG, actualCse.getMessage()); + verify(tupleChunkFragmentStorageServiceMock, times(1)).update(fragmentEntity); + } } diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallableTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallableTest.java index 9c2411e..a831328 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallableTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/cache/WaitForReservationCallableTest.java @@ -38,11 +38,11 @@ void givenNoReservationRetrievedAndCancelled_whenCall_thenReturnNull() { WaitForReservationCallable wfrc = new WaitForReservationCallable( reservationId, tupleType, tupleCount, reservationCachingServiceMock, 0); - when(reservationCachingServiceMock.getUnlockedReservation(reservationId, tupleType, tupleCount)) + when(reservationCachingServiceMock.retrieveReservation(reservationId, tupleType, tupleCount)) .thenAnswer(RunWhenAccessedAnswer.of(retries, wfrc::cancel, null)); assertNull(wfrc.call()); verify(reservationCachingServiceMock, times(1)) - .getUnlockedReservation(reservationId, tupleType, tupleCount); + .retrieveReservation(reservationId, tupleType, tupleCount); } @Test @@ -51,12 +51,12 @@ void givenReservationLockedInitially_whenCall_thenWaitUntilUnlockedAndReturnRese WaitForReservationCallable wfrc = new WaitForReservationCallable( reservationId, tupleType, tupleCount, reservationCachingServiceMock, 0); - when(reservationCachingServiceMock.getUnlockedReservation(reservationId, tupleType, tupleCount)) + when(reservationCachingServiceMock.retrieveReservation(reservationId, tupleType, tupleCount)) .thenReturn(null) .thenReturn(expectedReservation); assertEquals(expectedReservation, wfrc.call()); verify(reservationCachingServiceMock, times(2)) - .getUnlockedReservation(reservationId, tupleType, tupleCount); + .retrieveReservation(reservationId, tupleType, tupleCount); } @RequiredArgsConstructor(staticName = "of") diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntityTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntityTest.java index 638d343..3036efa 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntityTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentEntityTest.java @@ -96,7 +96,7 @@ void givenValidConfiguration_whenCreateWithFactoryMethod_thenReturnExpectedFragm TupleChunkFragmentEntity fragment = TupleChunkFragmentEntity.of( - chunkId, tupleType, startIndex, endIndex, activationStatus, reservationId); + chunkId, tupleType, startIndex, endIndex, activationStatus, reservationId, true); assertEquals(chunkId, fragment.getTupleChunkId()); assertEquals(tupleType, fragment.getTupleType()); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceIT.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceIT.java index 154eafa..d8ad6d6 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceIT.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -9,21 +9,26 @@ import static io.carbynestack.castor.common.entities.ActivationStatus.LOCKED; import static io.carbynestack.castor.common.entities.ActivationStatus.UNLOCKED; +import static io.carbynestack.castor.common.entities.Field.GF2N; +import static io.carbynestack.castor.common.entities.Field.GFP; import static io.carbynestack.castor.service.persistence.fragmentstore.TupleChunkFragmentStorageService.CONFLICT_EXCEPTION_MSG; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.*; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; +import io.carbynestack.castor.common.entities.ReservationElement; +import io.carbynestack.castor.common.entities.TupleChunk; import io.carbynestack.castor.common.entities.TupleType; import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.service.CastorServiceApplication; +import io.carbynestack.castor.service.config.CastorServiceProperties; import io.carbynestack.castor.service.testconfig.PersistenceTestEnvironment; import io.carbynestack.castor.service.testconfig.ReusableMinioContainer; import io.carbynestack.castor.service.testconfig.ReusablePostgreSQLContainer; import io.carbynestack.castor.service.testconfig.ReusableRedisContainer; -import java.util.Arrays; -import java.util.Optional; -import java.util.UUID; +import java.util.*; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -54,12 +59,90 @@ public class TupleChunkFragmentStorageServiceIT { @Autowired private PersistenceTestEnvironment testEnvironment; @Autowired private TupleChunkFragmentRepository fragmentRepository; @Autowired private TupleChunkFragmentStorageService fragmentStorageService; + @Autowired private CastorServiceProperties castorServiceProperties; @BeforeEach public void setUp() { testEnvironment.clearAllData(); } + @Test + void testFragmentCache() { + UUID tupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); + UUID tupleChunkId2 = UUID.randomUUID(); + TupleType tupleType = TupleType.MULTIPLICATION_TRIPLE_GFP; + TupleType tupleType2 = TupleType.MULTIPLICATION_TRIPLE_GF2N; + byte[] expectedMGFPTupleData = + RandomUtils.nextBytes( // Value = elemsize * arity --> *2 for value + MAC + GFP.getElementSize() * TupleType.MULTIPLICATION_TRIPLE_GFP.getArity() * 50000 * 2); + + byte[] expectedMGF2nTupleData = + RandomUtils.nextBytes( // Value = elemsize * arity --> *2 for value + MAC + GF2N.getElementSize() * TupleType.MULTIPLICATION_TRIPLE_GF2N.getArity() * 50000 * 2); + + TupleChunk mGfpTupleChunk = + TupleChunk.of( + tupleType.getTupleCls(), tupleType.getField(), tupleChunkId, expectedMGFPTupleData); + TupleChunk mGf2nTupleChunk = + TupleChunk.of( + tupleType2.getTupleCls(), tupleType2.getField(), tupleChunkId2, expectedMGF2nTupleData); + + List fragmentEntities = generateFragmentsForChunk(mGfpTupleChunk); + + List fragmentEntities1 = generateFragmentsForChunk(mGf2nTupleChunk); + + // fragmentStorageService.addUniqueConstraint(); + + // fragmentStorageService.keep(fragmentEntities1.remove(0)); + // fragmentStorageService.keepRound(fragmentEntities1); + + // heartbeatRepository.insertIntoHeartbeatTable(new String(RandomUtils.nextBytes(16))); + + fragmentStorageService.keep(fragmentEntities); + // fragmentStorageService.keepRound(fragmentEntities); + + // fragmentStorageService.keep(generateFragmentsForChunk( + // TupleChunk.of(tupleType.getTupleCls(), tupleType.getField(), UUID.randomUUID(), + // RandomUtils.nextBytes( + // GFP.getElementSize() * TupleType.MULTIPLICATION_TRIPLE_GFP.getArity() * 50000 * + // 2)))); + + ArrayList roundEntities = + fragmentStorageService.retrieveAndReserveRoundFragments( + 50000, tupleType2, "3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); + ArrayList reservationElements = mapToResElement(roundEntities); + System.out.println(reservationElements); + } + + protected ArrayList mapToResElement( + ArrayList frags) { + return frags.stream() + .map( + t -> + new ReservationElement( + t.getTupleChunkId(), + castorServiceProperties.getInitialFragmentSize(), + t.getStartIndex())) + .collect(Collectors.toCollection(ArrayList::new)); + } + + protected List generateFragmentsForChunk(TupleChunk tupleChunk) { + List fragments = new ArrayList<>(); + for (long i = 0; + i * castorServiceProperties.getInitialFragmentSize() < tupleChunk.getNumberOfTuples(); + i++) { + fragments.add( + TupleChunkFragmentEntity.of( + tupleChunk.getChunkId(), + tupleChunk.getTupleType(), + i * castorServiceProperties.getInitialFragmentSize(), + Math.min( + (i + 1) * castorServiceProperties.getInitialFragmentSize(), + tupleChunk.getNumberOfTuples()))); + } + return fragments; + } + @Test void givenNoFragmentWithChunkIdInDb_whenFindAvailableFragment_thenReturnEmptyOptional() { UUID tupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); @@ -104,7 +187,8 @@ void givenFragmentForChunkIdIsReserved_whenFindAvailableFragment_thenReturnEmpty actualStartIndex, actualLength, UNLOCKED, - actualReservationId)); + actualReservationId, + true)); assertEquals( Optional.empty(), @@ -129,7 +213,8 @@ void givenFragmentForChunkIdIsReserved_whenFindAvailableFragment_thenReturnEmpty actualStartIndex, actualEndIndex, UNLOCKED, - actualReservationId)); + actualReservationId, + true)); assertEquals( Optional.empty(), @@ -153,7 +238,8 @@ void givenFragmentMatchingCriteria_whenFindAvailableFragment_thenReturnExpectedF actualStartIndex, actualEndIndex, UNLOCKED, - actualReservationId); + actualReservationId, + true); fragmentRepository.save(expectedFragment); @@ -171,13 +257,19 @@ void givenMultipleFragmentsStored_whenFindAvailableFragment_thenReturnExpectedFr TupleChunkFragmentEntity fragmentBefore = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, 0, requestedStartIndex - 1, UNLOCKED, null); + tupleChunkId, tupleType, 0, requestedStartIndex - 1, UNLOCKED, null, true); TupleChunkFragmentEntity expectedFragment = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, requestedStartIndex, requestedStartIndex + 1, UNLOCKED, null); + tupleChunkId, + tupleType, + requestedStartIndex, + requestedStartIndex + 1, + UNLOCKED, + null, + true); TupleChunkFragmentEntity fragmentAfter = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, requestedStartIndex + 1, Long.MAX_VALUE, UNLOCKED, null); + tupleChunkId, tupleType, requestedStartIndex + 1, Long.MAX_VALUE, UNLOCKED, null, true); fragmentRepository.save(fragmentBefore); fragmentRepository.save(expectedFragment); @@ -227,7 +319,8 @@ void givenMultipleFragmentsStored_whenFindAvailableFragment_thenReturnExpectedFr 0, 42, UNLOCKED, - actualReservationId); + actualReservationId, + true); fragmentRepository.save(reservedFragment); assertEquals( @@ -246,7 +339,8 @@ void giveFragmentMatchesCriteria_whenFindAvailableFragmentForType_thenReturnFrag 0, 42, UNLOCKED, - null); + null, + true); fragmentRepository.save(expectedFragment); assertEquals( @@ -266,7 +360,8 @@ void giveFragmentMatchesCriteria_whenFindAvailableFragmentForType_thenReturnFrag 0, 42, UNLOCKED, - null); + null, + true); TupleChunkFragmentEntity additionalFragment1 = TupleChunkFragmentEntity.of( UUID.fromString("3dc08ff2-5eed-49a9-979e-3a3ac0e4a2cf"), @@ -274,7 +369,8 @@ void giveFragmentMatchesCriteria_whenFindAvailableFragmentForType_thenReturnFrag 0, 42, UNLOCKED, - null); + null, + true); TupleChunkFragmentEntity additionalFragment2 = TupleChunkFragmentEntity.of( UUID.fromString("80fbba1b-3da8-4b1e-8a2c-cebd65229fad"), @@ -282,7 +378,8 @@ void giveFragmentMatchesCriteria_whenFindAvailableFragmentForType_thenReturnFrag 0, 42, UNLOCKED, - null); + null, + true); fragmentRepository.save(additionalFragment1); fragmentRepository.save(expectedFragment); @@ -303,10 +400,10 @@ void givenNoConflictingFragments_whenCheckNoConflict_thenDoNothing() { TupleChunkFragmentEntity fragmentBefore = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, 0, requestedStartIndex, UNLOCKED, null); + tupleChunkId, tupleType, 0, requestedStartIndex, UNLOCKED, null, true); TupleChunkFragmentEntity fragmentAfter = TupleChunkFragmentEntity.of( - tupleChunkId, tupleType, requestedEndIndex, Long.MAX_VALUE, UNLOCKED, null); + tupleChunkId, tupleType, requestedEndIndex, Long.MAX_VALUE, UNLOCKED, null, true); fragmentRepository.save(fragmentBefore); fragmentRepository.save(fragmentAfter); @@ -331,7 +428,8 @@ void givenConflictingFragment_whenCheckNoConflict_thenThrowException() { requestedStartIndex + 1, requestedEndIndex - 1, LOCKED, - "alreadyReserved"); + "alreadyReserved", + true); fragmentRepository.save(conflictingFragment); CastorClientException actualCce = @@ -348,17 +446,18 @@ void giveMultipleFragmentsInDb_whenCountAvailableTuples_thenReturnExpectedCount( TupleType requestedType = TupleType.MULTIPLICATION_TRIPLE_GFP; TupleChunkFragmentEntity fragmentOfDifferentType = TupleChunkFragmentEntity.of( - UUID.randomUUID(), TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, UNLOCKED, null); + UUID.randomUUID(), TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, UNLOCKED, null, true); TupleChunkFragmentEntity lockedFragment = TupleChunkFragmentEntity.of( - UUID.randomUUID(), requestedType, 0, Long.MAX_VALUE, LOCKED, null); + UUID.randomUUID(), requestedType, 0, Long.MAX_VALUE, LOCKED, null, true); TupleChunkFragmentEntity reservedFragment = TupleChunkFragmentEntity.of( - UUID.randomUUID(), requestedType, 0, Long.MAX_VALUE, UNLOCKED, "alreadyReserved"); + UUID.randomUUID(), requestedType, 0, Long.MAX_VALUE, UNLOCKED, "alreadyReserved", true); TupleChunkFragmentEntity oneFragment = - TupleChunkFragmentEntity.of(UUID.randomUUID(), requestedType, 0, 12, UNLOCKED, null); + TupleChunkFragmentEntity.of(UUID.randomUUID(), requestedType, 0, 12, UNLOCKED, null, true); TupleChunkFragmentEntity anotherFragment = - TupleChunkFragmentEntity.of(UUID.randomUUID(), requestedType, 111, 141, UNLOCKED, null); + TupleChunkFragmentEntity.of( + UUID.randomUUID(), requestedType, 111, 141, UNLOCKED, null, true); fragmentRepository.save(fragmentOfDifferentType); fragmentRepository.save(lockedFragment); @@ -381,13 +480,13 @@ void giveMultipleFragmentsInDb_whenActivateForChunk_thenUpdateAccordingly() { TupleChunkFragmentEntity fragmentForDifferentChunk = TupleChunkFragmentEntity.of( - differentChunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, null); + differentChunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, null, true); TupleChunkFragmentEntity oneFragment = TupleChunkFragmentEntity.of( - requestedTupleChunkId, TupleType.MULTIPLICATION_TRIPLE_GFP, 0, 12, LOCKED, null); + requestedTupleChunkId, TupleType.MULTIPLICATION_TRIPLE_GFP, 0, 12, LOCKED, null, true); TupleChunkFragmentEntity anotherFragment = TupleChunkFragmentEntity.of( - requestedTupleChunkId, TupleType.SQUARE_TUPLE_GF2N, 111, 141, LOCKED, null); + requestedTupleChunkId, TupleType.SQUARE_TUPLE_GF2N, 111, 141, LOCKED, null, true); fragmentForDifferentChunk = fragmentRepository.save(fragmentForDifferentChunk); oneFragment = fragmentRepository.save(oneFragment); @@ -410,19 +509,19 @@ void giveMultipleFragmentsInDb_whenDeleteByReservationId_thenDeleteAccordingly() TupleChunkFragmentEntity unreservedFragment = TupleChunkFragmentEntity.of( - chunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, null); + chunkId, TupleType.INPUT_MASK_GFP, 150, Long.MAX_VALUE, LOCKED, null, true); TupleChunkFragmentEntity oneFragment = TupleChunkFragmentEntity.of( - chunkId, TupleType.MULTIPLICATION_TRIPLE_GFP, 0, 12, LOCKED, reservationId); + chunkId, TupleType.MULTIPLICATION_TRIPLE_GFP, 0, 12, LOCKED, reservationId, true); TupleChunkFragmentEntity anotherFragment = TupleChunkFragmentEntity.of( - chunkId, TupleType.SQUARE_TUPLE_GF2N, 111, 141, LOCKED, reservationId); + chunkId, TupleType.SQUARE_TUPLE_GF2N, 111, 141, LOCKED, reservationId, true); unreservedFragment = fragmentRepository.save(unreservedFragment); fragmentRepository.save(oneFragment); fragmentRepository.save(anotherFragment); - fragmentStorageService.deleteAllForReservationId(reservationId); + fragmentStorageService.lockReservedFragmentsWithoutRetrieving(chunkId, 0, reservationId); assertEquals(1, fragmentRepository.count()); assertEquals(singletonList(unreservedFragment), fragmentRepository.findAll()); @@ -434,7 +533,7 @@ void givenNoFragmentAssociatedWithChunkId_whenCheckReferenced_thenReturnFalse() UUID differentChunkId = UUID.fromString("80fbba1b-3da8-4b1e-8a2c-cebd65229fad"); TupleChunkFragmentEntity fragmentForDifferentChunk = TupleChunkFragmentEntity.of( - differentChunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, null); + differentChunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, null, true); fragmentRepository.save(fragmentForDifferentChunk); @@ -446,7 +545,13 @@ void givenAnyFragmentAssociatedWithChunkId_whenCheckReferenced_thenReturnTrue() UUID requestedTupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); TupleChunkFragmentEntity fragmentForDifferentChunk = TupleChunkFragmentEntity.of( - requestedTupleChunkId, TupleType.INPUT_MASK_GFP, 0, Long.MAX_VALUE, LOCKED, "reserved"); + requestedTupleChunkId, + TupleType.INPUT_MASK_GFP, + 0, + Long.MAX_VALUE, + LOCKED, + "reserved", + true); fragmentRepository.save(fragmentForDifferentChunk); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceTest.java index 68c972b..a414a88 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/persistence/fragmentstore/TupleChunkFragmentStorageServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 - for information on the respective copyright owner + * Copyright (c) 2024 - for information on the respective copyright owner * see the NOTICE file and/or the repository https://github.com/carbynestack/castor. * * SPDX-License-Identifier: Apache-2.0 @@ -16,6 +16,7 @@ import io.carbynestack.castor.common.entities.TupleType; import io.carbynestack.castor.common.exceptions.CastorClientException; import io.carbynestack.castor.common.exceptions.CastorServiceException; +import io.carbynestack.castor.service.config.CastorServiceProperties; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -33,6 +34,7 @@ class TupleChunkFragmentStorageServiceTest { @Mock private TupleChunkFragmentRepository tupleChunkFragmentRepositoryMock; @InjectMocks private TupleChunkFragmentStorageService tupleChunkFragmentStorageService; + @Mock private CastorServiceProperties castorServicePropertiesMock; @Test void givenNoConflictingFragments_whenKeep_thenPersist() { @@ -163,12 +165,15 @@ void givenSuccessfulRequest_whenSplitAt_thenSplitAccordinglyAndReturnAlteredFrag long startIndex = 0; long endIndex = 12; TupleChunkFragmentEntity fragmentEntity = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, startIndex, endIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, startIndex, endIndex, ActivationStatus.LOCKED, null, false); long splitIndex = 7; TupleChunkFragmentEntity expectedAlteredFragment = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, startIndex, splitIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, startIndex, splitIndex, ActivationStatus.LOCKED, null, false); TupleChunkFragmentEntity expectedNewFragment = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, splitIndex, endIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, splitIndex, endIndex, ActivationStatus.LOCKED, null, false); when(tupleChunkFragmentRepositoryMock.save(any())) .thenAnswer( @@ -190,7 +195,8 @@ void givenSplitIndexOutOfRange_whenSplitBefore_thenDoNothing() { long startIndex = 0; long endIndex = 12; TupleChunkFragmentEntity fragmentEntity = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, startIndex, endIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, startIndex, endIndex, ActivationStatus.LOCKED, null, false); long illegalSplitIndex = endIndex; assertEquals( @@ -207,12 +213,15 @@ void givenSuccessfulRequest_whenSplitBefore_thenSplitAccordinglyAndReturnNewFrag long startIndex = 0; long endIndex = 12; TupleChunkFragmentEntity fragmentEntity = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, startIndex, endIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, startIndex, endIndex, ActivationStatus.LOCKED, null, false); long splitIndex = 7; TupleChunkFragmentEntity expectedNewFragment = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, splitIndex, endIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, splitIndex, endIndex, ActivationStatus.LOCKED, null, false); TupleChunkFragmentEntity expectedAlteredFragment = - TupleChunkFragmentEntity.of(tupleChunkId, tupleType, startIndex, splitIndex); + TupleChunkFragmentEntity.of( + tupleChunkId, tupleType, startIndex, splitIndex, ActivationStatus.LOCKED, null, false); when(tupleChunkFragmentRepositoryMock.save(any())) .thenAnswer( @@ -269,9 +278,10 @@ void givenNoConflictingFragment_whenCheckNoConflict_thenDoNothing() { void givenRepositoryThrowsException_whenGetAvailableTuples_thenReturnZero() { TupleType tupleType = TupleType.MULTIPLICATION_TRIPLE_GFP; AopInvocationException expectedException = new AopInvocationException("expected"); - - when(tupleChunkFragmentRepositoryMock.getAvailableTupleByType(tupleType)) - .thenThrow(expectedException); + lenient().doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); + doThrow(expectedException) + .when(tupleChunkFragmentRepositoryMock) + .getAvailableTuplesByType(tupleType); assertEquals(0, tupleChunkFragmentStorageService.getAvailableTuples(tupleType)); } @@ -280,9 +290,10 @@ void givenRepositoryThrowsException_whenGetAvailableTuples_thenReturnZero() { void givenSuccessfulRequest_whenGetAvailableTuples_thenReturnExpectedResult() { TupleType tupleType = TupleType.MULTIPLICATION_TRIPLE_GFP; long availableTuples = 42; - - when(tupleChunkFragmentRepositoryMock.getAvailableTupleByType(tupleType)) - .thenReturn(availableTuples); + lenient().doReturn(1000).when(castorServicePropertiesMock).getInitialFragmentSize(); + doReturn(availableTuples) + .when(tupleChunkFragmentRepositoryMock) + .getAvailableTuplesByType(tupleType); assertEquals(availableTuples, tupleChunkFragmentStorageService.getAvailableTuples(tupleType)); } @@ -318,15 +329,6 @@ void givenSuccessfulRequest_whenActivateFragmentsForTupleChunk_thenDoNothing() { } } - @Test - void givenSuccessfulRequest_whenDeleteAllForReservationId_thenDoNothing() { - String reservationId = "reservationId"; - - tupleChunkFragmentStorageService.deleteAllForReservationId(reservationId); - - verify(tupleChunkFragmentRepositoryMock, times(1)).deleteAllByReservationId(reservationId); - } - @Test void givenFragmentWithGivenTupleChunkIdExists_whenRequestIsReferencedState_thenReturnTrue() { UUID tupleChunkId = UUID.fromString("3fd7eaf7-cda3-4384-8d86-2c43450cbe63"); diff --git a/castor-service/src/test/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketServiceTest.java b/castor-service/src/test/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketServiceTest.java index d4923dd..e0ee949 100644 --- a/castor-service/src/test/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketServiceTest.java +++ b/castor-service/src/test/java/io/carbynestack/castor/service/websocket/DefaultCastorWebSocketServiceTest.java @@ -14,6 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.*; +import io.carbynestack.castor.common.entities.ActivationStatus; import io.carbynestack.castor.common.entities.MultiplicationTriple; import io.carbynestack.castor.common.entities.TupleChunk; import io.carbynestack.castor.common.entities.TupleType; @@ -132,7 +133,13 @@ void givenSuccessfulRequest_whenUploadChunk_thenSendSuccess() { .keep( Collections.singletonList( TupleChunkFragmentEntity.of( - chunkId, tupleType, 0, tupleChunk.getNumberOfTuples()))); + chunkId, + tupleType, + 0, + tupleChunk.getNumberOfTuples(), + ActivationStatus.LOCKED, + null, + false))); verify(messagingTemplateMock, times(1)) .convertAndSend( @@ -146,6 +153,7 @@ void givenChunkWithZeroTuples_whenGenerateFragments_thenReturnEmptyList() { TupleType tupleType = INPUT_MASK_GFP; TupleChunk emptyTupleChunk = TupleChunk.of(tupleType.getTupleCls(), tupleType.getField(), chunkId, new byte[0]); + // doReturn(1000).when(servicePropertiesMock).getInitialFragmentSize(); assertEquals( castorWebSocketService.generateFragmentsForChunk(emptyTupleChunk), Collections.emptyList()); @@ -163,10 +171,17 @@ void givenChunkWithMultipleTuples_whenGenerateFragments_thenGenerateFragmentsAcc List expectedFragments = Arrays.asList( TupleChunkFragmentEntity.of(chunkId, tupleType, 0, initialFragmentSize), - TupleChunkFragmentEntity.of(chunkId, tupleType, initialFragmentSize, numberOfTuples)); + TupleChunkFragmentEntity.of( + chunkId, + tupleType, + initialFragmentSize, + numberOfTuples, + ActivationStatus.LOCKED, + null, + false)); when(servicePropertiesMock.getInitialFragmentSize()).thenReturn(initialFragmentSize); - assertEquals(castorWebSocketService.generateFragmentsForChunk(tupleChunk), expectedFragments); + assertEquals(expectedFragments, castorWebSocketService.generateFragmentsForChunk(tupleChunk)); } } diff --git a/castor-service/src/test/resources/application-test.properties b/castor-service/src/test/resources/application-test.properties index 15663fc..6394df9 100644 --- a/castor-service/src/test/resources/application-test.properties +++ b/castor-service/src/test/resources/application-test.properties @@ -21,6 +21,7 @@ carbynestack.castor.serverHeartbeat=0 carbynestack.castor.clientHeartbeat=10000 carbynestack.castor.messageBuffer=10485760 carbynestack.castor.master=true +carbynestack.castor.podHash=${HOSTNAME:1ae21f5847-35dv3} carbynestack.castor.noSslValidation=false carbynestack.castor.trustedCertificates= carbynestack.castor.slaveUris=https://castor.carbynestack.io:8080 diff --git a/castor-upload-java-client/3RD-PARTY-LICENSES/sbom.xml b/castor-upload-java-client/3RD-PARTY-LICENSES/sbom.xml index 1b91188..aaa112d 100644 --- a/castor-upload-java-client/3RD-PARTY-LICENSES/sbom.xml +++ b/castor-upload-java-client/3RD-PARTY-LICENSES/sbom.xml @@ -132,7 +132,7 @@ castor-common io.carbynestack castor-common - 0.1-SNAPSHOT-4321261594-23-40a9faa + 0.1.1 Apache-2.0