Skip to content

Commit d3181ca

Browse files
author
Yevhenii Nadtochii
authored
Merge pull request #1433 from SpineEventEngine/delivery-thread-id
[1.x] Make `AbstractWorkRegistry` respect the current worker of a node
2 parents aacba13 + 14e49be commit d3181ca

File tree

9 files changed

+171
-129
lines changed

9 files changed

+171
-129
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

license-report.md

Lines changed: 88 additions & 88 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ all modules and does not describe the project structure per-subproject.
1212

1313
<groupId>io.spine</groupId>
1414
<artifactId>spine-core-java</artifactId>
15-
<version>1.7.7-SNAPSHOT.7</version>
15+
<version>1.7.7-SNAPSHOT.8</version>
1616

1717
<inceptionYear>2015</inceptionYear>
1818

@@ -207,12 +207,12 @@ all modules and does not describe the project structure per-subproject.
207207
<dependency>
208208
<groupId>org.jacoco</groupId>
209209
<artifactId>org.jacoco.agent</artifactId>
210-
<version>0.8.5</version>
210+
<version>0.8.6</version>
211211
</dependency>
212212
<dependency>
213213
<groupId>org.jacoco</groupId>
214214
<artifactId>org.jacoco.ant</artifactId>
215-
<version>0.8.5</version>
215+
<version>0.8.6</version>
216216
</dependency>
217217
</dependencies>
218218
</project>

