Skip to content
This repository was archived by the owner on Apr 13, 2022. It is now read-only.

WIP: Replace Array[Byte] by ByteString and Seq[Byte] #221

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/main/scala/scorex/ObjectGenerators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scorex

import java.net.{InetAddress, InetSocketAddress}

import akka.util.ByteString
import org.scalacheck.{Arbitrary, Gen}
import scorex.core.app.Version
import scorex.core.network.message.BasicMsgDataTypes._
Expand All @@ -18,8 +19,8 @@ trait ObjectGenerators {

lazy val smallInt: Gen[Int] = Gen.choose(0, 20)

lazy val nonEmptyBytesGen: Gen[Array[Byte]] = Gen.nonEmptyListOf(Arbitrary.arbitrary[Byte])
.map(_.toArray).suchThat(_.length > 0)
lazy val nonEmptyBytesGen: Gen[ByteString] = Gen.nonEmptyListOf(Arbitrary.arbitrary[Byte])
.map(ByteString(_:_*)).suchThat(_.nonEmpty)

def genBoundedBytes(minSize: Int, maxSize: Int): Gen[Array[Byte]] = {
Gen.choose(minSize, maxSize) flatMap { sz => Gen.listOfN(sz, Arbitrary.arbitrary[Byte]).map(_.toArray) }
Expand All @@ -33,7 +34,7 @@ trait ObjectGenerators {


lazy val modifierIdGen: Gen[ModifierId] = Gen.listOfN(NodeViewModifier.ModifierIdSize, Arbitrary.arbitrary[Byte])
.map(id => ModifierId @@ id.toArray)
.map(id => ModifierId @@ Seq(id:_*))

lazy val modifierTypeIdGen: Gen[ModifierTypeId] = Arbitrary.arbitrary[Byte].map(t => ModifierTypeId @@ t)

Expand All @@ -42,14 +43,14 @@ trait ObjectGenerators {
modifierIds: Seq[ModifierId] <- Gen.nonEmptyListOf(modifierIdGen) if modifierIds.nonEmpty
} yield modifierTypeId -> modifierIds

lazy val modifierWithIdGen: Gen[(ModifierId, Array[Byte])] = for {
lazy val modifierWithIdGen: Gen[(ModifierId, ByteString)] = for {
id <- modifierIdGen
mod <- nonEmptyBytesGen
} yield id -> mod

lazy val modifiersGen: Gen[ModifiersData] = for {
modifierTypeId: ModifierTypeId <- modifierTypeIdGen
modifiers: Map[ModifierId, Array[Byte]] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty)
modifiers: Map[ModifierId, ByteString] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty)
} yield modifierTypeId -> modifiers


Expand Down
19 changes: 7 additions & 12 deletions src/main/scala/scorex/core/NodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,11 @@ trait NodeViewHolder[P <: Proposition, TX <: Transaction[P], PMOD <: PersistentN
*/
val networkChunkSize: Int


protected type MapKey = scala.collection.mutable.WrappedArray.ofByte

protected def key(id: ModifierId): MapKey = new mutable.WrappedArray.ofByte(id)

/**
* Cache for modifiers. If modifiers are coming out-of-order, they are to be stored in this cache.
*/
//todo: make configurable limited size
private val modifiersCache = mutable.Map[MapKey, PMOD]()
private val modifiersCache = mutable.Map[ModifierId, PMOD]()

protected def txModify(tx: TX): Unit = {
//todo: async validation?
Expand Down Expand Up @@ -315,7 +310,7 @@ trait NodeViewHolder[P <: Proposition, TX <: Transaction[P], PMOD <: PersistentN
case typeId: ModifierTypeId if typeId == Transaction.ModifierTypeId =>
memoryPool().notIn(modifierIds)
case _ =>
modifierIds.filterNot(mid => history().contains(mid) || modifiersCache.contains(key(mid)))
modifierIds.filterNot(mid => history().contains(mid) || modifiersCache.contains(mid))
}

sender() ! RequestFromLocal(peer, modifierTypeId, ids)
Expand All @@ -329,14 +324,14 @@ trait NodeViewHolder[P <: Proposition, TX <: Transaction[P], PMOD <: PersistentN
txModify(tx)

case pmod: PMOD@unchecked =>
if (history().contains(pmod) || modifiersCache.contains(key(pmod.id))) {
if (history().contains(pmod) || modifiersCache.contains(pmod.id)) {
log.warn(s"Received modifier ${pmod.encodedId} that is already in history")
} else {
modifiersCache.put(key(pmod.id), pmod)
modifiersCache.put(pmod.id, pmod)
}
}

log.debug(s"Cache before(${modifiersCache.size}): ${modifiersCache.keySet.map(_.array).map(Base58.encode).mkString(",")}")
log.debug(s"Cache before(${modifiersCache.size}): ${modifiersCache.keySet.map(id => Base58.encode(id.toArray)).mkString(",")}")

var t: Option[PMOD] = None
do {
Expand All @@ -352,7 +347,7 @@ trait NodeViewHolder[P <: Proposition, TX <: Transaction[P], PMOD <: PersistentN
t.foreach(pmodModify)
} while (t.isDefined)

log.debug(s"Cache after(${modifiersCache.size}): ${modifiersCache.keySet.map(_.array).map(Base58.encode).mkString(",")}")
log.debug(s"Cache after(${modifiersCache.size}): ${modifiersCache.keySet.map(id => Base58.encode(id.toArray)).mkString(",")}")
}
}

Expand Down Expand Up @@ -398,7 +393,7 @@ object NodeViewHolder {

// Moved from NodeViewSynchronizer as this was only received here
case class CompareViews(source: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierIds: Seq[ModifierId])
case class ModifiersFromRemote(source: ConnectedPeer, modifierTypeId: ModifierTypeId, remoteObjects: Seq[Array[Byte]])
case class ModifiersFromRemote(source: ConnectedPeer, modifierTypeId: ModifierTypeId, remoteObjects: Seq[Seq[Byte]])

case class LocallyGeneratedTransaction[P <: Proposition, TX <: Transaction[P]](tx: TX)
case class LocallyGeneratedModifier[PMOD <: PersistentNodeViewModifier](pmod: PMOD)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/core/NodeViewModifier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ sealed trait NodeViewModifier extends BytesSerializable {
//todo: check statically or dynamically output size
def id: ModifierId

def encodedId: String = Base58.encode(id)
def encodedId: String = Base58.encode(id.toArray)

override def equals(obj: scala.Any): Boolean = obj match {
case that: NodeViewModifier => (that.id sameElements id) && (that.modifierTypeId == modifierTypeId)
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/scorex/core/api/http/NodeViewApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.{ActorRef, ActorRefFactory}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.ByteString
import io.circe.syntax._
import scorex.core.NodeViewHolder.CurrentView
import scorex.core.consensus.History
Expand Down Expand Up @@ -61,7 +62,7 @@ case class NodeViewApiRoute[P <: Proposition, TX <: Transaction[P]]
Base58.decode(encodedId) match {
case Failure(e) => complete(ApiError.notExists)
case Success(rawId) =>
val id = ModifierId @@ rawId
val id: ModifierId = ModifierId @@ rawId.toSeq

def f(v: CurrentView[HIS, MS, VL, MP]): Option[PM] = v.history.modifierById(id)

Expand Down Expand Up @@ -95,7 +96,7 @@ case class NodeViewApiRoute[P <: Proposition, TX <: Transaction[P]]

def openSurface: Route = (get & path("openSurface")) {
withOpenSurface { os =>
complete(SuccessApiResponse(os.ids.map(Base58.encode).asJson))
complete(SuccessApiResponse(os.ids.map(id => Base58.encode(id.toArray)).asJson))
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/scorex/core/app/Version.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package scorex.core.app

import akka.util.ByteString
import scorex.core.serialization.{BytesSerializable, Serializer}

import scala.util.Try

object Version {
Expand All @@ -18,10 +20,10 @@ case class Version(firstDigit: Byte, secondDigit: Byte, thirdDigit: Byte) extend
object ApplicationVersionSerializer extends Serializer[Version] {
val SerializedVersionLength = 3

override def toBytes(obj: Version): Array[Byte] =
Array(obj.firstDigit, obj.secondDigit, obj.thirdDigit)
override def toBytes(obj: Version): Seq[Byte] =
ByteString(obj.firstDigit, obj.secondDigit, obj.thirdDigit)

override def parseBytes(bytes: Array[Byte]): Try[Version] = Try {
override def parseBytes(bytes: Seq[Byte]): Try[Version] = Try {
Version(
bytes(0),
bytes(1),
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/core/consensus/History.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object History {
lazy val chainSwitchingNeeded: Boolean = toRemove.nonEmpty

override def toString: String = {
s"ProgressInfo(BranchPoint: ${branchPoint.map(Base58.encode)}, " +
s"ProgressInfo(BranchPoint: ${branchPoint.map(id => Base58.encode(id.toArray))}, " +
s" to remove: ${toRemove.map(_.encodedId)}, to apply: ${toApply.map(_.encodedId)})"
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/scorex/core/core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ package object core {
//TODO implement ModifierTypeId as a trait
object ModifierTypeId extends TaggedType[Byte]

object ModifierId extends TaggedType[Array[Byte]]
object ModifierId extends TaggedType[Seq[Byte]]

object VersionTag extends TaggedType[Array[Byte]]
object VersionTag extends TaggedType[Seq[Byte]]

type ModifierTypeId = ModifierTypeId.Type

Expand All @@ -20,8 +20,8 @@ package object core {
type VersionTag = VersionTag.Type

def idsToString(ids: Seq[(ModifierTypeId, ModifierId)]): String = (ids.headOption, ids.lastOption) match {
case (Some(f), Some(l)) if f._2 sameElements l._2 => s"[(${f._1},${Base58.encode(f._2)})]"
case (Some(f), Some(l)) => s"[(${f._1},${Base58.encode(f._2)})..(${l._1},${Base58.encode(l._2)})]"
case (Some(f), Some(l)) if f._2 == l._2 => s"[(${f._1},${Base58.encode(f._2.toArray)})]"
case (Some(f), Some(l)) => s"[(${f._1},${Base58.encode(f._2.toArray)})..(${l._1},${Base58.encode(l._2.toArray)})]"
case _ => "[]"
}

Expand Down
65 changes: 28 additions & 37 deletions src/main/scala/scorex/core/network/DeliveryTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,76 +20,67 @@ class DeliveryTracker(context: ActorContext,
maxDeliveryChecks: Int,
nvsRef: ActorRef) extends ScorexLogging {

protected type ModifierIdAsKey = scala.collection.mutable.WrappedArray.ofByte

protected def key(id: ModifierId): ModifierIdAsKey = new mutable.WrappedArray.ofByte(id)

// todo: Do we need to keep track of ModifierTypeIds? Maybe we could ignore them?

// when a remote peer is asked a modifier, we add the expected data to `expecting`
// when a remote peer delivers expected data, it is removed from `expecting` and added to `delivered`.
// when a remote peer delivers unexpected data, it is added to `deliveredSpam`.
protected val expecting = mutable.Set[(ModifierTypeId, ModifierIdAsKey, ConnectedPeer)]()
protected val delivered = mutable.Map[ModifierIdAsKey, ConnectedPeer]()
protected val deliveredSpam = mutable.Map[ModifierIdAsKey, ConnectedPeer]()
protected val expecting = mutable.Set[(ModifierTypeId, ModifierId, ConnectedPeer)]()
protected val delivered = mutable.Map[ModifierId, ConnectedPeer]()
protected val deliveredSpam = mutable.Map[ModifierId, ConnectedPeer]()

protected val cancellables = mutable.Map[(ModifierIdAsKey, ConnectedPeer), Cancellable]()
protected val checksCounter = mutable.Map[(ModifierIdAsKey, ConnectedPeer), Int]()
protected val cancellables = mutable.Map[(ModifierId, ConnectedPeer), Cancellable]()
protected val checksCounter = mutable.Map[(ModifierId, ConnectedPeer), Int]()

def expect(cp: ConnectedPeer, mtid: ModifierTypeId, mids: Seq[ModifierId])(implicit ec: ExecutionContext): Unit = tryWithLogging {
for (mid <- mids) expect(cp, mtid, mid)
}

protected def expect(cp: ConnectedPeer, mtid: ModifierTypeId, mid: ModifierId)(implicit ec: ExecutionContext): Unit = {
val cancellable = context.system.scheduler.scheduleOnce(deliveryTimeout,
nvsRef,
CheckDelivery(cp, mtid, mid))
val midAsKey = key(mid)
expecting += ((mtid, midAsKey, cp))
cancellables((midAsKey, cp)) = cancellable
for (mid <- mids) {
val cancellable = context.system.scheduler.scheduleOnce(deliveryTimeout,
nvsRef,
CheckDelivery(cp, mtid, mid))
expecting += ((mtid, mid, cp))
cancellables((mid, cp)) = cancellable
}
}

// stops expecting, and expects again if the number of checks does not exceed the maximum
def reexpect(cp: ConnectedPeer, mtid: ModifierTypeId, mid: ModifierId)(implicit ec: ExecutionContext): Unit = tryWithLogging {
stopExpecting(cp, mtid, mid)
val midAsKey = key(mid)
val checks = checksCounter.getOrElseUpdate((midAsKey, cp), 0) + 1
checksCounter((midAsKey, cp)) = checks
if (checks < maxDeliveryChecks) expect(cp, mtid, mid)
else checksCounter -= ((midAsKey, cp))
val checks = checksCounter.getOrElseUpdate((mid, cp), 0) + 1
checksCounter((mid, cp)) = checks
if (checks < maxDeliveryChecks) expect(cp, mtid, Seq(mid))
else checksCounter -= ((mid, cp))
}

protected def stopExpecting(cp: ConnectedPeer, mtid: ModifierTypeId, mid: ModifierId): Unit = {
val midAsKey = key(mid)
expecting -= ((mtid, midAsKey, cp))
cancellables((midAsKey, cp)).cancel()
cancellables -= ((midAsKey, cp))
expecting -= ((mtid, mid, cp))
cancellables((mid, cp)).cancel()
cancellables -= ((mid, cp))
}

protected def isExpecting(mtid: ModifierTypeId, mid: ModifierId, cp: ConnectedPeer): Boolean =
expecting.exists(e => (mtid == e._1) && (mid sameElements e._2.array) && cp == e._3)
expecting.exists(e => (mtid == e._1) && (mid == e._2) && cp == e._3)

def receive(mtid: ModifierTypeId, mid: ModifierId, cp: ConnectedPeer): Unit = tryWithLogging {
if (isExpecting(mtid, mid, cp)) {
val eo = expecting.find(e => (mtid == e._1) && (mid sameElements e._2) && cp == e._3)
val eo = expecting.find(e => (mtid == e._1) && (mid == e._2) && cp == e._3)
for (e <- eo) expecting -= e
delivered(key(mid)) = cp
val cancellableKey = (key(mid), cp)
delivered(mid) = cp
val cancellableKey = (mid, cp)
for (c <- cancellables.get(cancellableKey)) c.cancel()
cancellables -= cancellableKey
}
else {
deliveredSpam(key(mid)) = cp
deliveredSpam(mid) = cp
}
}

def delete(mid: ModifierId): Unit = tryWithLogging(delivered -= key(mid))
def delete(mid: ModifierId): Unit = tryWithLogging(delivered -= mid)

def deleteSpam(mids: Seq[ModifierId]): Unit = for (id <- mids) tryWithLogging(deliveredSpam -= key(id))
def deleteSpam(mids: Seq[ModifierId]): Unit = for (id <- mids) tryWithLogging(deliveredSpam -= id)

def isSpam(mid: ModifierId): Boolean = deliveredSpam contains key(mid)
def isSpam(mid: ModifierId): Boolean = deliveredSpam contains mid

def peerWhoDelivered(mid: ModifierId): Option[ConnectedPeer] = delivered.get(key(mid))
def peerWhoDelivered(mid: ModifierId): Option[ConnectedPeer] = delivered.get(mid)

protected def tryWithLogging(fn: => Unit): Unit = {
Try(fn).recoverWith {
Expand Down
22 changes: 12 additions & 10 deletions src/main/scala/scorex/core/network/Handshake.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package scorex.core.network

import java.net.{InetAddress, InetSocketAddress}

import akka.util.ByteString
import com.google.common.primitives.{Ints, Longs}
import scorex.core.app.{Version, ApplicationVersionSerializer}
import scorex.core.app.{ApplicationVersionSerializer, Version}
import scorex.core.serialization.{BytesSerializable, Serializer}

import scala.util.Try
Expand All @@ -25,7 +26,7 @@ case class Handshake(applicationName: String,

object HandshakeSerializer extends Serializer[Handshake] {

override def toBytes(obj: Handshake): Array[Byte] = {
override def toBytes(obj: Handshake): Seq[Byte] = {
val anb = obj.applicationName.getBytes

val fab = obj.declaredAddress.map { isa =>
Expand All @@ -34,22 +35,23 @@ object HandshakeSerializer extends Serializer[Handshake] {

val nodeNameBytes = obj.nodeName.getBytes

Array(anb.size.toByte) ++ anb ++
ByteString(anb.size.toByte) ++ anb ++
obj.protocolVersion.bytes ++
Array(nodeNameBytes.size.toByte) ++ nodeNameBytes ++
Ints.toByteArray(fab.length) ++ fab ++
Longs.toByteArray(obj.time)

}

override def parseBytes(bytes: Array[Byte]): Try[Handshake] = Try {
@SuppressWarnings(Array("org.wartremover.warts.TraversableOps"))
override def parseBytes(bytes: Seq[Byte]): Try[Handshake] = Try {
var position = 0
val appNameSize = bytes.head
require(appNameSize > 0)

position += 1

val an = new String(bytes.slice(position, position + appNameSize))
val an = new String(bytes.slice(position, position + appNameSize).toArray) // fixme: use `toString`?
position += appNameSize

val av = ApplicationVersionSerializer.parseBytes(
Expand All @@ -59,23 +61,23 @@ object HandshakeSerializer extends Serializer[Handshake] {
val nodeNameSize = bytes.slice(position, position + 1).head
position += 1

val nodeName = new String(bytes.slice(position, position + nodeNameSize))
val nodeName = new String(bytes.slice(position, position + nodeNameSize).toArray) // fixme
position += nodeNameSize

val fas = Ints.fromByteArray(bytes.slice(position, position + 4))
val fas = Ints.fromByteArray(bytes.slice(position, position + 4).toArray)
position += 4

val isaOpt = if (fas > 0) {
val fa = bytes.slice(position, position + fas - 4)
position += fas - 4

val port = Ints.fromByteArray(bytes.slice(position, position + 4))
val port = Ints.fromByteArray(bytes.slice(position, position + 4).toArray)
position += 4

Some(new InetSocketAddress(InetAddress.getByAddress(fa), port))
Some(new InetSocketAddress(InetAddress.getByAddress(fa.toArray), port))
} else None

val time = Longs.fromByteArray(bytes.slice(position, position + 8))
val time = Longs.fromByteArray(bytes.slice(position, position + 8).toArray)

Handshake(an, av, nodeName, isaOpt, time)
}
Expand Down
Loading