Skip to content

Commit 59d5519

Browse files
authored
CORE-3968 Add new types for flow event retry and exception reporting (#342)
- Add new types for flow event retry and reporting exceptions to user code - Add flow wake up schedule to checkpoint
1 parent 362cdd4 commit 59d5519

File tree

7 files changed

+92
-1
lines changed

7 files changed

+92
-1
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"type": "record",
3+
"name": "WakeUpWithException",
4+
"namespace": "net.corda.data.flow.event",
5+
"doc": "The WakeUpWithException event is used to report an exception back to user code",
6+
"fields": [
7+
{
8+
"name": "error",
9+
"type": "net.corda.data.ExceptionEnvelope",
10+
"doc": "The error to be thrown back to the user code"
11+
}
12+
]
13+
}

data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStatus.avsc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"symbols": [
3434
"START_REQUESTED",
3535
"RUNNING",
36+
"RETRYING",
3637
"COMPLETED",
3738
"FAILED"
3839
]

data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/Checkpoint.avsc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@
3939
"items": "net.corda.data.flow.FlowStackItem"
4040
},
4141
"doc": "Used to track calls to sub-flows and their relationship to sessions."
42+
},
43+
{
44+
"name": "retryState",
45+
"type": ["null", "net.corda.data.flow.state.RetryState"],
46+
"default": null,
47+
"doc": "Optional retry information for a failed flow event. Setting this field marks the flow as retrying."
48+
},
49+
{
50+
"name": "maxFlowSleepDuration",
51+
"type": "int",
52+
"doc": "The maximum time a flow can sleep, before a Wakeup event is generated (milliseconds)"
4253
}
4354
]
4455
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"type": "record",
3+
"name": "RetryState",
4+
"namespace": "net.corda.data.flow.state",
5+
"doc": "The Retry State records the need to retry a failed event on the flow Checkpoint",
6+
"fields": [
7+
{
8+
"name": "retryCount",
9+
"type": "int",
10+
"doc": "The current retry count, set to 0 for the initial failure"
11+
},
12+
{
13+
"name": "failedEvent",
14+
"type": "net.corda.data.flow.event.FlowEvent",
15+
"doc": "Copy of the event that caused the failure"
16+
},
17+
{
18+
"name": "error",
19+
"type": "net.corda.data.ExceptionEnvelope",
20+
"doc": "The original error that caused the retry"
21+
},
22+
{
23+
"name": "firstFailureTimestamp",
24+
"type": {
25+
"type": "long",
26+
"logicalType": "timestamp-millis"
27+
},
28+
"doc": "The timestamp of when the first exception occurred that triggered a retry"
29+
},
30+
{
31+
"name": "lastFailureTimestamp",
32+
"type": {
33+
"type": "long",
34+
"logicalType": "timestamp-millis"
35+
},
36+
"doc": "The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure"
37+
}
38+
]
39+
}

data/config-schema/src/main/kotlin/net/corda/schema/configuration/FlowConfig.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,7 @@ object FlowConfig {
44
const val SESSION_MESSAGE_RESEND_WINDOW = "session.messageResendWindow"
55
const val SESSION_HEARTBEAT_TIMEOUT_WINDOW = "session.heartbeatTimeout"
66
const val SESSION_P2P_TTL = "session.p2pTTL"
7+
const val PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts"
8+
const val PROCESSING_MAX_RETRY_DELAY = "processing.maxRetryDelay"
9+
const val PROCESSING_MAX_FLOW_SLEEP_DURATION = "processing.maxFlowSleepDuration"
710
}

data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,30 @@
55
"description": "Configuration schema for the flow subsection.",
66
"type": "object",
77
"properties": {
8+
"processing": {
9+
"description": "Settings for flow pipeline processing",
10+
"type": "object",
11+
"properties": {
12+
"maxRetryAttempts": {
13+
"description": "The maximum number of retry attempts a transient error will be retried before failing the flow. a value of zero disables retries.",
14+
"type": "integer",
15+
"minimum": 0,
16+
"default": 5
17+
},
18+
"maxRetryDelay": {
19+
"description": "The maximum delay before a retry is scheduled, in milliseconds",
20+
"type": "integer",
21+
"minimum": 1000,
22+
"default": 16000
23+
},
24+
"maxFlowSleepDuration": {
25+
"description": "The maximum delay before a periodic WakeUp is scheduled, in milliseconds",
26+
"type": "integer",
27+
"minimum": 1000,
28+
"default": 60000
29+
}
30+
}
31+
},
832
"session": {
933
"description": "Settings for flow sessions",
1034
"type": "object",

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ cordaProductVersion = 5.0.0
77
# NOTE: update this each time this module contains a breaking change
88
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
99
## a per module property in which case module versions can change independently.
10-
cordaApiRevision = 96
10+
cordaApiRevision = 97
1111

1212
# Main
1313
kotlinVersion = 1.6.21

0 commit comments

Comments
 (0)