server/src/main/java/io/spine/server/delivery/AbstractWorkRegistry.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,50 +45,66 @@
4545
* persistence mechanism.
4646
*
4747
* @implNote This class is NOT thread safe. Synchronize the atomic persistence operations
48-
* as well as the methods implemented in this class make an implementation thread safe.
48+
* as well as the methods implemented in this class to make an implementation thread safe.
4949
*/
5050
@SPI
5151
public abstract class AbstractWorkRegistry implements ShardedWorkRegistry {
5252

5353
@Override
54-
public Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
54+
public Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId node) {
5555
checkNotNull(index);
56-
checkNotNull(nodeId);
57-
58-
Optional<ShardSessionRecord> record = find(index);
59-
if (record.isPresent()) {
60-
ShardSessionRecord existingRecord = record.get();
61-
if (hasPickedBy(existingRecord)) {
62-
return Optional.empty();
63-
} else {
64-
ShardSessionRecord updatedRecord = updateNode(existingRecord, nodeId);
65-
return Optional.of(asSession(updatedRecord));
66-
}
67-
} else {
68-
ShardSessionRecord newRecord = createRecord(index, nodeId);
56+
checkNotNull(node);
57+
58+
WorkerId worker = currentWorkerFor(node);
59+
Optional<ShardProcessingSession> result = pickUp(index, worker);
60+
return result;
61+
}
62+
63+
private Optional<ShardProcessingSession> pickUp(ShardIndex index, WorkerId worker) {
64+
Optional<ShardSessionRecord> optionalRecord = find(index);
65+
if (!optionalRecord.isPresent()) {
66+
ShardSessionRecord newRecord = createRecord(index, worker);
6967
return Optional.of(asSession(newRecord));
7068
}
69+
70+
ShardSessionRecord record = optionalRecord.get();
71+
if (hasWorker(record)) {
72+
return Optional.empty();
73+
}
74+
75+
ShardSessionRecord updatedRecord = updateNode(record, worker);
76+
return Optional.of(asSession(updatedRecord));
7177
}
7278

73-
private static boolean hasPickedBy(ShardSessionRecord record) {
74-
return !NodeId.getDefaultInstance().equals(record.getPickedBy());
79+
/**
80+
* Returns an identifier of the current worker that is now going to process the shard.
81+
*
82+
* <p>An example of such an identifier could be ID of the thread which performs processing.
83+
*
84+
* @param node
85+
* the node to which the resulted worker belongs
86+
*/
87+
protected abstract WorkerId currentWorkerFor(NodeId node);
88+
89+
private static boolean hasWorker(ShardSessionRecord record) {
90+
return !WorkerId.getDefaultInstance().equals(record.getWorker());
7591
}
7692

77-
private ShardSessionRecord createRecord(ShardIndex index, NodeId nodeId) {
93+
private ShardSessionRecord createRecord(ShardIndex index, WorkerId worker) {
7894
ShardSessionRecord newRecord = ShardSessionRecord
7995
.newBuilder()
8096
.setIndex(index)
81-
.setPickedBy(nodeId)
97+
.setWorker(worker)
8298
.setWhenLastPicked(currentTime())
8399
.vBuild();
84100
write(newRecord);
85101
return newRecord;
86102
}
87103

88-
private ShardSessionRecord updateNode(ShardSessionRecord record, NodeId nodeId) {
104+
private ShardSessionRecord updateNode(ShardSessionRecord record, WorkerId worker) {
89105
ShardSessionRecord updatedRecord = record
90106
.toBuilder()
91-
.setPickedBy(nodeId)
107+
.setWorker(worker)
92108
.setWhenLastPicked(currentTime())
93109
.build();
94110
write(updatedRecord);
@@ -100,7 +116,7 @@ public Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
100116
checkNotNull(inactivityPeriod);
101117
ImmutableSet.Builder<ShardIndex> resultBuilder = ImmutableSet.builder();
102118
allRecords().forEachRemaining(record -> {
103-
if (record.hasPickedBy()) {
119+
if (record.hasWorker()) {
104120
Timestamp whenPicked = record.getWhenLastPicked();
105121
Duration elapsed = between(whenPicked, currentTime());
106122

@@ -115,11 +131,11 @@ public Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
115131
}
116132

117133
/**
118-
* Clears the value of {@code ShardSessionRecord.when_last_picked} and stores the session.
134+
* Clears the value of {@code ShardSessionRecord.worker} and stores the session.
119135
*/
120136
protected void clearNode(ShardSessionRecord session) {
121137
ShardSessionRecord record = session.toBuilder()
122-
.clearPickedBy()
138+
.clearWorker()
123139
.build();
124140
write(record);
125141
}

server/src/main/java/io/spine/server/delivery/ShardProcessingSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* The session of processing the messages, which reside in a shard.
3434
*
35-
* <p>Starts by {@linkplain ShardedWorkRegistry#pickUp(ShardIndex, NodeId)} picking up}
35+
* <p>Starts by {@linkplain ShardedWorkRegistry#pickUp(ShardIndex, NodeId) picking up}
3636
* the shard to process.
3737
*/
3838
@SPI

server/src/main/java/io/spine/server/delivery/ShardedWorkRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ public interface ShardedWorkRegistry {
5454
*
5555
* @param index
5656
* the index of the shard to pick up for processing
57-
* @param nodeId
57+
* @param node
5858
* the identifier of the node for which to pick the shard
5959
* @return the session of shard processing,
6060
* or {@code Optional.empty()} if the shard is not available
6161
*/
62-
Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId);
62+
Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId node);
6363

6464
/**
6565
* Clears up the recorded {@code NodeId}s from the session records if there was no activity

server/src/main/java/io/spine/server/delivery/memory/InMemoryShardedWorkRegistry.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.spine.server.delivery.ShardProcessingSession;
3434
import io.spine.server.delivery.ShardSessionRecord;
3535
import io.spine.server.delivery.ShardedWorkRegistry;
36+
import io.spine.server.delivery.WorkerId;
3637

3738
import java.util.Iterator;
3839
import java.util.Map;
@@ -56,6 +57,16 @@ public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, No
5657
return super.pickUp(index, nodeId);
5758
}
5859

60+
@Override
61+
protected WorkerId currentWorkerFor(NodeId node) {
62+
WorkerId worker = WorkerId
63+
.newBuilder()
64+
.setNodeId(node)
65+
.setValue(String.valueOf(Thread.currentThread().getId()))
66+
.vBuild();
67+
return worker;
68+
}
69+
5970
@Override
6071
public synchronized Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
6172
return super.releaseExpiredSessions(inactivityPeriod);

server/src/main/proto/spine/server/delivery/delivery.proto

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import "spine/server/server_environment.proto";
4444
//
4545
// A value type used across the application. To be potentially used in JavaScript, Go, C++ and
4646
// other client and server environments, that are split into shards.
47-
//
4847
message ShardIndex {
4948

5049
// The zero-based index of the shard.
@@ -60,17 +59,18 @@ message ShardSessionRecord {
6059
// The index of a shard processed in this session.
6160
ShardIndex index = 1 [(required) = true];
6261

63-
// The identifier of an application node, which picked up the index and processes it.
64-
//
65-
// Unset until a node picks the session.
66-
//
67-
NodeId picked_by = 2;
68-
6962
// When the shard processed within the session was last picked by the node.
7063
//
7164
// This field is unset if no nodes ever picked the session.
72-
//
7365
google.protobuf.Timestamp when_last_picked = 3;
66+
67+
// An identifier of an application worker, which picked up the shard and processes it.
68+
//
69+
// Unset until a worker picks the shard.
70+
WorkerId worker = 4;
71+
72+
reserved 2;
73+
reserved "picked_by";
7474
}
7575

7676
//A stage of the `Delivery` process running for some particular `ShardIndex`.
@@ -87,8 +87,23 @@ message DeliveryStage {
8787
//
8888
// Represented in each of the bounded contexts as an event reactor,
8989
// as it's impossible to have several process managers of the same state across bounded contexts.
90-
//
9190
message ShardMaintenance {
9291

9392
ShardIndex id = 1;
9493
}
94+
95+
// An identifier of a worker which processes a shard.
96+
//
97+
// This value is unique across the application. It is used to indicate who is currently processing
98+
// which shard. A single node can contain several workers (typically represented by threads)
99+
// processing different shards.
100+
message WorkerId {
101+
102+
// A node to which this worker belongs.
103+
NodeId nodeId = 1 [(required) = true];
104+
105+
// Worker's identifier.
106+
//
107+
// An example of such an identifier could be ID of the thread which processes a shard.
108+
string value = 2 [(required) = true];
109+
}

version.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/**
3535
* Version of this library.
3636
*/
37-
val coreJava = "1.7.7-SNAPSHOT.7"
37+
val coreJava = "1.7.7-SNAPSHOT.8"
3838

3939
/**
4040
* Versions of the Spine libraries that `core-java` depends on.

0 commit comments

Comments
 (0)