Skip to content
This repository was archived by the owner on Apr 13, 2022. It is now read-only.

P2P telemetry #340

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ scorex {
# distinguish different networks (e.g. testnet/mainnet).
magicBytes = [12, 34, 56, 78]

# String with IP address and port to send as external address during handshake.
# String with IP address and port to send as external address during handshake.
# Could be set automatically if UPnP is enabled.
#
# If `declared-address` is set, which is the common scenario for nodes running in the cloud,
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/scorex/core/ModifiersCache.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package scorex.core

import scorex.core.consensus.{ContainsModifiers, HistoryReader}
import scorex.core.validation.RecoverableModifierError
import scorex.core.validation.{ModifierError, RecoverableModifierError}
import scorex.util.ScorexLogging

import scala.annotation.tailrec
Expand Down Expand Up @@ -117,23 +117,23 @@ class DefaultModifiersCache[PMOD <: PersistentNodeViewModifier, HR <: HistoryRea
* @param history - an interface to history which could be needed to define a candidate
* @return - candidate if it is found
*/
@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
override def findCandidateKey(history: HR): Option[K] = {

cache.find { case (k, v) =>
history.applicableTry(v) match {
case Failure(e) if e.isInstanceOf[RecoverableModifierError] =>
case Failure(e: ModifierError) if !e.isFatal =>
// do nothing - modifier may be applied in future
log.info(s"Modifier ${v.encodedId} could not be applied now", e)
false
case Failure(e) =>
// non-recoverable error - remove modifier from cache
// TODO blaklist peer who sent it
log.warn(s"Modifier ${v.encodedId} became permanently invalid and will be removed from cache", e)
log.info(s"Modifier ${v.encodedId} became permanently invalid and will be removed from cache", e)
remove(k)
false
case Success(_) =>
true
}
}.map(_._1)
}

}
148 changes: 86 additions & 62 deletions src/main/scala/scorex/core/NodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package scorex.core
import akka.actor.Actor
import scorex.core.consensus.History.ProgressInfo
import scorex.core.consensus.{History, SyncInfo}
import scorex.core.diagnostics.DiagnosticsActor
import scorex.core.diagnostics.DiagnosticsActor.ReceivableMessages.InternalMessageTrip
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.NodeViewHolderEvent
import scorex.core.settings.ScorexSettings
import scorex.core.transaction._
import scorex.core.transaction.state.{MinimalState, TransactionValidation}
import scorex.core.transaction.wallet.Vault
import scorex.core.utils.ScorexEncoding
import scorex.util.ScorexLogging

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -263,81 +266,102 @@ trait NodeViewHolder[TX <: Transaction, PMOD <: PersistentNodeViewModifier]

//todo: update state in async way?
protected def pmodModify(pmod: PMOD): Unit =
if (!history().contains(pmod.id)) {
context.system.eventStream.publish(StartingPersistentModifierApplication(pmod))

log.info(s"Apply modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} to nodeViewHolder")

history().append(pmod) match {
case Success((historyBeforeStUpdate, progressInfo)) =>
log.debug(s"Going to apply modifications to the state: $progressInfo")
context.system.eventStream.publish(SyntacticallySuccessfulModifier(pmod))
context.system.eventStream.publish(NewOpenSurface(historyBeforeStUpdate.openSurfaceIds()))

if (progressInfo.toApply.nonEmpty) {
val (newHistory, newStateTry, blocksApplied) =
updateState(historyBeforeStUpdate, minimalState(), progressInfo, IndexedSeq())

newStateTry match {
case Success(newMinState) =>
val newMemPool = updateMemPool(progressInfo.toRemove, blocksApplied, memoryPool(), newMinState)

//we consider that vault always able to perform a rollback needed
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
val newVault = if (progressInfo.chainSwitchingNeeded) {
vault().rollback(idToVersion(progressInfo.branchPoint.get)).get
} else vault()
blocksApplied.foreach(newVault.scanPersistent)

log.info(s"Persistent modifier ${pmod.encodedId} applied successfully")
updateNodeView(Some(newHistory), Some(newMinState), Some(newVault), Some(newMemPool))


case Failure(e) =>
log.warn(s"Can`t apply persistent modifier (id: ${pmod.encodedId}, contents: $pmod) to minimal state", e)
updateNodeView(updatedHistory = Some(newHistory))
context.system.eventStream.publish(SemanticallyFailedModification(pmod, e))
time("pmodModify") {
if (!history().contains(pmod.id)) {
context.system.eventStream.publish(StartingPersistentModifierApplication(pmod))

log.info(s"Apply modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} to nodeViewHolder")

time("historyAppend")(history().append(pmod)) match {
case Success((historyBeforeStUpdate, progressInfo)) =>
log.debug(s"Going to apply modifications to the state: $progressInfo")
context.system.eventStream.publish(SyntacticallySuccessfulModifier(pmod, System.currentTimeMillis()))
context.system.eventStream.publish(NewOpenSurface(historyBeforeStUpdate.openSurfaceIds()))

if (progressInfo.toApply.nonEmpty) {
val (newHistory, newStateTry, blocksApplied) =
time("updateState")(updateState(historyBeforeStUpdate, minimalState(), progressInfo, IndexedSeq()))

newStateTry match {
case Success(newMinState) =>
val newMemPool = updateMemPool(progressInfo.toRemove, blocksApplied, memoryPool(), newMinState)

//we consider that vault always able to perform a rollback needed
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
val newVault = if (progressInfo.chainSwitchingNeeded) {
vault().rollback(idToVersion(progressInfo.branchPoint.get)).get
} else vault()
blocksApplied.foreach(newVault.scanPersistent)

log.info(s"Persistent modifier ${pmod.encodedId} applied successfully")
updateNodeView(Some(newHistory), Some(newMinState), Some(newVault), Some(newMemPool))


case Failure(e) =>
log.warn(s"Can`t apply persistent modifier (id: ${pmod.encodedId}, contents: $pmod) to minimal state", e)
updateNodeView(updatedHistory = Some(newHistory))
context.system.eventStream.publish(SemanticallyFailedModification(pmod, e))
}
} else {
requestDownloads(progressInfo)
updateNodeView(updatedHistory = Some(historyBeforeStUpdate))
}
} else {
requestDownloads(progressInfo)
updateNodeView(updatedHistory = Some(historyBeforeStUpdate))
}
case Failure(e) =>
log.warn(s"Can`t apply persistent modifier (id: ${pmod.encodedId}, contents: $pmod) to history", e)
context.system.eventStream.publish(SyntacticallyFailedModification(pmod, e))
case Failure(e) =>
log.warn(s"Can`t apply persistent modifier (id: ${pmod.encodedId}, contents: $pmod) to history", e)
context.system.eventStream.publish(SyntacticallyFailedModification(pmod, e))
}
} else {
log.warn(s"Trying to apply modifier ${pmod.encodedId} that's already in history")
}
} else {
log.warn(s"Trying to apply modifier ${pmod.encodedId} that's already in history")
}

private def time[R](tag: String)(block: => R): R = {
val t0 = System.nanoTime()
val result = block // call-by-name
val t1 = System.nanoTime()
val et = (t1.toDouble - t0) / 1000000
context.actorSelection("../DiagnosticsActor") ! DiagnosticsActor.ReceivableMessages.MethodProfile(tag, et, System.currentTimeMillis())
result
}

/**
* Process new modifiers from remote.
* Put all candidates to modifiersCache and then try to apply as much modifiers from cache as possible.
* Clear cache if it's size exceeds size limit.
* Publish `ModifiersProcessingResult` message with all just applied and removed from cache modifiers.
*/
protected def processRemoteModifiers: Receive = {
case ModifiersFromRemote(mods: Seq[PMOD]) =>
mods.foreach(m => modifiersCache.put(m.id, m))

log.debug(s"Cache size before: ${modifiersCache.size}")

@tailrec
def applyLoop(applied: Seq[PMOD]): Seq[PMOD] = {
modifiersCache.popCandidate(history()) match {
case Some(mod) =>
pmodModify(mod)
applyLoop(mod +: applied)
case None =>
applied
case ModifiersFromRemote(mods: Seq[PMOD], id) =>
context.actorSelection("../DiagnosticsActor") !
InternalMessageTrip("nvh-received", id.toString, System.currentTimeMillis())
time("cacheApply") {
mods.foreach(m => modifiersCache.put(m.id, m))

val sizeBefore = modifiersCache.size
log.debug(s"Cache size before: ${modifiersCache.size}")

@tailrec
def applyLoop(applied: Seq[PMOD]): Seq[PMOD] = {
modifiersCache.popCandidate(history()) match {
case Some(mod) =>
pmodModify(mod)
applyLoop(mod +: applied)
case None =>
applied
}
}
}

val applied = applyLoop(Seq())
val cleared = modifiersCache.cleanOverfull()
val applied = applyLoop(Seq())
val cleared = modifiersCache.cleanOverfull()

context.system.eventStream.publish(ModifiersProcessingResult(applied, cleared))
log.debug(s"Cache size after: ${modifiersCache.size}")

context.system.eventStream.publish(ModifiersProcessingResult(applied, cleared))
log.debug(s"Cache size after: ${modifiersCache.size}")
val sizeAfter = modifiersCache.size

context.actorSelection("../DiagnosticsActor") !
DiagnosticsActor.ReceivableMessages.CacheState(sizeBefore, sizeAfter, cleared.map(_.id), System.currentTimeMillis())
}
}

protected def transactionsProcessing: Receive = {
Expand Down Expand Up @@ -388,7 +412,7 @@ object NodeViewHolder {
case class GetDataFromCurrentView[HIS, MS, VL, MP, A](f: CurrentView[HIS, MS, VL, MP] => A)

// Modifiers received from the remote peer with new elements in it
case class ModifiersFromRemote[PM <: PersistentNodeViewModifier](modifiers: Iterable[PM])
case class ModifiersFromRemote[PM <: PersistentNodeViewModifier](modifiers: Iterable[PM], id: Long = 0)

sealed trait NewTransactions[TX <: Transaction]{
val txs: Iterable[TX]
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/scorex/core/app/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route}
import akka.stream.ActorMaterializer
import scorex.core.api.http.{ApiErrorHandler, ApiRejectionHandler, ApiRoute, CompositeHttpService}
import scorex.core.diagnostics.DiagnosticsActorRef
import scorex.core.network._
import scorex.core.network.message._
import scorex.core.network.peer.PeerManagerRef
Expand Down Expand Up @@ -60,6 +61,8 @@ trait Application extends ScorexLogging {
)
}

val diagnosticsActorRef = DiagnosticsActorRef("DiagnosticsActor")

val nodeViewHolderRef: ActorRef
val nodeViewSynchronizer: ActorRef

Expand Down Expand Up @@ -87,7 +90,7 @@ trait Application extends ScorexLogging {
val peerManagerRef = PeerManagerRef(settings, scorexContext)

val networkControllerRef: ActorRef = NetworkControllerRef(
"networkController", settings.network, peerManagerRef, scorexContext)
"networkController", settings.network, peerManagerRef, diagnosticsActorRef, scorexContext)

lazy val combinedRoute: Route = CompositeHttpService(actorSystem, apiRoutes, settings.restApi, swaggerConfig).compositeRoute

Expand Down
103 changes: 103 additions & 0 deletions src/main/scala/scorex/core/diagnostics/DiagnosticsActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package scorex.core.diagnostics

import java.io.{File, PrintWriter}

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scorex.core.consensus.SyncInfo
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.SyntacticallySuccessfulModifier
import scorex.core.network.SendingStrategy
import scorex.core.network.message.{InvData, Message, ModifiersData}
import scorex.util.{ModifierId, ScorexLogging}

class DiagnosticsActor extends Actor with ScorexLogging {

import DiagnosticsActor.ReceivableMessages._

private val outWriter = new PrintWriter(new File(s"/tmp/ergo/out-messages-${context.system.startTime}.json"))
private val inWriter = new PrintWriter(new File(s"/tmp/ergo/in-messages-${context.system.startTime}.json"))
private val smJournalWriter = new PrintWriter(new File(s"/tmp/ergo/sm-journal-${context.system.startTime}.json"))
private val mProfilesWriter = new PrintWriter(new File(s"/tmp/ergo/nvh-profile-${context.system.startTime}.json"))
private val cacheJournalWriter = new PrintWriter(new File(s"/tmp/ergo/cache-journal-${context.system.startTime}.json"))
private val internalMsgsWriter = new PrintWriter(new File(s"/tmp/ergo/ims-journal-${context.system.startTime}.json"))

override def preStart(): Unit = {
Seq(outWriter, inWriter, smJournalWriter, mProfilesWriter, cacheJournalWriter, internalMsgsWriter).foreach(_.write("["))
log.info("Starting diagnostics actor ..")
context.system.eventStream.subscribe(self, classOf[SyntacticallySuccessfulModifier[_]])
}

override def postStop(): Unit = {
Seq(outWriter, inWriter, smJournalWriter, mProfilesWriter, cacheJournalWriter, internalMsgsWriter).foreach { w =>
w.write("]")
w.close()
}
}

override def receive: Receive = {
case OutNetworkMessage(Message(spec, Right(data), _), strategy, receivers, timestamp) =>
receivers.foreach { r =>
val record =
s"""{"timestamp":$timestamp,"msgType":"${spec.messageName}","data":${decodeData(data)},"strategy":"$strategy","receiver":"$r"},\n"""
outWriter.write(record)
}

case InNetworkMessage(Message(spec, Right(data), _), sender, timestamp) =>
val record =
s"""{"timestamp":$timestamp,"msgType":"${spec.messageName}","data":${decodeData(data)},"sender":"$sender"},\n"""
inWriter.write(record)

case SyntacticallySuccessfulModifier(mod, ts) =>
val record = s"""{"typeId":"${mod.modifierTypeId}","id":"${mod.encodedId}","timestamp":$ts},\n"""
smJournalWriter.write(record)

case MethodProfile(tag, elapsedTime, ts) =>
val record = s"""{"tag":"$tag","elapsedTime":$elapsedTime,"timestamp":$ts},\n"""
mProfilesWriter.write(record)

case CacheState(sizeBefore, sizeAfter, cleared, ts) =>
val record = s"""{"sizeBefore":$sizeBefore,"sizeAfter":$sizeAfter,"cleared":[${cleared.map(id => s""""$id"""").mkString(",")}],"timestamp":$ts},\n"""
cacheJournalWriter.write(record)

case InternalMessageTrip(tag, msgId, ts) =>
val record = s"""{"tag":"$tag","msgId":"$msgId","timestamp":$ts},\n"""
internalMsgsWriter.write(record)

case other =>
log.info(s"DiagnosticsActor: unknown message: $other")
}

private def decodeData(data: Any) = data match {
case InvData(typeId, ids) =>
s"""{"typeId":"$typeId","ids":[${ids.map(id => s""""$id"""").mkString(",")}]}"""
case ModifiersData(typeId, mods) =>
s"""{"typeId":"$typeId","ids":[${mods.keys.map(id => s""""$id"""").mkString(",")}]}"""
case si: SyncInfo =>
val ids = si.startingPoints
s"""{"typeId":"101","ids":[${ids.map(id => s""""${id._2}"""").mkString(",")}]}"""
case other =>
s""""?$other""""
}

}

object DiagnosticsActor {

object ReceivableMessages {

case class OutNetworkMessage(msg: Message[_], strategy: SendingStrategy, receivers: Seq[String], timestamp: Long)

case class InNetworkMessage(msg: Message[_], sender: String, timestamp: Long)

case class MethodProfile(tag: String, elapsedTime: Double, timestamp: Long)

case class CacheState(sizeBefore: Int, sizeAfter: Int, cleared: Seq[ModifierId], timestamp: Long)

case class InternalMessageTrip(tag: String, msgId: String, timestamp: Long)

}

}

object DiagnosticsActorRef {
def apply(name: String)(implicit system: ActorSystem): ActorRef = system.actorOf(Props[DiagnosticsActor], name)
}
Loading