@@ -46,8 +46,7 @@ object QueryView {
46
46
47
47
private case class LiveStreamFailed (cause : Throwable )
48
48
49
- /**
50
- * Additionally to being updated by the live stream the QueryView instantly issues a query using the recovery stream to perform a fast forced update
49
+ /** Additionally to being updated by the live stream the QueryView instantly issues a query using the recovery stream to perform a fast forced update
51
50
* (useful in corner cases when the live stream has a high delay/polling interval)
52
51
* While updating a subsequent ForceUpdate is ignored.
53
52
*/
@@ -74,8 +73,7 @@ object QueryView {
74
73
75
74
trait EventStreamOffsetTyped {
76
75
77
- /**
78
- * Type of offset used for position in the event stream
76
+ /** Type of offset used for position in the event stream
79
77
*/
80
78
type OT = Offset
81
79
}
@@ -110,109 +108,92 @@ abstract class QueryView
110
108
override private [persistence] val snapshotStore : ActorRef = persistence.snapshotStoreFor(snapshotPluginId)
111
109
private implicit val materializer : ActorMaterializer = ActorMaterializer ()(context)
112
110
113
- /**
114
- * This stash will contain the messages received during the recovery phase.
111
+ /** This stash will contain the messages received during the recovery phase.
115
112
*/
116
113
private val recoveringStash = createStash()
117
114
118
- /**
119
- * It is the persistenceId linked to this view. It should be unique.
115
+ /** It is the persistenceId linked to this view. It should be unique.
120
116
*/
121
117
override def snapshotterId : String
122
118
123
- /**
124
- * Configuration id of the snapshot plugin servicing this persistent actor or view.
119
+ /** Configuration id of the snapshot plugin servicing this persistent actor or view.
125
120
* When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
126
121
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
127
122
* Configuration entry must contain few required fields, such as `class`. See akka-persistence jar
128
123
* `src/main/resources/reference.conf`.
129
124
*/
130
125
def snapshotPluginId : String = " "
131
126
132
- /**
133
- * The amount of time this actor must wait until giving up waiting for the recovery process. A undefined duration
127
+ /** The amount of time this actor must wait until giving up waiting for the recovery process. A undefined duration
134
128
* causes the actor to wait indefinitely. If the recovery fails because of a timeout, this actor will crash.
135
129
*
136
130
* TODO Tune by a flag to indicate we want the actor to switch live if the recovery timeout.
137
131
*/
138
132
def recoveryTimeout : Duration = DefaultRecoveryTimeout
139
133
140
- /**
141
- * The amount of time this actor must wait until giving up waiting for a snapshot loading. A undefined duration
134
+ /** The amount of time this actor must wait until giving up waiting for a snapshot loading. A undefined duration
142
135
* causes the actor to wait indefinitely. The timeout does not cause this actor to crash, it is a recoverable error.
143
136
*/
144
137
def loadSnapshotTimeout : Duration = DefaultLoadSnapshotTimeout
145
138
146
- /**
147
- * It is the source od EventEnvelope used to recover the view status. It MUST be finite stream.
139
+ /** It is the source od EventEnvelope used to recover the view status. It MUST be finite stream.
148
140
*
149
141
* It is declared as AnyRef to be able to return [[akka.persistence.query.EventEnvelope ]].
150
142
*/
151
143
def recoveringStream (sequenceNrByPersistenceId : Map [String , Long ], lastOffset : OT ): Source [AnyRef , _]
152
144
153
- /**
154
- * It is the source od EventEnvelope used to receive live events, it MUST be a infinite stream (eg: It should never
145
+ /** It is the source od EventEnvelope used to receive live events, it MUST be a infinite stream (eg: It should never
155
146
* complete)
156
147
*
157
148
* It is declared as AnyRef to be able to return [[akka.persistence.query.EventEnvelope ]].
158
149
*/
159
150
def liveStream (sequenceNrByPersistenceId : Map [String , Long ], lastOffset : OT ): Source [AnyRef , _]
160
151
161
- /**
162
- * It is an hook called before the actor switch to live mode. It is synchronous (it can change the actor status).
152
+ /** It is an hook called before the actor switch to live mode. It is synchronous (it can change the actor status).
163
153
* It can be useful to fetch additional data from other actor/services before starting receiving messages.
164
154
*/
165
155
def preLive (): Unit = {}
166
156
167
- /**
168
- * @see [[akka.persistence.QueryView.ForceUpdate ]]
157
+ /** @see [[akka.persistence.QueryView.ForceUpdate ]]
169
158
*/
170
159
def forceUpdate (): Unit = startForceUpdate()
171
160
172
- /**
173
- * Is called when the stream of a forceUpdate has completed
161
+ /** Is called when the stream of a forceUpdate has completed
174
162
*/
175
163
def onForceUpdateCompleted () = {}
176
164
177
165
// Status accessors
178
166
179
- /**
180
- * Return if this actor is waiting for receiving the snapshot from the snapshot-store.
167
+ /** Return if this actor is waiting for receiving the snapshot from the snapshot-store.
181
168
*/
182
169
final def isWaitingForSnapshot : Boolean = currentState == State .WaitingForSnapshot
183
170
184
- /**
185
- * Return if this actor is in recovery phase. Useful to the implementor to apply different behavior when a message
171
+ /** Return if this actor is in recovery phase. Useful to the implementor to apply different behavior when a message
186
172
* came from the journal or from another actor.
187
173
*/
188
174
final def isRecovering : Boolean = currentState == State .Recovering
189
175
190
- /**
191
- * Return if this actor is in live phase. Useful to the implementor to apply different behavior when a message
176
+ /** Return if this actor is in live phase. Useful to the implementor to apply different behavior when a message
192
177
* came from the journal or from another actor.
193
178
*/
194
179
final def isLive : Boolean = currentState == State .Live
195
180
196
- /**
197
- * Return the last replayed message offset from the journal.
181
+ /** Return the last replayed message offset from the journal.
198
182
*/
199
183
final def lastOffset : OT = Option (_lastOffset).getOrElse(firstOffset)
200
184
201
- /**
202
- * The current sequenceNr of given persistenceId
185
+ /** The current sequenceNr of given persistenceId
203
186
*
204
187
* @param persistenceId
205
188
* @return
206
189
*/
207
190
final def lastSequenceNrFor (persistenceId : String ): Long = _sequenceNrByPersistenceId.getOrElse(persistenceId, 0 )
208
191
209
- /**
210
- * Return the number of processed events since last snapshot has been taken.
192
+ /** Return the number of processed events since last snapshot has been taken.
211
193
*/
212
194
final def noOfEventSinceLastSnapshot (): Long = _noOfEventsSinceLastSnapshot
213
195
214
- /**
215
- * Return the next sequence nr to apply to the next snapshot.
196
+ /** Return the next sequence nr to apply to the next snapshot.
216
197
*/
217
198
override final def snapshotSequenceNr : Long = lastSnapshotSequenceNr + 1
218
199
0 commit comments