Skip to content

Commit 27c8d94

Browse files
committed
Sync with WebSocket
1 parent 7df8369 commit 27c8d94

File tree

10 files changed

+288
-15
lines changed

10 files changed

+288
-15
lines changed

app/src/main/scala/org/alephium/explorer/ExplorerState.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import slick.jdbc.PostgresProfile
2525

2626
import org.alephium.explorer.cache.{BlockCache, MetricCache, TransactionCache}
2727
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
28+
import org.alephium.explorer.config.ExplorerConfig.Consensus
2829
import org.alephium.explorer.persistence.Database
2930
import org.alephium.explorer.service._
3031
import org.alephium.explorer.util.Scheduler
@@ -46,6 +47,8 @@ sealed trait ExplorerState extends Service with StrictLogging {
4647
lazy val database: Database =
4748
new Database(config.bootMode)(executionContext, databaseConfig)
4849

50+
implicit lazy val consensus: Consensus = config.consensus
51+
4952
implicit lazy val blockCache: BlockCache =
5053
BlockCache(
5154
config.cacheRowCountReloadPeriod,

app/src/main/scala/org/alephium/explorer/SyncServices.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.alephium.api.model.{ChainParams, PeerAddress}
3232
import org.alephium.explorer.RichAVector._
3333
import org.alephium.explorer.cache.BlockCache
3434
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
35+
import org.alephium.explorer.config.ExplorerConfig.Consensus
3536
import org.alephium.explorer.error.ExplorerError._
3637
import org.alephium.explorer.service._
3738
import org.alephium.explorer.util.Scheduler
@@ -46,6 +47,7 @@ object SyncServices extends StrictLogging {
4647
ec: ExecutionContext,
4748
dc: DatabaseConfig[PostgresProfile],
4849
blockFlowClient: BlockFlowClient,
50+
consensus: Consensus,
4951
blockCache: BlockCache,
5052
groupSetting: GroupSetting
5153
): Future[Unit] =
@@ -88,6 +90,7 @@ object SyncServices extends StrictLogging {
8890
dc: DatabaseConfig[PostgresProfile],
8991
blockFlowClient: BlockFlowClient,
9092
blockCache: BlockCache,
93+
consensus: Consensus,
9194
groupSetting: GroupSetting
9295
): Future[Unit] =
9396
Future.fromTry {

app/src/main/scala/org/alephium/explorer/error/ExplorerError.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ object ExplorerError {
8888
extends Exception(s"Cannot parse config file: $file", exception)
8989
with ConfigError
9090

91+
final case class WebSocketError(cause: Throwable)
92+
extends Exception(s"WebSocket error. $cause")
93+
with ExplorerError
94+
95+
/** ****** Group: [[ConfigError]] *******
96+
*/
9197
final case class InvalidGroupNumber(groupNum: Int)
9298
extends Exception(s"Invalid groupNum: $groupNum. It should be > 0")
9399
with ConfigError

app/src/main/scala/org/alephium/explorer/service/BlockFlowSyncService.scala

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.alephium.explorer.service
1919
import java.util.concurrent.atomic.AtomicBoolean
2020

2121
import scala.collection.immutable.ArraySeq
22-
import scala.concurrent.{ExecutionContext, Future}
22+
import scala.concurrent.{ExecutionContext, Future, Promise}
2323
import scala.concurrent.duration.{Duration => ScalaDuration, FiniteDuration}
2424
import scala.util.{Failure, Success}
2525

@@ -31,6 +31,7 @@ import sttp.model.Uri
3131
import org.alephium.explorer.{foldFutures, GroupSetting}
3232
import org.alephium.explorer.api.model.Height
3333
import org.alephium.explorer.cache.BlockCache
34+
import org.alephium.explorer.config.ExplorerConfig.Consensus
3435
import org.alephium.explorer.error.ExplorerError.BlocksInDifferentChains
3536
import org.alephium.explorer.persistence.DBRunner._
3637
import org.alephium.explorer.persistence.dao.BlockDao
@@ -53,6 +54,8 @@ import org.alephium.util.{Duration, TimeStamp}
5354
* 5. For each last block of each chains, mark it as part of the main chain and travel
5455
* down the parents recursively until we found back a parent that is part of the main chain.
5556
* 6. During step 5, if a parent is missing, we download it and continue the procces at 5.
57+
* 7. Once the blocks are up-to-date with the node, we switch to websocket syncing
58+
* 8. If the websocket close or is late, in case of network issue, we go back to step 1.
5659
*
5760
* TODO: Step 5 is costly, but it's an easy way to handle reorg. In step 3 we know we receive the current main chain
5861
* for that timerange, so in step 4 we could directly insert them as `mainChain = true`, but we need to sync
@@ -66,13 +69,15 @@ case object BlockFlowSyncService extends StrictLogging {
6669
private val defaultStep = Duration.ofMinutesUnsafe(30L)
6770
private val defaultBackStep = Duration.ofSecondsUnsafe(10L)
6871
private val initialBackStep = Duration.ofMinutesUnsafe(30L)
72+
private val upToDateDelta = Duration.ofSecondsUnsafe(30L)
6973
// scalastyle:on magic.number
7074

7175
def start(nodeUris: ArraySeq[Uri], interval: FiniteDuration)(implicit
7276
ec: ExecutionContext,
7377
dc: DatabaseConfig[PostgresProfile],
7478
blockFlowClient: BlockFlowClient,
7579
cache: BlockCache,
80+
consensus: Consensus,
7681
groupSetting: GroupSetting,
7782
scheduler: Scheduler
7883
): Future[Unit] =
@@ -87,14 +92,38 @@ case object BlockFlowSyncService extends StrictLogging {
8792
ec: ExecutionContext,
8893
dc: DatabaseConfig[PostgresProfile],
8994
blockFlowClient: BlockFlowClient,
95+
consensus: Consensus,
9096
cache: BlockCache,
9197
groupSetting: GroupSetting
9298
): Future[Unit] = {
93-
if (initialBackStepDone.get()) {
94-
syncOnceWith(nodeUris, defaultStep, defaultBackStep)
95-
} else {
96-
syncOnceWith(nodeUris, defaultStep, initialBackStep).map { _ =>
97-
initialBackStepDone.set(true)
99+
val syncResult =
100+
if (initialBackStepDone.get()) {
101+
syncOnceWith(nodeUris, defaultStep, defaultBackStep)
102+
} else {
103+
syncOnceWith(nodeUris, defaultStep, initialBackStep).map { result =>
104+
initialBackStepDone.set(true)
105+
result
106+
}
107+
}
108+
109+
syncResult.flatMap { isUpToDate =>
110+
if (isUpToDate) {
111+
logger.info("Blocks are up to date, switching to web socket syncing")
112+
val stopPromise = Promise[Unit]()
113+
// TODO Use config values
114+
// scalastyle:off magic.number
115+
WebSocketSyncService.sync(
116+
stopPromise,
117+
host = "127.0.0.1",
118+
port = 22973,
119+
flushInterval = Duration.ofMillisUnsafe(500)
120+
)
121+
// scalastyle:on magic.number
122+
stopPromise.future.map { _ =>
123+
logger.info("WebSocket syncing stopped, resuming http syncing")
124+
}
125+
} else {
126+
Future.successful(())
98127
}
99128
}
100129
}
@@ -106,7 +135,7 @@ case object BlockFlowSyncService extends StrictLogging {
106135
blockFlowClient: BlockFlowClient,
107136
cache: BlockCache,
108137
groupSetting: GroupSetting
109-
): Future[Unit] = {
138+
): Future[Boolean] = {
110139
getTimeStampRange(step, backStep)
111140
.flatMap { ranges =>
112141
Future.sequence {
@@ -116,12 +145,16 @@ case object BlockFlowSyncService extends StrictLogging {
116145
s"Syncing from ${TimeUtil.toInstant(from)} to ${TimeUtil
117146
.toInstant(to)} (${from.millis} - ${to.millis})"
118147
)
119-
syncTimeRange(from, to, uri)
148+
syncTimeRange(from, to, uri).map { _ =>
149+
(TimeStamp.now() -- to).map(_ < upToDateDelta).getOrElse(false)
150+
}
120151
}
121152
}
122153
}
123154
}
124-
.map(_ => ())
155+
.map { upToDates =>
156+
upToDates.flatten.contains(true)
157+
}
125158
}
126159
// scalastyle:on magic.number
127160

@@ -360,7 +393,7 @@ case object BlockFlowSyncService extends StrictLogging {
360393
}
361394
}
362395

363-
private def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit
396+
def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit
364397
ec: ExecutionContext,
365398
dc: DatabaseConfig[PostgresProfile],
366399
blockFlowClient: BlockFlowClient,

0 commit comments

Comments
 (0)