|
5 | 5 | */
|
6 | 6 | package org.hibernate.reactive.engine.jdbc.mutation.internal;
|
7 | 7 |
|
8 |
| -import java.lang.invoke.MethodHandles; |
9 |
| -import java.sql.SQLException; |
10 |
| -import java.util.concurrent.CompletionStage; |
11 |
| - |
12 | 8 | import org.hibernate.engine.jdbc.batch.spi.Batch;
|
13 | 9 | import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
|
14 | 10 | import org.hibernate.engine.jdbc.mutation.OperationResultChecker;
|
|
25 | 21 | import org.hibernate.persister.entity.mutation.EntityTableMapping;
|
26 | 22 | import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
|
27 | 23 | import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor;
|
| 24 | +import org.hibernate.reactive.engine.jdbc.ResultsCheckerUtil; |
28 | 25 | import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
|
29 | 26 | import org.hibernate.reactive.generator.values.ReactiveGeneratedValuesMutationDelegate;
|
30 | 27 | import org.hibernate.reactive.logging.impl.Log;
|
|
37 | 34 | import org.hibernate.sql.model.TableMapping;
|
38 | 35 | import org.hibernate.sql.model.ValuesAnalysis;
|
39 | 36 |
|
| 37 | +import java.lang.invoke.MethodHandles; |
| 38 | +import java.sql.SQLException; |
| 39 | +import java.util.ArrayList; |
| 40 | +import java.util.List; |
| 41 | +import java.util.concurrent.CompletionStage; |
| 42 | + |
40 | 43 | import static org.hibernate.engine.jdbc.mutation.internal.ModelMutationHelper.checkResults;
|
41 | 44 | import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
|
42 | 45 | import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
|
| 46 | +import static org.hibernate.reactive.util.impl.CompletionStages.loop; |
43 | 47 | import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture;
|
44 | 48 | import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
|
45 | 49 | import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER;
|
@@ -73,10 +77,64 @@ private ReactiveConnection connection(SharedSessionContractImplementor session)
|
73 | 77 | @Override
|
74 | 78 | public CompletionStage<Void> performReactiveBatchedOperations(
|
75 | 79 | ValuesAnalysis valuesAnalysis,
|
76 |
| - TableInclusionChecker inclusionChecker, OperationResultChecker resultChecker, |
| 80 | + TableInclusionChecker inclusionChecker, |
| 81 | + OperationResultChecker resultChecker, |
77 | 82 | SharedSessionContractImplementor session) {
|
78 |
| - return ReactiveMutationExecutor.super |
79 |
| - .performReactiveBatchedOperations( valuesAnalysis, inclusionChecker, resultChecker, session); |
| 83 | + final PreparedStatementGroup batchedMutationOperationGroup = getBatchedPreparedStatementGroup(); |
| 84 | + if ( batchedMutationOperationGroup != null ) { |
| 85 | + final List<PreparedStatementDetails> preparedStatementDetailsList = new ArrayList<>( |
| 86 | + batchedMutationOperationGroup.getNumberOfStatements() ); |
| 87 | + batchedMutationOperationGroup.forEachStatement( (tableName, statementDetails) -> preparedStatementDetailsList |
| 88 | + .add( statementDetails ) ); |
| 89 | + loop( preparedStatementDetailsList, statementDetails -> { |
| 90 | + if ( statementDetails == null ) { |
| 91 | + return voidFuture(); |
| 92 | + } |
| 93 | + final JdbcValueBindings valueBindings = getJdbcValueBindings(); |
| 94 | + final TableMapping tableDetails = statementDetails.getMutatingTableDetails(); |
| 95 | + if ( inclusionChecker != null && !inclusionChecker.include( tableDetails ) ) { |
| 96 | + if ( MODEL_MUTATION_LOGGER.isTraceEnabled() ) { |
| 97 | + MODEL_MUTATION_LOGGER.tracef( |
| 98 | + "Skipping execution of secondary insert : %s", |
| 99 | + tableDetails.getTableName() |
| 100 | + ); |
| 101 | + } |
| 102 | + return voidFuture(); |
| 103 | + } |
| 104 | + |
| 105 | + // If we get here the statement is needed - make sure it is resolved |
| 106 | + final Object[] paramValues = PreparedStatementAdaptor.bind( statement -> { |
| 107 | + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( |
| 108 | + statementDetails, |
| 109 | + statement, |
| 110 | + session.getJdbcServices() |
| 111 | + ); |
| 112 | + valueBindings.beforeStatement( details ); |
| 113 | + } ); |
| 114 | + |
| 115 | + final ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); |
| 116 | + final String sql = statementDetails.getSqlString(); |
| 117 | + return reactiveConnection.update( |
| 118 | + sql, |
| 119 | + paramValues, |
| 120 | + true, |
| 121 | + (rowCount, batchPosition, query) -> ResultsCheckerUtil.checkResults( |
| 122 | + session, |
| 123 | + statementDetails, |
| 124 | + resultChecker, |
| 125 | + rowCount, |
| 126 | + batchPosition |
| 127 | + ) |
| 128 | + ).whenComplete( (o, throwable) -> { //TODO: is this part really needed? |
| 129 | + if ( statementDetails.getStatement() != null ) { |
| 130 | + statementDetails.releaseStatement( session ); |
| 131 | + } |
| 132 | + valueBindings.afterStatement( tableDetails ); |
| 133 | + } ); |
| 134 | + } |
| 135 | + ); |
| 136 | + } |
| 137 | + return voidFuture(); |
80 | 138 | }
|
81 | 139 |
|
82 | 140 | @Override
|
@@ -159,6 +217,22 @@ public CompletionStage<GeneratedValues> performReactiveNonBatchedOperations(
|
159 | 217 | }
|
160 | 218 | }
|
161 | 219 |
|
| 220 | + public CompletionStage<Void> performReactiveSelfExecutingOperations( |
| 221 | + ValuesAnalysis valuesAnalysis, |
| 222 | + TableInclusionChecker inclusionChecker, |
| 223 | + SharedSessionContractImplementor session) { |
| 224 | + if ( getSelfExecutingMutations() == null || getSelfExecutingMutations().isEmpty() ) { |
| 225 | + return voidFuture(); |
| 226 | + } |
| 227 | + |
| 228 | + return loop( getSelfExecutingMutations(), operation -> { |
| 229 | + if ( inclusionChecker.include( operation.getTableDetails() ) ) { |
| 230 | + operation.performMutation( getJdbcValueBindings(), valuesAnalysis, session ); |
| 231 | + } |
| 232 | + return voidFuture(); |
| 233 | + }); |
| 234 | + } |
| 235 | + |
162 | 236 | private class OperationsForEach {
|
163 | 237 |
|
164 | 238 | private final Object id;
|
@@ -210,6 +284,7 @@ public CompletionStage<Void> buildLoop() {
|
210 | 284 | return loop;
|
211 | 285 | }
|
212 | 286 | }
|
| 287 | + |
213 | 288 | @Override
|
214 | 289 | public CompletionStage<Void> performReactiveNonBatchedMutation(
|
215 | 290 | PreparedStatementDetails statementDetails,
|
|
0 commit comments