Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master worker #10

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/example09masterworker/app/Cluster.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module Cluster

open Proto.Cluster
open Proto.Cluster.Consul
open System

let startCluster nodeName =
let consulConfigure = Action<Consul.ConsulClientConfiguration>(fun c -> c.Address <- Uri("http://consul:8500/"))
printfn "Connecting to cluster"
Cluster.Start("FiresideChatCluster", nodeName, 12000, ConsulProvider(ConsulProviderOptions(), consulConfigure))
printfn "Connected to cluster"

Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ FROM gcr.io/google-appengine/aspnetcore:2.0
ADD ./bin/Release/netcoreapp2.0/publish/ /app

WORKDIR /app

ENTRYPOINT [ "dotnet", "Node1.dll" ]
EXPOSE 12000
ENTRYPOINT [ "dotnet", "app.dll" ]
#CMD [ "dotnet", "app.dll", "m", "100" ]
15 changes: 15 additions & 0 deletions examples/example09masterworker/app/HelloProtoActor.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module HelloProtoActor
open Proto.FSharp

let hello() =
let pid1 =
Actor.create (printfn "Hello from actor1: %A")
|> Actor.initProps
|> Actor.spawn
let pid2 =
Actor.create (fun x -> printfn "Hello from actor2: %A" x; (sprintf "I was called with %A" x) >! pid1)
|> Actor.initProps
|> Actor.spawn

[ 1 .. 100 ]
|> List.iter (tell pid2)
69 changes: 69 additions & 0 deletions examples/example09masterworker/app/Master.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module Master
open MessageHelpers
open Proto.FSharp
open Proto.Remote

module Sink =
type SinkState = {
ExpectedNumberOfResult: int option
NumberOfResult: int
TotalSum: float32
}

