diff --git a/examples/example09masterworker/app/Cluster.fs b/examples/example09masterworker/app/Cluster.fs new file mode 100644 index 0000000..b89654d --- /dev/null +++ b/examples/example09masterworker/app/Cluster.fs @@ -0,0 +1,12 @@ +module Cluster + +open Proto.Cluster +open Proto.Cluster.Consul +open System + +let startCluster nodeName = + let consulConfigure = Action(fun c -> c.Address <- Uri("http://consul:8500/")) + printfn "Connecting to cluster" + Cluster.Start("FiresideChatCluster", nodeName, 12000, ConsulProvider(ConsulProviderOptions(), consulConfigure)) + printfn "Connected to cluster" + diff --git a/examples/Dockerfile b/examples/example09masterworker/app/Dockerfile similarity index 53% rename from examples/Dockerfile rename to examples/example09masterworker/app/Dockerfile index a56c8c2..adb209d 100644 --- a/examples/Dockerfile +++ b/examples/example09masterworker/app/Dockerfile @@ -2,5 +2,6 @@ FROM gcr.io/google-appengine/aspnetcore:2.0 ADD ./bin/Release/netcoreapp2.0/publish/ /app WORKDIR /app - -ENTRYPOINT [ "dotnet", "Node1.dll" ] +EXPOSE 12000 +ENTRYPOINT [ "dotnet", "app.dll" ] +#CMD [ "dotnet", "app.dll", "m", "100" ] diff --git a/examples/example09masterworker/app/HelloProtoActor.fs b/examples/example09masterworker/app/HelloProtoActor.fs new file mode 100644 index 0000000..492e7ee --- /dev/null +++ b/examples/example09masterworker/app/HelloProtoActor.fs @@ -0,0 +1,15 @@ +module HelloProtoActor +open Proto.FSharp + +let hello() = + let pid1 = + Actor.create (printfn "Hello from actor1: %A") + |> Actor.initProps + |> Actor.spawn + let pid2 = + Actor.create (fun x -> printfn "Hello from actor2: %A" x; (sprintf "I was called with %A" x) >! pid1) + |> Actor.initProps + |> Actor.spawn + + [ 1 .. 100 ] + |> List.iter (tell pid2) diff --git a/examples/example09masterworker/app/Master.fs b/examples/example09masterworker/app/Master.fs new file mode 100644 index 0000000..bf4f5fb --- /dev/null +++ b/examples/example09masterworker/app/Master.fs @@ -0,0 +1,69 @@ +module Master +open MessageHelpers +open Proto.FSharp +open Proto.Remote + +module Sink = + type SinkState = { + ExpectedNumberOfResult: int option + NumberOfResult: int + TotalSum: float32 + } + + let sinkHandler message state = + let state' = + match message with + | SubmitExpectedResultCount m -> {state with ExpectedNumberOfResult = Some (m.Count)} + | SubmitResult m -> {state with TotalSum = state.TotalSum + m.Result; NumberOfResult = state.NumberOfResult + 1} + | _ -> state + match state'.ExpectedNumberOfResult with + | Some x when x = state'.NumberOfResult -> + printfn "Average is: %A" (state'.TotalSum / (float32)state'.NumberOfResult) + | None -> + printfn "Current sink state: %A" (state', message) + | Some _ -> () + state' + + let createSink() = + Actor.withState (mapMsg >> sinkHandler) {ExpectedNumberOfResult = None; NumberOfResult = 0; TotalSum = (float32)0.} + |> Actor.initProps + |> Actor.spawn + +type MasterState = { + RemainingWork: int list + SinkPid: Proto.PID option +} + +let setupSink expectedWorkLoad = + let sinkPid = Sink.createSink() + let expectedCountMsg = MessageHelpers.newSubmitExpectedResultCount expectedWorkLoad + expectedCountMsg >! sinkPid + sinkPid + +let handler (context: Proto.IContext) message state = + match message |> MessageHelpers.mapMsg with + | SystemMessage (SystemMessage.Started _) -> + printfn "Starting master" + let sinkPid = setupSink (state.RemainingWork.Length) + printfn "Sink started" + {state with SinkPid = Some sinkPid} + | RequestWork m -> + match state.RemainingWork with + | [] -> state + | x::rest -> + let sinkPid = state.SinkPid |> Option.get + let protoPid = m.Pid |> MessageHelpers.toProtoPid + newSubmitWork sinkPid x >! protoPid + {state with RemainingWork = rest} + +let createMasterProps numberOfWork = + Actor.withState2 handler {RemainingWork = [1 .. numberOfWork]; SinkPid = None} + |> Actor.initProps + +let createMaster = createMasterProps >> (Actor.spawn) + +let startMaster numberOfWork = + let props = createMasterProps numberOfWork + Remote.RegisterKnownKind("MasterKind", props) + createMaster numberOfWork |> ignore + Cluster.startCluster "master" diff --git a/examples/example09masterworker/app/MessageHelpers.fs b/examples/example09masterworker/app/MessageHelpers.fs new file mode 100644 index 0000000..c19fd48 --- /dev/null +++ b/examples/example09masterworker/app/MessageHelpers.fs @@ -0,0 +1,55 @@ +module MessageHelpers +open ProtoActorDemo.Messages +open Proto.FSharp + +let toMessagePid (pid: Proto.PID) = + let messagePid = PID() + messagePid.Address <- pid.Address + messagePid.Id <- pid.Id + messagePid + +let toProtoPid (pid: PID) = + Proto.PID(pid.Address, pid.Id) + +let newRequestWork (pid: Proto.PID) = + let requestWork = RequestWork() + requestWork.Pid <- toMessagePid pid + requestWork + +let newSubmitWork (pid: Proto.PID) data = + let submitWork = SubmitWork() + submitWork.Data <- data + submitWork.Pid <- toMessagePid pid + submitWork + +let newSubmitResult result = + let submitResult = SubmitResult() + submitResult.Result <- result + submitResult + +let newSubmitExpectedResultCount count = + let submitExpectedResultCount = new SubmitExpectedResultCount() + submitExpectedResultCount.Count <- count + submitExpectedResultCount + +type Message = + | RequestWork of RequestWork + | SubmitWork of SubmitWork + | SubmitExpectedResultCount of SubmitExpectedResultCount + | SubmitResult of SubmitResult + | SystemMessage of SystemMessage + | Unknown of obj + +let (|IsMessage|_|) (msg: obj) = + match msg with + | :? RequestWork as m -> Some(RequestWork m) + | :? SubmitWork as m -> Some(SubmitWork m) + | :? SubmitResult as m -> Some(SubmitResult m) + | :? SubmitExpectedResultCount as m -> Some(SubmitExpectedResultCount m) + | _ -> None + +let mapMsg (msg: obj) = + match msg with + | IsMessage m -> m + | IsSystemMessage m -> SystemMessage m + | _ -> Unknown msg diff --git a/examples/example09masterworker/app/Program.fs b/examples/example09masterworker/app/Program.fs new file mode 100644 index 0000000..e02997d --- /dev/null +++ b/examples/example09masterworker/app/Program.fs @@ -0,0 +1,40 @@ +// Learn more about F# at http://fsharp.org + +open System +open Proto.FSharp +open Proto.Mailbox +open Proto.FSharp.Core +open Proto.Remote +open ProtoActorDemo.Messages + +let testLocal() = + let masterPid = Master.createMaster 100 + + Worker.createWorkerMonitor 5 (Worker.requestWork masterPid) |> ignore + + +[] +let main argv = + printfn "Registrating protos" + Serialization.RegisterFileDescriptor(ProtoActorDemo.Messages.MessagesReflection.Descriptor) + + let argList = argv |> Array.toList + + match argList with + | [] | "hello"::_ -> HelloProtoActor.hello() + | "supervision"::_ | "s"::_ -> Supervision.run() + | "local"::_ | "l" :: _ -> testLocal() + | "m"::count::_ -> Master.startMaster (Int32.Parse count) |> ignore + | "w"::count::_ -> Worker.startWorker (Int32.Parse count) |> ignore + | _ -> printfn "Unkown argument: %A" argList + + // let masterPid = Master.createMaster 30 + // let workerPids = [1 .. 10] |> List.map (fun i -> Worker.createWorker masterPid i) + + // printfn "Hello World from F#!" + System.Threading.Thread.Sleep Int32.MaxValue + 0 // return an integer exit code + + + +// Parent node.... \ No newline at end of file diff --git a/examples/example09masterworker/app/Supervision.fs b/examples/example09masterworker/app/Supervision.fs new file mode 100644 index 0000000..992e93c --- /dev/null +++ b/examples/example09masterworker/app/Supervision.fs @@ -0,0 +1,30 @@ +module Supervision +open Proto.FSharp +open Proto + +let run() = + let rand = new System.Random() + + let createChild childId = + let handler _ = + let rec inner cnt = + if cnt = 5 then + childId + |> sprintf "Child %d: I don't want to play anymore" + |> exn + |> raise + else + printfn "Child %d: Playing with you %d" childId cnt + System.Threading.Thread.Sleep(rand.Next(2000)) + inner (cnt+1) + inner 0 + + Actor.create handler |> Actor.initProps + + let masterHandler (context:IContext) (msg:obj) = + match msg with + | :? Started -> + [1 .. 2] |> List.iter (createChild >> context.Spawn >> ignore) + | x -> printfn "Master: %A" x + + Actor.create2 masterHandler |> Actor.initProps |> Actor.spawn |> ignore \ No newline at end of file diff --git a/examples/example09masterworker/app/Worker.fs b/examples/example09masterworker/app/Worker.fs new file mode 100644 index 0000000..9184074 --- /dev/null +++ b/examples/example09masterworker/app/Worker.fs @@ -0,0 +1,64 @@ +module Worker +open MessageHelpers +open Proto.FSharp +open Proto +open System +open Proto.Mailbox +open ProtoActorDemo.Messages + +// module Master = +// let sinkPid = Sink.createSink() + +let handler workerId requestWork (context: Proto.IContext) message = + match message |> MessageHelpers.mapMsg with + | SubmitWork m -> + printfn "%d Got some work: %A" workerId m + System.Threading.Thread.Sleep 1000 + let result = m.Data |> float |> Math.Sqrt |> float32 + let resultDto = MessageHelpers.newSubmitResult result + resultDto >! (m.Pid |> MessageHelpers.toProtoPid) + requestWork (context.Self) + | SystemMessage (SystemMessage.Started _) -> + requestWork (context.Self) + +let createWorkerKind requestWork workerId = + Actor.create2 (handler workerId requestWork) |> Actor.initProps + + +let createWorkerMonitor workerCount requestWork = + let handler (context: Proto.IContext) message = + match message |> MessageHelpers.mapMsg with + | SystemMessage (SystemMessage.Started _) -> + [ 1 .. workerCount ] |> List.iter (fun i -> i |> createWorkerKind requestWork |> context.Spawn |> ignore) + | _ -> () + + Actor.create2 handler + |> Actor.initProps + |> Actor.Spawn + +let requestWork masterPid workerPid = + MessageHelpers.newRequestWork workerPid >! masterPid + +let getMasterPid() = + let rec getPid() = + printfn "Trying to get pid again" + let (pid, sc) = Proto.Cluster.Cluster.GetAsync("FiresideChatCluster", "MasterKind").Result.ToTuple() + if sc <> Proto.Remote.ResponseStatusCode.OK then + printfn "Failed to get pid: %A" (pid, sc) + System.Threading.Thread.Sleep(4000) + getPid() + else pid + getPid() + +let startWorker workerCount = + let hostName = + match Environment.GetEnvironmentVariable("CUMPUTERNAME"), Environment.GetEnvironmentVariable("HOSTNAME") with + | null, h -> h + | h, _ -> h + printfn "Hostname: %A" hostName + + for entry in Environment.GetEnvironmentVariables() |> Seq.cast do printfn "%A: %A" entry.Key entry.Value + + Cluster.startCluster (hostName) // + (System.Guid.NewGuid().ToString())) + let masterPid = getMasterPid() + createWorkerMonitor workerCount (requestWork masterPid) diff --git a/examples/example09masterworker/app/app.fsproj b/examples/example09masterworker/app/app.fsproj new file mode 100644 index 0000000..7aa6619 --- /dev/null +++ b/examples/example09masterworker/app/app.fsproj @@ -0,0 +1,24 @@ + + + Exe + netcoreapp2.0 + + + + Proto.Actor.FSharp.fsproj + + + messages.csproj + + + + + + + + + + + + + \ No newline at end of file diff --git a/examples/example09masterworker/app/paket.references b/examples/example09masterworker/app/paket.references new file mode 100644 index 0000000..4a10aca --- /dev/null +++ b/examples/example09masterworker/app/paket.references @@ -0,0 +1,2 @@ +Proto.Cluster.Consul +FSharp.Core \ No newline at end of file diff --git a/examples/example09masterworker/build.sh b/examples/example09masterworker/build.sh new file mode 100755 index 0000000..a528df1 --- /dev/null +++ b/examples/example09masterworker/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +dotnet publish -c Release app +#dotnet publish -c Release Node2 +docker-compose up --build --scale worker=3 diff --git a/examples/example09masterworker/buildAndRunDocker.sh b/examples/example09masterworker/buildAndRunDocker.sh new file mode 100755 index 0000000..1b2dfab --- /dev/null +++ b/examples/example09masterworker/buildAndRunDocker.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +./build.sh +./run_docker.sh \ No newline at end of file diff --git a/examples/example09masterworker/build_kube.sh b/examples/example09masterworker/build_kube.sh new file mode 100755 index 0000000..83638d6 --- /dev/null +++ b/examples/example09masterworker/build_kube.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +dotnet publish -c Release app +#dotnet publish -c Release Node2 + +docker build -t gcr.io/${PROJECT_ID}/fireproto:v1 app/. +##docker build -t gcr.io/${PROJECT_ID}/protodemonode2:v1 Node2/. + +#gcloud docker -- push gcr.io/${PROJECT_ID}/f:v1 +gcloud docker -- push gcr.io/${PROJECT_ID}/fireproto:v1 \ No newline at end of file diff --git a/examples/example09masterworker/docker-compose.yml b/examples/example09masterworker/docker-compose.yml new file mode 100644 index 0000000..7abd398 --- /dev/null +++ b/examples/example09masterworker/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3' +services: + master: + build: ./app + container_name: "master" + hostname: "master" + depends_on: + - consul + command: "m 200" + worker: + build: ./app + depends_on: + - master + command: "w 1" + consul: + image: "consul:latest" + container_name: "consul" + hostname: "consul" + ports: + - "8400:8400" + - "8500:8500" + - "8600:53" + command: "agent -server -bootstrap -ui -disable-host-node-id -client 0.0.0.0" \ No newline at end of file diff --git a/examples/example09masterworker/messages/Messages.g.cs b/examples/example09masterworker/messages/Messages.g.cs new file mode 100644 index 0000000..af1fc6b --- /dev/null +++ b/examples/example09masterworker/messages/Messages.g.cs @@ -0,0 +1,731 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: messages.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace ProtoActorDemo.Messages { + + /// Holder for reflection information generated from messages.proto + public static partial class MessagesReflection { + + #region Descriptor + /// File descriptor for messages.proto + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static MessagesReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "Cg5tZXNzYWdlcy5wcm90bxIIbWVzc2FnZXMiIgoDUElEEg8KB0FkZHJlc3MY", + "ASABKAkSCgoCSWQYAiABKAkiKQoLUmVxdWVzdFdvcmsSGgoDcGlkGAEgASgL", + "Mg0ubWVzc2FnZXMuUElEIjYKClN1Ym1pdFdvcmsSGgoDcGlkGAEgASgLMg0u", + "bWVzc2FnZXMuUElEEgwKBGRhdGEYAyABKAUiKgoZU3VibWl0RXhwZWN0ZWRS", + "ZXN1bHRDb3VudBINCgVjb3VudBgBIAEoBSIsCgxTdWJtaXRSZXN1bHQSDAoE", + "ZGF0YRgDIAEoBRIOCgZyZXN1bHQYBCABKAJCGqoCF1Byb3RvQWN0b3JEZW1v", + "Lk1lc3NhZ2VzYgZwcm90bzM=")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.PID), global::ProtoActorDemo.Messages.PID.Parser, new[]{ "Address", "Id" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.RequestWork), global::ProtoActorDemo.Messages.RequestWork.Parser, new[]{ "Pid" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitWork), global::ProtoActorDemo.Messages.SubmitWork.Parser, new[]{ "Pid", "Data" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitExpectedResultCount), global::ProtoActorDemo.Messages.SubmitExpectedResultCount.Parser, new[]{ "Count" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitResult), global::ProtoActorDemo.Messages.SubmitResult.Parser, new[]{ "Data", "Result" }, null, null, null) + })); + } + #endregion + + } + #region Messages + public sealed partial class PID : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PID()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID(PID other) : this() { + address_ = other.address_; + id_ = other.id_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID Clone() { + return new PID(this); + } + + /// Field number for the "Address" field. + public const int AddressFieldNumber = 1; + private string address_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Address { + get { return address_; } + set { + address_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "Id" field. + public const int IdFieldNumber = 2; + private string id_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Id { + get { return id_; } + set { + id_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as PID); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(PID other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Address != other.Address) return false; + if (Id != other.Id) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Address.Length != 0) hash ^= Address.GetHashCode(); + if (Id.Length != 0) hash ^= Id.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Address.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Address); + } + if (Id.Length != 0) { + output.WriteRawTag(18); + output.WriteString(Id); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Address.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Address); + } + if (Id.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Id); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(PID other) { + if (other == null) { + return; + } + if (other.Address.Length != 0) { + Address = other.Address; + } + if (other.Id.Length != 0) { + Id = other.Id; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + Address = input.ReadString(); + break; + } + case 18: { + Id = input.ReadString(); + break; + } + } + } + } + + } + + public sealed partial class RequestWork : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new RequestWork()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[1]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork(RequestWork other) : this() { + Pid = other.pid_ != null ? other.Pid.Clone() : null; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork Clone() { + return new RequestWork(this); + } + + /// Field number for the "pid" field. + public const int PidFieldNumber = 1; + private global::ProtoActorDemo.Messages.PID pid_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::ProtoActorDemo.Messages.PID Pid { + get { return pid_; } + set { + pid_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as RequestWork); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(RequestWork other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(Pid, other.Pid)) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (pid_ != null) hash ^= Pid.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (pid_ != null) { + output.WriteRawTag(10); + output.WriteMessage(Pid); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (pid_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(RequestWork other) { + if (other == null) { + return; + } + if (other.pid_ != null) { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + Pid.MergeFrom(other.Pid); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + input.ReadMessage(pid_); + break; + } + } + } + } + + } + + public sealed partial class SubmitWork : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitWork()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[2]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWork() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWork(SubmitWork other) : this() { + Pid = other.pid_ != null ? other.Pid.Clone() : null; + data_ = other.data_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWork Clone() { + return new SubmitWork(this); + } + + /// Field number for the "pid" field. + public const int PidFieldNumber = 1; + private global::ProtoActorDemo.Messages.PID pid_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::ProtoActorDemo.Messages.PID Pid { + get { return pid_; } + set { + pid_ = value; + } + } + + /// Field number for the "data" field. + public const int DataFieldNumber = 3; + private int data_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Data { + get { return data_; } + set { + data_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as SubmitWork); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(SubmitWork other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(Pid, other.Pid)) return false; + if (Data != other.Data) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (pid_ != null) hash ^= Pid.GetHashCode(); + if (Data != 0) hash ^= Data.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (pid_ != null) { + output.WriteRawTag(10); + output.WriteMessage(Pid); + } + if (Data != 0) { + output.WriteRawTag(24); + output.WriteInt32(Data); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (pid_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); + } + if (Data != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Data); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(SubmitWork other) { + if (other == null) { + return; + } + if (other.pid_ != null) { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + Pid.MergeFrom(other.Pid); + } + if (other.Data != 0) { + Data = other.Data; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + input.ReadMessage(pid_); + break; + } + case 24: { + Data = input.ReadInt32(); + break; + } + } + } + } + + } + + public sealed partial class SubmitExpectedResultCount : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitExpectedResultCount()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[3]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitExpectedResultCount() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitExpectedResultCount(SubmitExpectedResultCount other) : this() { + count_ = other.count_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitExpectedResultCount Clone() { + return new SubmitExpectedResultCount(this); + } + + /// Field number for the "count" field. + public const int CountFieldNumber = 1; + private int count_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Count { + get { return count_; } + set { + count_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as SubmitExpectedResultCount); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(SubmitExpectedResultCount other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Count != other.Count) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Count != 0) hash ^= Count.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Count != 0) { + output.WriteRawTag(8); + output.WriteInt32(Count); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Count != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Count); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(SubmitExpectedResultCount other) { + if (other == null) { + return; + } + if (other.Count != 0) { + Count = other.Count; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 8: { + Count = input.ReadInt32(); + break; + } + } + } + } + + } + + public sealed partial class SubmitResult : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitResult()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[4]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult(SubmitResult other) : this() { + data_ = other.data_; + result_ = other.result_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult Clone() { + return new SubmitResult(this); + } + + /// Field number for the "data" field. + public const int DataFieldNumber = 3; + private int data_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Data { + get { return data_; } + set { + data_ = value; + } + } + + /// Field number for the "result" field. + public const int ResultFieldNumber = 4; + private float result_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public float Result { + get { return result_; } + set { + result_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as SubmitResult); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(SubmitResult other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Data != other.Data) return false; + if (Result != other.Result) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Data != 0) hash ^= Data.GetHashCode(); + if (Result != 0F) hash ^= Result.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Data != 0) { + output.WriteRawTag(24); + output.WriteInt32(Data); + } + if (Result != 0F) { + output.WriteRawTag(37); + output.WriteFloat(Result); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Data != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Data); + } + if (Result != 0F) { + size += 1 + 4; + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(SubmitResult other) { + if (other == null) { + return; + } + if (other.Data != 0) { + Data = other.Data; + } + if (other.Result != 0F) { + Result = other.Result; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 24: { + Data = input.ReadInt32(); + break; + } + case 37: { + Result = input.ReadFloat(); + break; + } + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/examples/example09masterworker/messages/build.sh b/examples/example09masterworker/messages/build.sh new file mode 100755 index 0000000..317eca3 --- /dev/null +++ b/examples/example09masterworker/messages/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +protoc messages.proto -I=. --csharp_out=. --csharp_opt=file_extension=.g.cs --grpc_out . --plugin=protoc-gen-grpc=/Users/tomasjansson/.nuget/packages/grpc.tools/1.6.1/tools/macosx_x64/grpc_csharp_plugin diff --git a/examples/example09masterworker/messages/messages.csproj b/examples/example09masterworker/messages/messages.csproj new file mode 100644 index 0000000..a203db6 --- /dev/null +++ b/examples/example09masterworker/messages/messages.csproj @@ -0,0 +1,9 @@ + + + netstandard2.0 + + + + + + diff --git a/examples/example09masterworker/messages/messages.proto b/examples/example09masterworker/messages/messages.proto new file mode 100644 index 0000000..7c8053d --- /dev/null +++ b/examples/example09masterworker/messages/messages.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +package messages; + +option csharp_namespace = "ProtoActorDemo.Messages"; + +message PID { + string Address = 1; + string Id = 2; +} + +message RequestWork { + PID pid = 1; +} + +message SubmitWork { + PID pid = 1; + int32 data = 3; +} + +message SubmitExpectedResultCount { + int32 count = 1; +} + +message SubmitResult { + int32 data = 3; + float result = 4; +} \ No newline at end of file diff --git a/examples/example09masterworker/run_docker.sh b/examples/example09masterworker/run_docker.sh new file mode 100755 index 0000000..fc521fd --- /dev/null +++ b/examples/example09masterworker/run_docker.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker-compose up --build --scale worker=1