diff --git a/PeerSoN/README.md b/PeerSoN/README.md new file mode 100644 index 00000000..5200169a --- /dev/null +++ b/PeerSoN/README.md @@ -0,0 +1,64 @@ +# PeerSon - P2P Online social network + +## IN4391 Distributed Systems Group 3 + +### Authors: + +- Klabér, Rahim +- Morstøl, Hallvard Molin +- Qiu, Jason +- Samardžić, Mariana + +## About PeerSoN + +PeerSoN is a distributed Online Social Network (OSN). It is using the all-peers decentralized system architecture. PeerSoN is doing this by combining peer-to-peer (P2P) infrastructure, encryption and a distributed hash table (DHT). + +## Requirements + +These are the versions of tools we used. We have not tested if the system works on earlier or later versions. + +- Open JDK: 17.0.2 +- Scala version: 2.13.8 +- SBT: 1.6.2 + +## Running the project + +### Running with Intellij + +In IntelliJ, the `src/main/resources` folder needs to be marked as a resource folder. To do this, right-click +the folder and at the bottom there should be an option called "Mark directory as" and then "Resources root" should be selected. + +### Running the project + +Two environment variables need to be set. First `HOST` which is the computer's IP or the router's IP in the case that you are behind a NAT and want to use the application over the internet (This can be found by running `curl ifconfig.me`) . + +Second `BOOTSTRAP` needs to be set to the IP of a TOMP2P DHT which will bootstrap our DHT. + +To run a bootstrap node, use a dummy `HOST` value and set `BOOTSTRAP` to the computer's IP. Then login with a dummy name such as `BOOTSTRAPNODE`. The bootstrap node should now be ready. + + +Depending on your system build/compile: + +- `sbt compile` or build from inside your IDE. + +After building the project. +Run the main object in ./src/main/scala/main.scala. + +- `sbt run` or run from inside your IDE. + +You can then run some possible commands to send to the guardian, which works as the interface for the application: + +- `login` + - Guardian asks for mail and location to log user in. User gets any messages he received while offline. +- `logout` + - Guardian logs user out by asking for mail and location. +- `send-message` + - Guardian asks for sender, receiver and text to send and sends the message. +- `add-to-wall` + - Guardian asks for sender, receiver and text to send and adds the text to the users wall. +- `request-wall` + - Guardian asks for the user requesting, the file name and sends the file back. + - To be able to fetch entries of some user's wall, a user should first request the wall index file of that user which contains all the file names of the individual entries. + - The file name of a user's wall index is in the form email@wi. +- `exit` + - Exit the interface diff --git a/PeerSoN/build.sbt b/PeerSoN/build.sbt new file mode 100644 index 00000000..ddd5ceca --- /dev/null +++ b/PeerSoN/build.sbt @@ -0,0 +1,20 @@ +name := "in4391_peerson" + +version := "0.1" + +scalaVersion := "2.13.8" + +//resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" + + +val AkkaVersion = "2.6.18" +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion, + "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test, + "com.typesafe.akka" %% "akka-remote" % AkkaVersion, + "ch.qos.logback" % "logback-classic" % "1.2.10", + "net.tomp2p" % "tomp2p-all" % "5.0-Beta8", + "com.simtechdata" % "WaifUPnP" % "1.0" +) + +resolvers += ("tomp2p.net" at "http://tomp2p.net/dev/mvn/").withAllowInsecureProtocol(true) \ No newline at end of file diff --git a/PeerSoN/experiments/Dockerfile b/PeerSoN/experiments/Dockerfile new file mode 100644 index 00000000..dd1d87a3 --- /dev/null +++ b/PeerSoN/experiments/Dockerfile @@ -0,0 +1,103 @@ +# +# Scala and sbt Dockerfile +# +# Based on https://github.com/hseeberger/scala-sbt/blob/master/debian/Dockerfile +# Based on https://jrtechs.net/java/running-scala-code-in-docker + +# Pull base image +ARG BASE_IMAGE_TAG +FROM openjdk:${BASE_IMAGE_TAG:-17.0.2-jdk-bullseye} + +# Env variables +ARG SCALA_VERSION +ENV SCALA_VERSION ${SCALA_VERSION:-2.13.8} +ARG SBT_VERSION +ENV SBT_VERSION ${SBT_VERSION:-1.6.2} +ARG USER_ID +ENV USER_ID ${USER_ID:-1001} +ARG GROUP_ID +ENV GROUP_ID ${GROUP_ID:-1001} + +ARG buildtime_host=80.114.141.221 +ENV HOST=$buildtime_host + +ARG buildtime_bootstrap=150.230.20.128 +ENV BOOTSTRAP=$buildtime_bootstrap + +# Install sbt +RUN \ + curl -fsL "https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz" | tar xfz - -C /usr/share && \ + chown -R root:root /usr/share/sbt && \ + chmod -R 755 /usr/share/sbt && \ + ln -s /usr/share/sbt/bin/sbt /usr/local/bin/sbt + +# Install Scala +RUN \ + case $SCALA_VERSION in \ + "3"*) URL=https://github.com/lampepfl/dotty/releases/download/$SCALA_VERSION/scala3-$SCALA_VERSION.tar.gz SCALA_DIR=/usr/share/scala3-$SCALA_VERSION ;; \ + *) URL=https://downloads.typesafe.com/scala/$SCALA_VERSION/scala-$SCALA_VERSION.tgz SCALA_DIR=/usr/share/scala-$SCALA_VERSION ;; \ + esac && \ + curl -fsL $URL | tar xfz - -C /usr/share && \ + mv $SCALA_DIR /usr/share/scala && \ + chown -R root:root /usr/share/scala && \ + chmod -R 755 /usr/share/scala && \ + ln -s /usr/share/scala/bin/* /usr/local/bin && \ + case $SCALA_VERSION in \ + "3"*) echo "@main def main = println(util.Properties.versionMsg)" > test.scala ;; \ + *) echo "println(util.Properties.versionMsg)" > test.scala ;; \ + esac && \ + scala -nocompdaemon test.scala && rm test.scala + +# Install rpm for sbt-native-packager +# see https://github.com/hseeberger/scala-sbt/pull/114 +RUN apt-get update && \ + apt-get install rpm -y && \ + rm -rf /var/lib/apt/lists/* + +# Add and use user sbtuser +RUN groupadd --gid $GROUP_ID sbtuser && useradd -m --gid $GROUP_ID --uid $USER_ID sbtuser --shell /bin/bash +USER sbtuser + +# Switch working directory +WORKDIR /home/sbtuser + +# Prepare sbt (warm cache) +RUN \ + sbt sbtVersion && \ + mkdir -p project && \ + echo "scalaVersion := \"${SCALA_VERSION}\"" > build.sbt && \ + echo "sbt.version=${SBT_VERSION}" > project/build.properties && \ + echo "// force sbt compiler-bridge download" > project/Dependencies.scala && \ + echo "case object Temp" > Temp.scala && \ + sbt compile && \ + rm -r project && rm build.sbt && rm Temp.scala && rm -r target + +# Link everything into root as well +# This allows users of this container to choose, whether they want to run the container as sbtuser (non-root) or as root +USER root +RUN \ + ln -s /home/sbtuser/.cache /root/.cache && \ + ln -s /home/sbtuser/.sbt /root/.sbt && \ + if [ -d "/home/sbtuser/.ivy2" ]; then ln -s /home/sbtuser/.ivy2 /root/.ivy2; fi + +# Switch working directory back to root +## Users wanting to use this container as non-root should combine the two following arguments +## -u sbtuser +## -w /home/sbtuser +WORKDIR /root + +EXPOSE 6122/udp +EXPOSE 6122/tcp +EXPOSE 5001/udp +EXPOSE 5001/tcp +EXPOSE 5000/tcp + +WORKDIR /home/sbtuser/project + +RUN git clone https://github.com/rahimklaber/in4391_peerson.git + +WORKDIR /home/sbtuser/project/in4391_peerson + +RUN sbt compile + +CMD ["bash"] diff --git a/PeerSoN/experiments/README.md b/PeerSoN/experiments/README.md new file mode 100644 index 00000000..a343f799 --- /dev/null +++ b/PeerSoN/experiments/README.md @@ -0,0 +1,28 @@ +! This does not work, you can start the system but, it throws an error when trying to send messages ! + +Prerequisite: + +- Docker + +To start the application run in this folder the command: + +Find your ip with: `curl ifconfig.me` + +The first node created will have the bootstrap ip with: `curl ifconfig.me` + +`docker build -t peerson .\ --build-arg buildtime_bootstrap=your_ip --build-arg buildtime_bootstrap=bootstrap_ip` + +`docker run -it peerson /bin/bash` + +Check that the addresses are set correctly `echo host:$HOST bootstrap:$BOOTSTRAP` + +`sbt run` + +Possible commands for the guardian: + +- `login` +- `logout` +- `send-message` +- `add-to-wall` +- `request-wall` +- `exit` diff --git a/PeerSoN/experiments/Results/time.txt b/PeerSoN/experiments/Results/time.txt new file mode 100644 index 00000000..c2000575 --- /dev/null +++ b/PeerSoN/experiments/Results/time.txt @@ -0,0 +1,6 @@ +Time measured in seconds when performing different actions. +Login: 10.20, 10.169, 10.163, 10.191, 10.186, 10.176, 10.166, 10.182, 10.172, 10.182 +Login w/notifications: 10.105, 10.087, 10.098, 10.181, 10.072, 10.127, 10.212, 10.134, 10.111, 10.206 +Logout: 5.097, 5.074, 5.097, 5.077, 5.076, 5.082, 5.085, 5.090, 5.080, 5.090 +sync-message: 0.096, 0.062, 0.055, 0.052, 0.051, 0.054, 0.053, 0.054, 0.058, 0.054 +Request-wall: 10.082, 10.091, 10.121, 10.101, 10.065, 10.082, 10.091, 10.012, 10.099, 10.105 diff --git a/PeerSoN/src/main/resources/application.conf b/PeerSoN/src/main/resources/application.conf new file mode 100644 index 00000000..902279b6 --- /dev/null +++ b/PeerSoN/src/main/resources/application.conf @@ -0,0 +1,22 @@ +akka { + loglevel = "INFO" + + actor { + allow-java-serialization = true + provider = remote + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + + artery { + transport = tcp # See Selecting a transport below + bind.hostname = "0.0.0.0" # internal (bind) hostname + bind.port = 6122 # internal (bind) port + + canonical.hostname = ${HOST} # external (logical) hostname + canonical.port = 6122 # external (logical) port + } + log-sent-messages = on + log-received-messages = on + } +} \ No newline at end of file diff --git a/PeerSoN/src/main/resources/logback.xml b/PeerSoN/src/main/resources/logback.xml new file mode 100644 index 00000000..e47779df --- /dev/null +++ b/PeerSoN/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + %d{HH:mm:ss} [%thread] %-5level %logger{1000} - %msg%n + + + + + + + + + \ No newline at end of file diff --git a/PeerSoN/src/main/scala/REPLCommand.scala b/PeerSoN/src/main/scala/REPLCommand.scala new file mode 100644 index 00000000..8ecb32e1 --- /dev/null +++ b/PeerSoN/src/main/scala/REPLCommand.scala @@ -0,0 +1,45 @@ +/** + * a file for REPLCommand trait and child case classes + */ + +trait REPLCommand + +/** + * REPL lets `user` to login + * @param user user email (not hashed) + * @param location location string + */ +case class Login(user: String, location: String) extends REPLCommand + +/** + * REPL lets `user` to logout + * @param user user email (not hashed) + * @param location location string + */ +case class Logout(user: String, location: String) extends REPLCommand + +/** + * REPL lets user with `mail` to send a message `test` + * @param sender sender mail (not hashed) + * @param receiver receiver mail (not hashed) + * @param text text message + */ +case class SendMessage(sender: String, receiver: String, text: String) extends REPLCommand + +/** + * REPL lets `sender` to send a message `text` to `owner`'s wall + * @param sender sender email (not hashed) + * @param owner wall owner, message receiver, email (not hashed) + * @param text message text + */ +case class AddWallByUser(sender: String, owner: String, text: String) extends REPLCommand + +/** + * REPL lets `requester` to request a file `fileName` of `version` from responder` + * @param requester file requester, email (not hashed) + * @param responder file responder, email (not hashed) + * @param fileName filename + * @param version version == 0 by default + */ +case class RequestFileByUser(requester: String, responder: String, fileName: String, version: Int) extends REPLCommand + diff --git a/PeerSoN/src/main/scala/dht/DHT.scala b/PeerSoN/src/main/scala/dht/DHT.scala new file mode 100644 index 00000000..41f78db4 --- /dev/null +++ b/PeerSoN/src/main/scala/dht/DHT.scala @@ -0,0 +1,36 @@ +package dht + +trait DHT { + /** + * put a new key on the DHT + * create a data list for this key + * add the data to the list + */ + def put(key: String, data: Any): Unit + + /** + * retrieves the head of data list by key + * if key not existed, return None + */ + def get(key: String, callback: Option[Any] => Unit): Unit + + /** + * check if the DHT contains a certain key + */ + def contains(key: String, callback: Boolean => Unit): Unit + + /** + * update the data list with a new item under a key + */ + def append(key: String, data: Any): Unit + + /** + * retrieves all values stored in a data list under a key + */ + def getAll(key: String, callback: Option[List[Any]] => Unit): Unit + + /** + * remove a key-value pair + */ + def remove(key: String): Unit +} diff --git a/PeerSoN/src/main/scala/dht/DistributedDHT.scala b/PeerSoN/src/main/scala/dht/DistributedDHT.scala new file mode 100644 index 00000000..d003f6e0 --- /dev/null +++ b/PeerSoN/src/main/scala/dht/DistributedDHT.scala @@ -0,0 +1,118 @@ +package dht + +import net.tomp2p.dht.{PeerBuilderDHT, PeerDHT} +import net.tomp2p.futures.{BaseFuture, BaseFutureAdapter, FutureBootstrap} +import net.tomp2p.p2p.PeerBuilder +import net.tomp2p.peers.Number160 +import net.tomp2p.storage.Data + +import java.net.InetAddress +import scala.util.Random + +class DistributedDHT(nodeId: Int,bootstrapHost : String) extends DHT { + + // create a new DHT node + val p2p = new PeerBuilder(Number160.createHash(Random.nextLong())) +// .behindFirewall() + .ports(5000).start + val fd = p2p.discover().inetAddress(InetAddress.getByName(bootstrapHost)).ports(5000).start() + fd.awaitUninterruptibly() + + println(s"address = ${fd.peerAddress()}") + + // connect to a stable DHT node +// val fb: FutureBootstrap = this.peer.peer.bootstrap.inetAddress(InetAddress.getByName("150.230.20.128")).ports(5000).start + val fb: FutureBootstrap = p2p.bootstrap.peerAddress(fd.peerAddress()).start + fb.awaitUninterruptibly +// if (fb.isSuccess) peer.peer.discover.peerAddress(fb.bootstrapTo.iterator.next).start.awaitUninterruptibly + val peer: PeerDHT = new PeerBuilderDHT(p2p).start + + + + // METHODS FOR RETRIEVING FROM DHT + override def get(key: String, callback: Option[Any] => Unit): Unit = { + val futureGet = peer.get(Number160.createHash(key)).start + + futureGet.addListener(new BaseFutureAdapter[BaseFuture] { + + override def operationComplete(future: BaseFuture): Unit = { + if(future.isSuccess && !futureGet.dataMap.values.isEmpty) { + callback(Some(futureGet.dataMap.values.iterator.next.`object`())) + } else { + callback(null) + } + } + + }) + } + + override def getAll(key: String, callback: Option[List[Any]] => Unit): Unit = { + + val futureGet = peer.get(Number160.createHash(key)).start + + futureGet.addListener(new BaseFutureAdapter[BaseFuture] { + + override def operationComplete(future: BaseFuture): Unit = { + if(future.isSuccess && !futureGet.dataMap().values().isEmpty) { + val value = futureGet.dataMap.values.iterator.next.`object`() + value match { + case v : List[Any] => callback(Some(v)) + case v@_ => + callback(None) + } + } else { + callback(None) + } + } + + }) + } + + override def contains(key: String, callback: Boolean => Unit): Unit = { + val futureGet = peer.get(Number160.createHash(key)).start + + futureGet.addListener(new BaseFutureAdapter[BaseFuture] { + + override def operationComplete(future: BaseFuture): Unit = { + if(future.isSuccess) { + if (futureGet.isEmpty) { + callback(false) + } else { + callback(true) + } + } else { + callback(false) + } + } + }) + } + + + + // METHODS FOR PUTTING IN THE DHT + override def remove(key: String): Unit = { + peer.remove(Number160.createHash(key)).start() + } + + override def append(key: String, data: Any): Unit = { + val futureGet = peer.get(Number160.createHash(key)).start + futureGet.awaitUninterruptibly + if (futureGet.isSuccess) { + if (futureGet.dataMap.values.iterator.hasNext) { + + + val list = futureGet.dataMap.values.iterator.next.`object`() + list match { + case x :: xs => put(key, data :: (x :: xs)) + case _ => println("Could not append to list in dht, because the entry in the dht is not a list.") + } + } else { + put(key,data::Nil) + } + } + + } + override def put(key: String, data: Any): Unit = { + peer.put(Number160.createHash(key)).data(new Data(data)).start() + } +} diff --git a/PeerSoN/src/main/scala/logic/async_messages/AsyncMessage.scala b/PeerSoN/src/main/scala/logic/async_messages/AsyncMessage.scala new file mode 100644 index 00000000..dfab33c6 --- /dev/null +++ b/PeerSoN/src/main/scala/logic/async_messages/AsyncMessage.scala @@ -0,0 +1,66 @@ +package logic.async_messages + +import akka.actor.typed.scaladsl.ActorContext +import dht.DHT +import peer.{Notification, PeerMessage} +import logic.wall.Wall.WallEntry + +import scala.collection.mutable + +object AsyncMessage { + + case class OfflineMessage(sender: String, content: String, ack: Boolean) + + def load(context: ActorContext[PeerMessage], owner: String, dht: DHT): Unit = { + println(s"loading async messages of $owner") + val offMsgKey: String = getKey(owner) + dht.contains(offMsgKey, { res => + if (res) { + dht.get(offMsgKey, { + case Some(notifications: mutable.ListBuffer[Any]) => + println(s"I'm here with notifications $notifications") + notifications.foreach(msg => { + println(s"Loading offline message... $msg") + context.self ! Notification(msg) + }) + case _ => println(s"offline messages under $offMsgKey not found") + }) + } else { + println(s"No asynchronous message for $owner") + } + dht.put(offMsgKey, mutable.ListBuffer.empty[OfflineMessage]) + }) + } + + def getKey(owner: String): String = { + s"$owner@no" + } + + def AddWallEntry(sender: String, receiver: String, entryText: String, dht: DHT): Unit = { + val offMsgKey: String = getKey(receiver) + dht.get(offMsgKey, { + case Some(offMsgs: mutable.ListBuffer[Any]) => + val newOffMsgs = WallEntry(-1, sender, entryText) +: offMsgs + println(s"newOffMsgs: $newOffMsgs") + dht.put(offMsgKey, newOffMsgs) + println(s"adding offline message to $receiver's async message list") + case _ => + dht.put(offMsgKey, mutable.ListBuffer.empty[OfflineMessage]) + println(s"async message list of $receiver not found, create a new empty one") + }) + } + + def add(sender: String, receiver: String, text: String, ack: Boolean, dht: DHT): Unit = { + val offMsgKey: String = getKey(receiver) + dht.get(offMsgKey, { + case Some(offMsgs: mutable.ListBuffer[OfflineMessage]) => + val newOffMsgs = OfflineMessage(sender, text, ack) +: offMsgs + println(s"newOffMsgs: $newOffMsgs") + dht.put(offMsgKey, newOffMsgs) + println(s"adding offline message to $receiver's async message list") + case _ => + dht.put(offMsgKey, mutable.ListBuffer.empty[OfflineMessage]) + println(s"async message list of $receiver not found, create a new empty one") + }) + } +} diff --git a/PeerSoN/src/main/scala/logic/login/LocatorInfo.scala b/PeerSoN/src/main/scala/logic/login/LocatorInfo.scala new file mode 100644 index 00000000..a90f7983 --- /dev/null +++ b/PeerSoN/src/main/scala/logic/login/LocatorInfo.scala @@ -0,0 +1,19 @@ +package logic.login + +import logic.login.State.State + +case class LocatorInfo( + locator: String, // a string defined by user, to tell on which machine the user is currently active + IP: String, + port: String, + state: State, // only one peerID can be active at a time + path: String // path accessible by scala actors + // meshID: String, // optional, if a user is participating in a user mesh project + // GPS: String, // optional, coordinate based P2p neighbour selection + // timestamp: Int // time in seconds since 01.01.1970, used fot OpenDHT problems -> maybe not needed + ) + +object State extends Enumeration { + type State = Value + val online, active, offline = Value +} diff --git a/PeerSoN/src/main/scala/logic/login/LoginProcedure.scala b/PeerSoN/src/main/scala/logic/login/LoginProcedure.scala new file mode 100644 index 00000000..9da0b80d --- /dev/null +++ b/PeerSoN/src/main/scala/logic/login/LoginProcedure.scala @@ -0,0 +1,84 @@ +package logic.login + +import dht.DistributedDHT + +import java.io.{BufferedReader, InputStreamReader} +import java.net.URL + +class LoginProcedure(val location: String, val hashedMail: String, val path: String, val DistributedDHT: DistributedDHT, val startingTimestamp: Long) { + + def start(): Unit = { + DistributedDHT.contains(hashedMail, recievedContains) + } + + def recievedContains(contains: Boolean): Unit ={ + if (contains) { + login() + } else { + register() + } + } + + /** + * + * @param location location in string, say "laptop", "home" + * @param hashedMail hashedMail + */ + def login(): Unit = { + // 1. get user info from the DHT + DistributedDHT.getAll(hashedMail, receivedUserInfo) + } + + def receivedUserInfo(userLocatorInfos: Option[List[Any]]): Unit ={ + var locationInfoList: List[LocatorInfo] = userLocatorInfos match { + case Some(value) => value.asInstanceOf[List[LocatorInfo]] + case None => throw new Exception() // TODO: handle error + } + + // 2. if no desired location add it + val desiredLocation = locationInfoList.filter(l => l.locator == location) + if (desiredLocation.isEmpty) { + locationInfoList = LocatorInfo(location, findIPAddress(), "80", State.active, path) :: locationInfoList + } + + // 3. update user info + // - only one location is active + // - but there might be multiple locations that are online + // - needs improvement if needed + val updateUserInfo = locationInfoList.map(l => { + val newState = { + if (l.locator == location) State.active + else { + // active/online -> online + if (l.state != State.offline) State.online + else State.offline + } + } + LocatorInfo(l.locator, l.IP, l.port, newState, l.path) + }) + + // 4. send new info to DHT + // LocalDHT.put(hashedMail, updateUserInfo.head) + // updateUserInfo.tail.foreach(l => LocalDHT.append(hashedMail, l)) + DistributedDHT.put(hashedMail, updateUserInfo) + println("Current user data:") + println(updateUserInfo) + println("Time elapsed: " + (System.currentTimeMillis() - startingTimestamp)/1000.0) + } + + def register(): Unit = { + val ip = findIPAddress() + val port = "80" + val locatorInfo = LocatorInfo(location, ip, port, State.active, path) + DistributedDHT.put(hashedMail, locatorInfo :: Nil) + println("Current user data:") + println(locatorInfo::Nil) + println("Time elapsed: " + (System.currentTimeMillis() - startingTimestamp)/1000.0) + } + + def findIPAddress(): String = { + val whereIsMyIPURL = new URL("http://checkip.amazonaws.com") + val in: BufferedReader = new BufferedReader(new InputStreamReader(whereIsMyIPURL.openStream())) + in.readLine() + } +} diff --git a/PeerSoN/src/main/scala/logic/login/LogoutProcedure.scala b/PeerSoN/src/main/scala/logic/login/LogoutProcedure.scala new file mode 100644 index 00000000..4528671f --- /dev/null +++ b/PeerSoN/src/main/scala/logic/login/LogoutProcedure.scala @@ -0,0 +1,33 @@ +package logic.login + +import dht.DistributedDHT + +class LogoutProcedure(val location: String, val hashedMail: String, val DistributedDHT: DistributedDHT, val startingTimestamp: Long) { + + def start(): Unit = { + DistributedDHT.getAll(hashedMail, onReceivedLookup) + } + + def onReceivedLookup(lookup:Option[List[Any]]): Unit ={ + lookup match { + case Some(value) => + val locatorInfoList: List[LocatorInfo] = value.asInstanceOf[List[LocatorInfo]] + + val updateUserInfo = locatorInfoList.map(l => { + val newState = { + if (l.locator == location) State.offline + else { + l.state + } + } + LocatorInfo(l.locator, l.IP, l.port, newState, l.path) + }) + DistributedDHT.put(hashedMail, updateUserInfo) + println("Current user data:") + println(updateUserInfo) + println("Time elapsed: " + (System.currentTimeMillis() - startingTimestamp)/1000.0) + + case _ => println(s"user not found by DHT!") + } + } +} diff --git a/PeerSoN/src/main/scala/logic/wall/File.scala b/PeerSoN/src/main/scala/logic/wall/File.scala new file mode 100644 index 00000000..2956305b --- /dev/null +++ b/PeerSoN/src/main/scala/logic/wall/File.scala @@ -0,0 +1,18 @@ +package logic.wall + +/** + * a trait to represent the file in communication + */ +trait File/* { + val fileName: String + val fileType: FileType.FileType + val content: String +}*/ + +/** + * FileType + */ +object FileType extends Enumeration { + type FileType = Value + val Index, List, FriendList, FirstName, LastName, BirthDay, City, WallIndex, WallEntry = Value +} diff --git a/PeerSoN/src/main/scala/logic/wall/FileOperations.scala b/PeerSoN/src/main/scala/logic/wall/FileOperations.scala new file mode 100644 index 00000000..3bccabb8 --- /dev/null +++ b/PeerSoN/src/main/scala/logic/wall/FileOperations.scala @@ -0,0 +1,11 @@ +package logic.wall + +import logic.login.LocatorInfo + +object FileOperations { + /** + * DHTFileEntry is a file entry stored in DHT: + * "${hashedMail}@${fileType} -> ${hashedMail}#${locator}#${version}" + */ + case class DHTFileEntry(hashedMail: String, locator: LocatorInfo, version: Int) +} diff --git a/PeerSoN/src/main/scala/logic/wall/Wall.scala b/PeerSoN/src/main/scala/logic/wall/Wall.scala new file mode 100644 index 00000000..9f426b63 --- /dev/null +++ b/PeerSoN/src/main/scala/logic/wall/Wall.scala @@ -0,0 +1,90 @@ +package logic.wall + +import akka.actor.typed.scaladsl.ActorContext +import dht.DHT +import peer.{Message, PeerMessage} +import services.Encrypt + +import scala.collection.mutable + +/** + * Async Wall Protocol: + * wi - The wall index file contains a list of all wall entries a user has. + * we - This file contains one wall entry. The number of the wall entry is successive + * and defined on the receiver's side + * ---------------------------------------------------------------------------------------- + * To put a message on another user's Wall, the following key-value pair is stored in DHT + * (key, value) + * key - ${hashedMail}@we${index}, where ${hashedMail} is the hashed mail of the receiver + * value - WallEntry(sender, content) + * ---------------------------------------------------------------------------------------- + * The Wall object manages methods to update the Wall entries to DHT + */ + +object Wall { + /** + * + * @param index the current index of the entry + * @param sender the sender email (not hashed) + * @param content the file/message content + */ + case class WallEntry(index: Int, sender: String, content: String) extends File + + /** + * The WallIndex File, stored in the DHT in the key-value form of + * wallIndexKey -> WallIndex(owner, lastEntryIndex, entries) + * + * @param hashedMail who owns the wall, email (not hashed) + * @param lastEntryIndex the index of the most recent entry (i.e., the current last index) + * @param entries a list buffer of all wall entries + */ + case class WallIndex(hashedMail: String, lastEntryIndex: Int, entries: mutable.ListBuffer[String]) + + + /** + * initialization: + * create a WallIndex entry for owner if not existed + * + * @param owner who owns the wall, mail (not hashed) + */ + def load(context: ActorContext[PeerMessage], owner: String, dht: DHT): Unit = { + val wallIndexKey: String = getWallIndexKey(owner) + // if wallIndexKey found, then try to fetch the wall entries + dht.contains(wallIndexKey, { res => + if (res) { + val wallIndexLookup = dht.get(wallIndexKey, { + case Some(ownerWallIndex: WallIndex) => + val wallEntryKeyBuffer = ownerWallIndex.entries + wallEntryKeyBuffer.foreach(wallEntryKey => { + val lookup = dht.get(wallEntryKey, { + case Some(currentWallEntry: WallEntry) => + context.self ! Message(currentWallEntry.sender, currentWallEntry.content, ack = false) + // remove from DHT + dht.remove(wallEntryKey) + case _ => println(s"WallEntry under ${wallEntryKey} not found") + }) + + }) + case _ => println(s"WallIndex under ${wallIndexKey} not found") + }) + } + }) + + dht.put(wallIndexKey, WallIndex(Encrypt(owner), -1, mutable.ListBuffer.empty)) + } + + def getWallIndexKey(owner: String): String = { + s"${owner}@wi" + } + + /** + * generate a wall entry key + * + * @param owner mail (not hashed) + * @param index current index for the wall entry + * @return wall entry key + */ + def getWallEntryKey(owner: String, index: Int): String = { + s"${owner}@we${index}" + } +} diff --git a/PeerSoN/src/main/scala/main.scala b/PeerSoN/src/main/scala/main.scala new file mode 100644 index 00000000..77322632 --- /dev/null +++ b/PeerSoN/src/main/scala/main.scala @@ -0,0 +1,171 @@ +import akka.actor.typed.{ActorRef, ActorSystem, Behavior} +import akka.actor.typed.scaladsl.Behaviors +import com.simtechdata.waifupnp.UPnP +import dht.{DHT, DistributedDHT} +import peer.{AddToWallCommand, GetFileCommand, PeerCmd, PeerMessage, SendMessageCommand} + +import scala.collection.mutable +import scala.io.StdIn.readLine +import com.typesafe.config.ConfigFactory +import services.GetPeerKey + +object Guardian { + + def apply(): Behavior[REPLCommand] = Behaviors.setup { context => + /** + * create a map of peers, [a local backup for Guardian to speed up lookup] + * s"'${user}'@${location}" -> ActorRef[PeerMessage] + * only stores active/online users, offline users will be removed + */ + val peers: mutable.Map[String, ActorRef[PeerMessage]] = mutable.Map() + + var counter = 2 + var dht : DHT = null // for wall add + /** + * get ActorRef if the message sender is now active/online + * @param sender sender mail (not hashed) + * @return ActorRef if exists, else None + */ + def getPeerRefByGuardian(sender: String): Option[ActorRef[PeerMessage]] = { + val validSenders = peers.keys.filter(k => k.startsWith(s"'$sender'")) + if (validSenders.isEmpty) { + None + } else { + Some(peers(validSenders.head)) + } + } + + Behaviors.receiveMessage { msg: REPLCommand => + msg match { + + case Login(user: String, location: String) => + + // check if this user is already logged in + val peerKey = GetPeerKey(user, location) + if (peers.contains(peerKey)){ + println("User is already logged in with this device") + } + + else { + + val bootstrapHost = System.getenv("BOOTSTRAP") + if(bootstrapHost == null){ + context.log.info("BOOTSTRAP not set") + }else{ + context.log.info(s"BOOTSTRAP node is : ${bootstrapHost}") + } + + // create a dht node for new peer + val dhtNode: DistributedDHT = new DistributedDHT(counter,bootstrapHost) + dht = dhtNode + counter = counter + 1 + // create a new peer + val peerRef = context.spawn(peer.Peer(user, dhtNode), peerKey) + peers.put(peerKey, peerRef) + // send a Login command to the peer + val peerPath = peerRef.path.toStringWithAddress(context.system.address) + peerRef ! peer.Login(location, peerPath) + } + + case Logout(user: String, location: String) => + val lookup = getPeerRefByGuardian(user) + lookup match { + case Some(userRef: ActorRef[PeerMessage]) => + userRef ! peer.Logout(location) + context.stop(userRef) + peers.remove(GetPeerKey(user, location)) + println("Logout successful") + case _ => + println(s"User $user currently unavailable") + } + + /** + * 1. if any location of receiver is found active/online, send message + * 2. if not, add to wall + */ + case SendMessage(sender: String, receiver: String, text: String) => + val lookup = getPeerRefByGuardian(sender) + lookup match { + case Some(senderRef: ActorRef[PeerMessage]) => + senderRef ! PeerCmd(SendMessageCommand(receiver, text)) + case _ => + println(s"Sender $sender currently unavailable") + } + + case AddWallByUser(sender: String, owner: String, text: String) => + val lookup = getPeerRefByGuardian(sender) + lookup match { + case Some(senderRef: ActorRef[PeerMessage]) => + senderRef ! PeerCmd(AddToWallCommand(owner,text)) + case _ => + println(s"Owner $owner currently unavailable") + } + + case RequestFileByUser(requester: String, responder: String, fileName: String, version: Int) => + val lookup = getPeerRefByGuardian(requester) + lookup match { + case Some(requesterRef: ActorRef[PeerMessage]) => + requesterRef ! PeerCmd(GetFileCommand(fileName,null)) + case _ => + println(s"Peer $requester currently unavailable") + } + + case _ => () + } + Behaviors.same + } + } +} + + + +object main extends App { + + var isRemote = false + if(args.length > 0){ + isRemote = true + } + + UPnP.openPortTCP(6122) + UPnP.openPortUDP(6122) + UPnP.openPortTCP(5001) + UPnP.openPortUDP(5000) + UPnP.openPortTCP(5000) + + println(UPnP.getExternalIP) + val guardian = setupGuardian(isRemote) + + + while (true) { + println("Guardian waits for your command") + val input = readLine.strip.split(" ") + val command: REPLCommand = input.head match { + case "login" => + Login(readLine("email: ").strip, readLine("location: ").strip) + case "logout" => + Logout(readLine("email: ").strip, readLine("location: ").strip) + case "send-message" => + SendMessage(readLine("sender: ").strip, readLine("receiver: ").strip, readLine("text: ").strip) + case "add-to-wall" => + AddWallByUser(readLine("sender: ").strip, readLine("owner: ").strip, readLine("text: ").strip) + case "request-wall" => + RequestFileByUser(readLine("requester: ").strip, ""/*readLine("responder: ").strip*/, + readLine("fileName: ").strip, version = 0) + case "exit" => + sys.exit + case _ => new REPLCommand {} + } + guardian ! command + } + + def setupGuardian(isRemote: Boolean): ActorSystem[REPLCommand] ={ + if (isRemote){ + val config = ConfigFactory.load("remote_application") + ActorSystem(Guardian(), "guardian",config) + } else { + ActorSystem(Guardian(), "guardian") + } + } +} + + diff --git a/PeerSoN/src/main/scala/peer/Command.scala b/PeerSoN/src/main/scala/peer/Command.scala new file mode 100644 index 00000000..32cf4848 --- /dev/null +++ b/PeerSoN/src/main/scala/peer/Command.scala @@ -0,0 +1,14 @@ +package peer + +import akka.actor.typed.ActorRef + +trait Command + +/** + * Command + */ + +case class GetFileCommand(fileName: String, replyTo: ActorRef[PeerMessage]) extends Command +case class AddToWallCommand(user: String, text: String) extends Command +case class SendMessageCommand(receiver: String, text: String) extends Command +case class AddOfflineMessage(receiver: String, text: String, ack: Boolean) extends Command diff --git a/PeerSoN/src/main/scala/peer/Peer.scala b/PeerSoN/src/main/scala/peer/Peer.scala new file mode 100644 index 00000000..9aef9a15 --- /dev/null +++ b/PeerSoN/src/main/scala/peer/Peer.scala @@ -0,0 +1,219 @@ +package peer + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import logic.async_messages.AsyncMessage +import logic.async_messages.AsyncMessage.OfflineMessage +import dht.DistributedDHT +import logic.wall.FileOperations.DHTFileEntry +import logic.login.{LocatorInfo, LoginProcedure, LogoutProcedure, State} +import logic.wall.Wall.WallEntry +import logic.wall.{File, FileOperations, Wall} +import services.{CheckIfOnlineWithLocation, Encrypt, GetPathByMail, GetPeerRef} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + + +object PeerWall { + case class WallIndex(owner: String, lastIndex: Int, entries: ListBuffer[String]) extends File +} + + +object Peer { + def apply(mail: String, dhtNode: DistributedDHT): Behavior[PeerMessage] = { + Behaviors.setup(context => { + new PeerBehavior(context, mail, dhtNode) + }) + } + + class PeerBehavior(context: ActorContext[PeerMessage], mail: String, dhtNode: DistributedDHT) extends AbstractBehavior[PeerMessage](context) { + + private val hashedMail = Encrypt(mail) + // the peer location, e.g., home / phone + var location : String ="" + // the peer path, like akka://guadian@localhost/.... + var path : String ="" + + /** + * instance variable - a mutable map to store all the local files + * TODO (if time allows): connects to JSON files or databases to fetch a peer's files + * Now just create a new Map every time + */ + val localFiles: mutable.Map[String, File] = mutable.Map() + val WALL_INDEX_KEY = s"$hashedMail@wi" + var wallIndex: PeerWall.WallIndex = PeerWall.WallIndex(hashedMail, -1, ListBuffer.empty) + + /** + * + * @param sender the hashed mail of the person who added the entry. + */ + def addToWall(sender:String, text: String): Unit = { + val newIndex = wallIndex.lastIndex + 1 + val entryKey = Wall.getWallEntryKey(mail,newIndex) + wallIndex.entries.append(entryKey) + localFiles.put(entryKey, WallEntry(newIndex,sender, text)) // is this ok? + dhtNode.append(entryKey, DHTFileEntry(hashedMail, LocatorInfo(location,"","",State.active,path), 0)) // for now assume not versioning + + // increment index + localFiles.put(WALL_INDEX_KEY, wallIndex.copy(lastIndex = newIndex)) + + dhtNode.append(WALL_INDEX_KEY, DHTFileEntry(hashedMail, LocatorInfo(location,"","",State.active,path), 0)) // for now assume not versioning + } + + /** + * Timestamps of when we sent a message + */ + val timeStamps = mutable.Map[Long,Long]() + // for keeping track of message timestamps + var currentTimeStampId = 0L + + /** + * message handler + * + * @param msg incoming Akka message + */ + override def onMessage(msg: PeerMessage): Behavior[PeerMessage] = { + context.log.info(s"$mail - received message: $msg") + msg match { + + + case AddWallEntry(sender,text) => + addToWall(sender,text) + + case Message(sender, text, ack,id) => + if (ack) { + timeStamps.get(id) match { + case Some(value) => { + context.log.info(s"$sender sent an ack") + context.log.info(s"Message sent and received ack with latency : ${(System.currentTimeMillis() - timeStamps(id))/1000.0}") + } + case _ => context.log.info("got ack for message we don't know about.") + } + + } else { + context.log.info(s"From: $sender | Message: $text") + new GetPathByMail(sender, dhtNode,{ + case Some(senderPath: String) => + GetPeerRef(context, senderPath) ! Message(mail, "I got your message", ack = true,id) + case _ => + AsyncMessage.add(mail, sender, "I got your message", ack = true, dhtNode) + }).get() + } + + + case Login(location, path) => + AsyncMessage.load(context, mail, dhtNode) + this.location = location + this.path = path + val loginProcedure = new LoginProcedure(location, hashedMail, path, dhtNode, System.currentTimeMillis()) + loginProcedure.start() + + + case Logout(location) => + val logoutProcedure = new LogoutProcedure(location, hashedMail, dhtNode, System.currentTimeMillis()) + logoutProcedure.start() + + case FileRequest(fileName, version, replyTo,id) => + localFiles.get(fileName) match { + case Some(value) if value.isInstanceOf[File] => + replyTo ! FileResponse(200, fileName, version, Some(value), context.self,id) + case Some(_) => () + case None => replyTo ! FileResponse(404, fileName, version, None, context.self,id) + } + + + // If we get a response, store it locally. + case FileResponse(code, fileName, version, received, from, id) => + timeStamps.get(id) match { + case Some(value) => context.log.info(s"File response received in : ${(System.currentTimeMillis() - timeStamps(id))/1000.0}") + case _ => context.log.info("got a response for request we don't know about") + } + if (code == 200){ + received match { + case Some(file) => + context.log.info("we received a response for our file request, storing and advertising the file...") + // store file locally and advertise it as well + localFiles.put(fileName, file) + dhtNode.append(fileName, DHTFileEntry(hashedMail, LocatorInfo(location,"","",State.active,path), 0)) + case None => () + } + } + + + case PeerCmd(cmd) => + cmd match { + + + // command the current peer (as sender) to put text on receiver's wall + case AddToWallCommand(receiver, text) => + new GetPathByMail(receiver, dhtNode,{ + case Some(receiverPath: String) => + services.GetPeerRef(context, receiverPath) ! AddWallEntry(mail,text) + case None => AsyncMessage.AddWallEntry(mail,receiver,text,dhtNode) + }).get() + + + case AddOfflineMessage(receiver, text, ack) => + AsyncMessage.add(mail, receiver, text, ack, dhtNode) + + + // command the current peer to request a file + case GetFileCommand(fileName, replyTo) => + var fileNameInDHT = fileName + + // wall index is hashed + val extension = fileName.takeRight(3) + if (extension == "@wi"){ + val realFileName = fileName.dropRight(3) + fileNameInDHT = Encrypt(realFileName) + "@wi" + } + val id = currentTimeStampId + currentTimeStampId +=1 + timeStamps.put(id,System.currentTimeMillis()) + + + dhtNode.getAll(fileNameInDHT, { case Some(l: List[DHTFileEntry]) => + var found = false + for (e <- l) { + CheckIfOnlineWithLocation(dhtNode, e.hashedMail, e.locator, { foundPath => + if (!found) { + found = true + GetPeerRef(context, foundPath) ! FileRequest(fileNameInDHT, 0, context.self,id) + } + }) + } + case None => println(s"The file ${fileNameInDHT} could not be found.") + }) + + + // command the current peer to send message + case SendMessageCommand(receiver, text) => + new GetPathByMail(receiver,dhtNode,{ + case Some(receiverPath: String) => + val id = currentTimeStampId + currentTimeStampId +=1 + timeStamps.put(id,System.currentTimeMillis()) + services.GetPeerRef(context, receiverPath) ! Message(mail, text, ack = false, id = id) + case _ => + context.self ! PeerCmd(AddOfflineMessage(receiver, text, ack = false)) + }).get() + case _ => () + } + + + case Notification(content) => + content match { + + + case OfflineMessage(sender: String, content: String, ack: Boolean) => + context.self ! Message(sender, content, ack) + + + case WallEntry(_,sender,content) => addToWall(sender,content) + } + } + this + } + } +} \ No newline at end of file diff --git a/PeerSoN/src/main/scala/peer/PeerMessage.scala b/PeerSoN/src/main/scala/peer/PeerMessage.scala new file mode 100644 index 00000000..dae5b3c5 --- /dev/null +++ b/PeerSoN/src/main/scala/peer/PeerMessage.scala @@ -0,0 +1,45 @@ +package peer + +import akka.actor.typed.ActorRef +import logic.wall.File + +trait PeerMessage + +/** + * PeerMessage + */ + +// used for login REPL command +case class Login(location: String, path: String) extends PeerMessage + +// used for logout REPL command +case class Logout(location: String) extends PeerMessage + +case class Message(sender: String, text: String, ack: Boolean, id : Long = -1L) extends PeerMessage + +case class AddWallEntry(sender:String,text: String) extends PeerMessage + +// for now assume version == 0 +case class FileRequest(fileName: String, version: Int, replyTo: ActorRef[PeerMessage],id : Long = -1L) extends PeerMessage + +/** + * @param code response code. Kinda like http response codes. + */ +case class FileResponse(code: Int, fileName: String, version: Int, + file: Option[File], from: ActorRef[PeerMessage],id : Long = -1L) extends PeerMessage + +/** + * for sending commands to the peer + * @param cmd of Command trait + */ +case class PeerCmd(cmd: Command) extends PeerMessage + +/** + * offline notifications + * @param content Any, e.g., AsyncMessage.OfflineMessage, etc. + */ +case class Notification(content: Any) extends PeerMessage + + + + diff --git a/PeerSoN/src/main/scala/services/CheckIfOnlineWithLocation.scala b/PeerSoN/src/main/scala/services/CheckIfOnlineWithLocation.scala new file mode 100644 index 00000000..7b1b5b53 --- /dev/null +++ b/PeerSoN/src/main/scala/services/CheckIfOnlineWithLocation.scala @@ -0,0 +1,27 @@ +package services + +import dht.DHT +import logic.login.{LocatorInfo, State} + + +/** + * Check if a list of peers are online at certain locations. If so return this information. + */ +object CheckIfOnlineWithLocation{ + /** + * Try to find a peer with the given mail and location. If found run the call back with the path of that peer. + */ + def apply(dht: DHT,hashedMail:String, locator: LocatorInfo, cb : (String) => Unit): Unit ={ + dht.getAll(hashedMail, { + case Some(l : List[LocatorInfo]) => { + val foundLocation = l.filter(e => e.state != State.offline) + .find(e => e.locator == locator.locator) + foundLocation match { + case Some(e) => cb(e.path) + case _ => () + } + } + case None => () + }) + } +} diff --git a/PeerSoN/src/main/scala/services/Encrypt.scala b/PeerSoN/src/main/scala/services/Encrypt.scala new file mode 100644 index 00000000..ad9b1862 --- /dev/null +++ b/PeerSoN/src/main/scala/services/Encrypt.scala @@ -0,0 +1,23 @@ +package services + +import java.math.BigInteger +import java.security.MessageDigest + +object Encrypt { + /** + * encrypt the received string with SHA-256 + * Used SHA-256 because MD5 used in paper is not secure + * (Checked with linux: "echo -n "Test@test.com" | openssl dgst -sha256") + * reference: + * https://stackoverflow.com/questions/46329956/how-to-correctly-generate-sha-256-checksum-for-a-string-in-scala + * + * @param value the string (usually the email) to hash + * @return the encrypted result - of type String + */ + def apply(value: String): String = { + val digest = MessageDigest + .getInstance("SHA-256") + .digest(value.getBytes("UTF-8")) + String.format("%064x", new BigInteger(1, digest)) + } +} diff --git a/PeerSoN/src/main/scala/services/GetPathByMail.scala b/PeerSoN/src/main/scala/services/GetPathByMail.scala new file mode 100644 index 00000000..430cd7f4 --- /dev/null +++ b/PeerSoN/src/main/scala/services/GetPathByMail.scala @@ -0,0 +1,36 @@ +package services + +import dht.DistributedDHT +import logic.login.{LocatorInfo, State} + +class GetPathByMail(val mail: String, val distributedDHT: DistributedDHT, val callback: Option[String] => Unit) { + + /** + * find a current active/online actor's path purely based on its mail + * should be a function of DHT + * @param mail mail to look up + * @return path if exists, else None + */ + def get(): Unit = { + val hashedMail: String = Encrypt(mail) + distributedDHT.getAll(hashedMail, onReceivedLookup) + } + + def onReceivedLookup(lookup:Option[List[Any]]): Unit ={ + println(lookup) + lookup match { + case Some(value: List[LocatorInfo]) => + // filter an active or online locator info + val validLocatorInfoList = value.filter(l => l.state == State.active || l.state == State.online) + if (validLocatorInfoList.isEmpty) { + println(s"peer $mail not found by DHT") + callback(None) + } else { + callback(Some(validLocatorInfoList.head.path)) + } + case _ => + callback(None) + } + } +} + diff --git a/PeerSoN/src/main/scala/services/GetPeerKey.scala b/PeerSoN/src/main/scala/services/GetPeerKey.scala new file mode 100644 index 00000000..8e1ddc53 --- /dev/null +++ b/PeerSoN/src/main/scala/services/GetPeerKey.scala @@ -0,0 +1,13 @@ +package services + +object GetPeerKey { + /** + * user and location as a key of peers + * + * @param user mail (not hashed) + * @param location location string + */ + def apply(user: String, location: String): String = { + s"'$user'@$location" + } +} diff --git a/PeerSoN/src/main/scala/services/GetPeerRef.scala b/PeerSoN/src/main/scala/services/GetPeerRef.scala new file mode 100644 index 00000000..41ca6616 --- /dev/null +++ b/PeerSoN/src/main/scala/services/GetPeerRef.scala @@ -0,0 +1,14 @@ +package services + +import akka.actor.ActorSelection +import akka.actor.typed.scaladsl.ActorContext +import peer.PeerMessage + +/** + * Get a reference to a peer from its path. + */ +object GetPeerRef { + def apply(context: ActorContext[PeerMessage], path: String): ActorSelection = { + context.system.classicSystem.actorSelection(path) + } +}