let sinkHandler message state =
let state' =
match message with
| SubmitExpectedResultCount m -> {state with ExpectedNumberOfResult = Some (m.Count)}
| SubmitResult m -> {state with TotalSum = state.TotalSum + m.Result; NumberOfResult = state.NumberOfResult + 1}
| _ -> state
match state'.ExpectedNumberOfResult with
| Some x when x = state'.NumberOfResult ->
printfn "Average is: %A" (state'.TotalSum / (float32)state'.NumberOfResult)
| None ->
printfn "Current sink state: %A" (state', message)
| Some _ -> ()
state'

let createSink() =
Actor.withState (mapMsg >> sinkHandler) {ExpectedNumberOfResult = None; NumberOfResult = 0; TotalSum = (float32)0.}
|> Actor.initProps
|> Actor.spawn

type MasterState = {
RemainingWork: int list
SinkPid: Proto.PID option
}

let setupSink expectedWorkLoad =
let sinkPid = Sink.createSink()
let expectedCountMsg = MessageHelpers.newSubmitExpectedResultCount expectedWorkLoad
expectedCountMsg >! sinkPid
sinkPid

let handler (context: Proto.IContext) message state =
match message |> MessageHelpers.mapMsg with
| SystemMessage (SystemMessage.Started _) ->
printfn "Starting master"
let sinkPid = setupSink (state.RemainingWork.Length)
printfn "Sink started"
{state with SinkPid = Some sinkPid}
| RequestWork m ->
match state.RemainingWork with
| [] -> state
| x::rest ->
let sinkPid = state.SinkPid |> Option.get
let protoPid = m.Pid |> MessageHelpers.toProtoPid
newSubmitWork sinkPid x >! protoPid
{state with RemainingWork = rest}

let createMasterProps numberOfWork =
Actor.withState2 handler {RemainingWork = [1 .. numberOfWork]; SinkPid = None}
|> Actor.initProps

let createMaster = createMasterProps >> (Actor.spawn)

let startMaster numberOfWork =
let props = createMasterProps numberOfWork
Remote.RegisterKnownKind("MasterKind", props)
createMaster numberOfWork |> ignore
Cluster.startCluster "master"
55 changes: 55 additions & 0 deletions examples/example09masterworker/app/MessageHelpers.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module MessageHelpers
open ProtoActorDemo.Messages
open Proto.FSharp

let toMessagePid (pid: Proto.PID) =
let messagePid = PID()
messagePid.Address <- pid.Address
messagePid.Id <- pid.Id
messagePid

let toProtoPid (pid: PID) =
Proto.PID(pid.Address, pid.Id)

let newRequestWork (pid: Proto.PID) =
let requestWork = RequestWork()
requestWork.Pid <- toMessagePid pid
requestWork

let newSubmitWork (pid: Proto.PID) data =
let submitWork = SubmitWork()
submitWork.Data <- data
submitWork.Pid <- toMessagePid pid
submitWork

let newSubmitResult result =
let submitResult = SubmitResult()
submitResult.Result <- result
submitResult

let newSubmitExpectedResultCount count =
let submitExpectedResultCount = new SubmitExpectedResultCount()
submitExpectedResultCount.Count <- count
submitExpectedResultCount

type Message =
| RequestWork of RequestWork
| SubmitWork of SubmitWork
| SubmitExpectedResultCount of SubmitExpectedResultCount
| SubmitResult of SubmitResult
| SystemMessage of SystemMessage
| Unknown of obj

let (|IsMessage|_|) (msg: obj) =
match msg with
| :? RequestWork as m -> Some(RequestWork m)
| :? SubmitWork as m -> Some(SubmitWork m)
| :? SubmitResult as m -> Some(SubmitResult m)
| :? SubmitExpectedResultCount as m -> Some(SubmitExpectedResultCount m)
| _ -> None

let mapMsg (msg: obj) =
match msg with
| IsMessage m -> m
| IsSystemMessage m -> SystemMessage m
| _ -> Unknown msg
40 changes: 40 additions & 0 deletions examples/example09masterworker/app/Program.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Learn more about F# at http://fsharp.org

open System
open Proto.FSharp
open Proto.Mailbox
open Proto.FSharp.Core
open Proto.Remote
open ProtoActorDemo.Messages

let testLocal() =
let masterPid = Master.createMaster 100

Worker.createWorkerMonitor 5 (Worker.requestWork masterPid) |> ignore


[<EntryPoint>]
let main argv =
printfn "Registrating protos"
Serialization.RegisterFileDescriptor(ProtoActorDemo.Messages.MessagesReflection.Descriptor)

let argList = argv |> Array.toList

match argList with
| [] | "hello"::_ -> HelloProtoActor.hello()
| "supervision"::_ | "s"::_ -> Supervision.run()
| "local"::_ | "l" :: _ -> testLocal()
| "m"::count::_ -> Master.startMaster (Int32.Parse count) |> ignore
| "w"::count::_ -> Worker.startWorker (Int32.Parse count) |> ignore
| _ -> printfn "Unkown argument: %A" argList

// let masterPid = Master.createMaster 30
// let workerPids = [1 .. 10] |> List.map (fun i -> Worker.createWorker masterPid i)

// printfn "Hello World from F#!"
System.Threading.Thread.Sleep Int32.MaxValue
0 // return an integer exit code



// Parent node....
30 changes: 30 additions & 0 deletions examples/example09masterworker/app/Supervision.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module Supervision
open Proto.FSharp
open Proto

let run() =
let rand = new System.Random()

let createChild childId =
let handler _ =
let rec inner cnt =
if cnt = 5 then
childId
|> sprintf "Child %d: I don't want to play anymore"
|> exn
|> raise
else
printfn "Child %d: Playing with you %d" childId cnt
System.Threading.Thread.Sleep(rand.Next(2000))
inner (cnt+1)
inner 0

Actor.create handler |> Actor.initProps

let masterHandler (context:IContext) (msg:obj) =
match msg with
| :? Started ->
[1 .. 2] |> List.iter (createChild >> context.Spawn >> ignore)
| x -> printfn "Master: %A" x

Actor.create2 masterHandler |> Actor.initProps |> Actor.spawn |> ignore
64 changes: 64 additions & 0 deletions examples/example09masterworker/app/Worker.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
module Worker
open MessageHelpers
open Proto.FSharp
open Proto
open System
open Proto.Mailbox
open ProtoActorDemo.Messages

// module Master =
// let sinkPid = Sink.createSink()

let handler workerId requestWork (context: Proto.IContext) message =
match message |> MessageHelpers.mapMsg with
| SubmitWork m ->
printfn "%d Got some work: %A" workerId m
System.Threading.Thread.Sleep 1000
let result = m.Data |> float |> Math.Sqrt |> float32
let resultDto = MessageHelpers.newSubmitResult result
resultDto >! (m.Pid |> MessageHelpers.toProtoPid)
requestWork (context.Self)
| SystemMessage (SystemMessage.Started _) ->
requestWork (context.Self)

let createWorkerKind requestWork workerId =
Actor.create2 (handler workerId requestWork) |> Actor.initProps


let createWorkerMonitor workerCount requestWork =
let handler (context: Proto.IContext) message =
match message |> MessageHelpers.mapMsg with
| SystemMessage (SystemMessage.Started _) ->
[ 1 .. workerCount ] |> List.iter (fun i -> i |> createWorkerKind requestWork |> context.Spawn |> ignore)
| _ -> ()

Actor.create2 handler
|> Actor.initProps
|> Actor.Spawn

let requestWork masterPid workerPid =
MessageHelpers.newRequestWork workerPid >! masterPid

let getMasterPid() =
let rec getPid() =
printfn "Trying to get pid again"
let (pid, sc) = Proto.Cluster.Cluster.GetAsync("FiresideChatCluster", "MasterKind").Result.ToTuple()
if sc <> Proto.Remote.ResponseStatusCode.OK then
printfn "Failed to get pid: %A" (pid, sc)
System.Threading.Thread.Sleep(4000)
getPid()
else pid
getPid()

let startWorker workerCount =
let hostName =
match Environment.GetEnvironmentVariable("CUMPUTERNAME"), Environment.GetEnvironmentVariable("HOSTNAME") with
| null, h -> h
| h, _ -> h
printfn "Hostname: %A" hostName

for entry in Environment.GetEnvironmentVariables() |> Seq.cast<System.Collections.DictionaryEntry> do printfn "%A: %A" entry.Key entry.Value

Cluster.startCluster (hostName) // + (System.Guid.NewGuid().ToString()))
let masterPid = getMasterPid()
createWorkerMonitor workerCount (requestWork masterPid)
24 changes: 24 additions & 0 deletions examples/example09masterworker/app/app.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="../../../src/Proto.Actor.FSharp/Proto.Actor.FSharp.fsproj">
<Name>Proto.Actor.FSharp.fsproj</Name>
</ProjectReference>
<ProjectReference Include="../messages/messages.csproj">
<Name>messages.csproj</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Compile Include="Cluster.fs" />
<Compile Include="Supervision.fs" />
<Compile Include="MessageHelpers.fs" />
<Compile Include="HelloProtoActor.fs" />
<Compile Include="Worker.fs" />
<Compile Include="Master.fs" />
<Compile Include="Program.fs" />
</ItemGroup>
<Import Project="..\..\..\.paket\Paket.Restore.targets" />
</Project>
2 changes: 2 additions & 0 deletions examples/example09masterworker/app/paket.references
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Proto.Cluster.Consul
FSharp.Core
5 changes: 5 additions & 0 deletions examples/example09masterworker/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

dotnet publish -c Release app
#dotnet publish -c Release Node2
docker-compose up --build --scale worker=3
4 changes: 4 additions & 0 deletions examples/example09masterworker/buildAndRunDocker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

./build.sh
./run_docker.sh
10 changes: 10 additions & 0 deletions examples/example09masterworker/build_kube.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

dotnet publish -c Release app
#dotnet publish -c Release Node2

docker build -t gcr.io/${PROJECT_ID}/fireproto:v1 app/.
##docker build -t gcr.io/${PROJECT_ID}/protodemonode2:v1 Node2/.

#gcloud docker -- push gcr.io/${PROJECT_ID}/f:v1
gcloud docker -- push gcr.io/${PROJECT_ID}/fireproto:v1
23 changes: 23 additions & 0 deletions examples/example09masterworker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: '3'
services:
master:
build: ./app
container_name: "master"
hostname: "master"
depends_on:
- consul
command: "m 200"
worker:
build: ./app
depends_on:
- master
command: "w 1"
consul:
image: "consul:latest"
container_name: "consul"
hostname: "consul"
ports:
- "8400:8400"
- "8500:8500"
- "8600:53"
command: "agent -server -bootstrap -ui -disable-host-node-id -client 0.0.0.0"
Loading