From d2d65a792bd462465059f4f4cb3ddef44c6e7219 Mon Sep 17 00:00:00 2001 From: Tomas Jansson Date: Sun, 21 Jan 2018 21:29:26 +0100 Subject: [PATCH 1/3] start of master worker example --- examples/example09masterworker/app/Program.fs | 147 ++++ examples/example09masterworker/app/app.fsproj | 17 + .../messages/Messages.g.cs | 676 ++++++++++++++++++ .../example09masterworker/messages/build.sh | 2 + .../messages/messages.csproj | 9 + .../messages/messages.proto | 25 + 6 files changed, 876 insertions(+) create mode 100644 examples/example09masterworker/app/Program.fs create mode 100644 examples/example09masterworker/app/app.fsproj create mode 100644 examples/example09masterworker/messages/Messages.g.cs create mode 100755 examples/example09masterworker/messages/build.sh create mode 100644 examples/example09masterworker/messages/messages.csproj create mode 100644 examples/example09masterworker/messages/messages.proto diff --git a/examples/example09masterworker/app/Program.fs b/examples/example09masterworker/app/Program.fs new file mode 100644 index 0000000..8156a43 --- /dev/null +++ b/examples/example09masterworker/app/Program.fs @@ -0,0 +1,147 @@ +// Learn more about F# at http://fsharp.org + +open System +open Proto.FSharp +open ProtoActorDemo.Messages +open Proto.Mailbox +open Proto.FSharp.Core + +module MessageHelpers = + + let toMessagePid (pid: Proto.PID) = + let messagePid = PID() + messagePid.Address <- pid.Address + messagePid.Id <- pid.Id + messagePid + + let toProtoPid (pid: PID) = + Proto.PID(pid.Address, pid.Id) + + let newRequestWork (pid: Proto.PID) = + let requestWork = RequestWork() + requestWork.Pid <- toMessagePid pid + requestWork + + let newSubmitWorkRequest (pid: Proto.PID) (correlationId: Guid) data = + let submitWorkRequest = SubmitWorkRequest() + submitWorkRequest.CorrelationId <- correlationId.ToString() + submitWorkRequest.Data <- data + submitWorkRequest.Pid <- toMessagePid pid + submitWorkRequest + + let newFinishedWorkResponse (pid: Proto.PID) (correlationId: Guid) result = + let finishedWorkResponse = FinishedWorkResponse() + finishedWorkResponse.CorrelationId <- correlationId.ToString() + finishedWorkResponse.Result <- result + finishedWorkResponse.Pid <- toMessagePid pid + finishedWorkResponse + + type Messages = + | RequestWork of RequestWork + | SubmitWorkRequest of SubmitWorkRequest + | FinishedWorkResponse of FinishedWorkResponse + | SystemMessage of SystemMessage + | Unknown of obj + + let (|IsMessage|_|) (msg: obj) = + match msg with + | :? RequestWork as m -> Some(RequestWork m) + | :? SubmitWorkRequest as m -> Some(SubmitWorkRequest m) + | :? FinishedWorkResponse as m -> Some(FinishedWorkResponse m) + | _ -> None + + let mapMsg (msg: obj) = + match msg with + | IsMessage m -> m + | IsSystemMessage m -> SystemMessage m + | _ -> Unknown msg + +module Master = + open MessageHelpers + + type MasterState = { + JobQueue: int list + SubmittedJob: Map + FinishedJobs: Map + StartTime: DateTime + Sink: PID option + } + + let handler (context:Proto.IContext) message state = + let submitJob (pid: ProtoActorDemo.Messages.PID) state = + match state.JobQueue with + | x::rest -> + let correlationId = Guid.NewGuid() + let request = newSubmitWorkRequest (state.Sink |> Option.get |> toProtoPid) correlationId x + request >! (toProtoPid pid) + {state with JobQueue = rest; SubmittedJob = state.SubmittedJob |> Map.add correlationId x} + | [] -> state + + match mapMsg message with + | RequestWork m -> submitJob (m.Pid) state + | FinishedWorkResponse m -> + let key = Guid.Parse(m.CorrelationId) + if state.SubmittedJob |> Map.tryFind key |> Option.isSome + then + let state' = {state with FinishedJobs = state.FinishedJobs |> Map.add key m.Result} + if state'.FinishedJobs.Count = state'.SubmittedJob.Count && state.JobQueue |> List.isEmpty + then + let result = state'.FinishedJobs |> Map.toSeq |> Seq.sumBy snd + printfn "JobQueue finished with total result: %A" result + printfn "Duration %A" (DateTime.Now - state.StartTime) + state' + else + state' |> submitJob m.Pid + else state + | SystemMessage (SystemMessage.Started _) -> + printfn "YOLO" + let sinkProtoPid = Actor.create (printfn "Sink: %A") |> Actor.initProps |> Actor.spawn + let sinkPid = PID() + printfn "YOLO 2" + sinkPid.Address <- sinkProtoPid.Address + sinkPid.Id <- sinkProtoPid.Id + {state with Sink = Some sinkPid } + | _ -> state + + + let createMaster workLoad = + printfn "Creating master" + Actor.withState2 handler ({JobQueue = (List.init workLoad id); SubmittedJob = Map.empty; FinishedJobs = Map.empty; StartTime = DateTime.Now; Sink = None}) + |> Actor.initProps + |> Actor.spawn + +module Worker = + open MessageHelpers + let createWorker (masterPid: Proto.PID) workerId = + let handler (context: Proto.IContext) message = + match mapMsg message with + | SubmitWorkRequest m -> + printfn "%d Got some work: %A" workerId m.Data + System.Threading.Thread.Sleep 1000 + let result = (m.Data |> float |> Math.Sqrt |> int) + newFinishedWorkResponse context.Self (Guid.Parse(m.CorrelationId)) result + >! (toProtoPid m.Pid) + newRequestWork (context.Self) >! masterPid + | SystemMessage (SystemMessage.Started _) -> + printfn "Worker started: %A" workerId + newRequestWork (context.Self) >! masterPid + | _ -> printfn "Should never happen" + + Actor.create2 handler + |> Actor.initProps + |> Actor.spawn + +open MessageHelpers +[] +let main argv = + + let masterPid = Master.createMaster 20 + let workerPids = [1 .. 10] |> List.map (fun i -> Worker.createWorker masterPid i) + + printfn "Hello World from F#!" + System.Threading.Thread.Sleep Int32.MaxValue + 0 // return an integer exit code + + + +// Parent node.... \ No newline at end of file diff --git a/examples/example09masterworker/app/app.fsproj b/examples/example09masterworker/app/app.fsproj new file mode 100644 index 0000000..9aa683f --- /dev/null +++ b/examples/example09masterworker/app/app.fsproj @@ -0,0 +1,17 @@ + + + Exe + netcoreapp2.0 + + + + Proto.Actor.FSharp.fsproj + + + messages.csproj + + + + + + \ No newline at end of file diff --git a/examples/example09masterworker/messages/Messages.g.cs b/examples/example09masterworker/messages/Messages.g.cs new file mode 100644 index 0000000..021d729 --- /dev/null +++ b/examples/example09masterworker/messages/Messages.g.cs @@ -0,0 +1,676 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: messages.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace ProtoActorDemo.Messages { + + /// Holder for reflection information generated from messages.proto + public static partial class MessagesReflection { + + #region Descriptor + /// File descriptor for messages.proto + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static MessagesReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "Cg5tZXNzYWdlcy5wcm90bxIIbWVzc2FnZXMiIgoDUElEEg8KB0FkZHJlc3MY", + "ASABKAkSCgoCSWQYAiABKAkiKQoLUmVxdWVzdFdvcmsSGgoDcGlkGAEgASgL", + "Mg0ubWVzc2FnZXMuUElEIlUKEVN1Ym1pdFdvcmtSZXF1ZXN0EhoKA3BpZBgB", + "IAEoCzINLm1lc3NhZ2VzLlBJRBIWCg5jb3JyZWxhdGlvbl9pZBgCIAEoCRIM", + "CgRkYXRhGAMgASgFIloKFEZpbmlzaGVkV29ya1Jlc3BvbnNlEhoKA3BpZBgB", + "IAEoCzINLm1lc3NhZ2VzLlBJRBIWCg5jb3JyZWxhdGlvbl9pZBgCIAEoCRIO", + "CgZyZXN1bHQYAyABKAVCGqoCF1Byb3RvQWN0b3JEZW1vLk1lc3NhZ2VzYgZw", + "cm90bzM=")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.PID), global::ProtoActorDemo.Messages.PID.Parser, new[]{ "Address", "Id" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.RequestWork), global::ProtoActorDemo.Messages.RequestWork.Parser, new[]{ "Pid" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitWorkRequest), global::ProtoActorDemo.Messages.SubmitWorkRequest.Parser, new[]{ "Pid", "CorrelationId", "Data" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.FinishedWorkResponse), global::ProtoActorDemo.Messages.FinishedWorkResponse.Parser, new[]{ "Pid", "CorrelationId", "Result" }, null, null, null) + })); + } + #endregion + + } + #region Messages + public sealed partial class PID : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PID()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID(PID other) : this() { + address_ = other.address_; + id_ = other.id_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public PID Clone() { + return new PID(this); + } + + /// Field number for the "Address" field. + public const int AddressFieldNumber = 1; + private string address_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Address { + get { return address_; } + set { + address_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "Id" field. + public const int IdFieldNumber = 2; + private string id_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Id { + get { return id_; } + set { + id_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as PID); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(PID other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Address != other.Address) return false; + if (Id != other.Id) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Address.Length != 0) hash ^= Address.GetHashCode(); + if (Id.Length != 0) hash ^= Id.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Address.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Address); + } + if (Id.Length != 0) { + output.WriteRawTag(18); + output.WriteString(Id); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Address.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Address); + } + if (Id.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Id); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(PID other) { + if (other == null) { + return; + } + if (other.Address.Length != 0) { + Address = other.Address; + } + if (other.Id.Length != 0) { + Id = other.Id; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + Address = input.ReadString(); + break; + } + case 18: { + Id = input.ReadString(); + break; + } + } + } + } + + } + + public sealed partial class RequestWork : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new RequestWork()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[1]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork(RequestWork other) : this() { + Pid = other.pid_ != null ? other.Pid.Clone() : null; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RequestWork Clone() { + return new RequestWork(this); + } + + /// Field number for the "pid" field. + public const int PidFieldNumber = 1; + private global::ProtoActorDemo.Messages.PID pid_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::ProtoActorDemo.Messages.PID Pid { + get { return pid_; } + set { + pid_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as RequestWork); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(RequestWork other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(Pid, other.Pid)) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (pid_ != null) hash ^= Pid.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (pid_ != null) { + output.WriteRawTag(10); + output.WriteMessage(Pid); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (pid_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(RequestWork other) { + if (other == null) { + return; + } + if (other.pid_ != null) { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + Pid.MergeFrom(other.Pid); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + input.ReadMessage(pid_); + break; + } + } + } + } + + } + + public sealed partial class SubmitWorkRequest : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitWorkRequest()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[2]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWorkRequest() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWorkRequest(SubmitWorkRequest other) : this() { + Pid = other.pid_ != null ? other.Pid.Clone() : null; + correlationId_ = other.correlationId_; + data_ = other.data_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitWorkRequest Clone() { + return new SubmitWorkRequest(this); + } + + /// Field number for the "pid" field. + public const int PidFieldNumber = 1; + private global::ProtoActorDemo.Messages.PID pid_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::ProtoActorDemo.Messages.PID Pid { + get { return pid_; } + set { + pid_ = value; + } + } + + /// Field number for the "correlation_id" field. + public const int CorrelationIdFieldNumber = 2; + private string correlationId_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string CorrelationId { + get { return correlationId_; } + set { + correlationId_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "data" field. + public const int DataFieldNumber = 3; + private int data_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Data { + get { return data_; } + set { + data_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as SubmitWorkRequest); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(SubmitWorkRequest other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(Pid, other.Pid)) return false; + if (CorrelationId != other.CorrelationId) return false; + if (Data != other.Data) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (pid_ != null) hash ^= Pid.GetHashCode(); + if (CorrelationId.Length != 0) hash ^= CorrelationId.GetHashCode(); + if (Data != 0) hash ^= Data.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (pid_ != null) { + output.WriteRawTag(10); + output.WriteMessage(Pid); + } + if (CorrelationId.Length != 0) { + output.WriteRawTag(18); + output.WriteString(CorrelationId); + } + if (Data != 0) { + output.WriteRawTag(24); + output.WriteInt32(Data); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (pid_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); + } + if (CorrelationId.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(CorrelationId); + } + if (Data != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Data); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(SubmitWorkRequest other) { + if (other == null) { + return; + } + if (other.pid_ != null) { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + Pid.MergeFrom(other.Pid); + } + if (other.CorrelationId.Length != 0) { + CorrelationId = other.CorrelationId; + } + if (other.Data != 0) { + Data = other.Data; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + input.ReadMessage(pid_); + break; + } + case 18: { + CorrelationId = input.ReadString(); + break; + } + case 24: { + Data = input.ReadInt32(); + break; + } + } + } + } + + } + + public sealed partial class FinishedWorkResponse : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new FinishedWorkResponse()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[3]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public FinishedWorkResponse() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public FinishedWorkResponse(FinishedWorkResponse other) : this() { + Pid = other.pid_ != null ? other.Pid.Clone() : null; + correlationId_ = other.correlationId_; + result_ = other.result_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public FinishedWorkResponse Clone() { + return new FinishedWorkResponse(this); + } + + /// Field number for the "pid" field. + public const int PidFieldNumber = 1; + private global::ProtoActorDemo.Messages.PID pid_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::ProtoActorDemo.Messages.PID Pid { + get { return pid_; } + set { + pid_ = value; + } + } + + /// Field number for the "correlation_id" field. + public const int CorrelationIdFieldNumber = 2; + private string correlationId_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string CorrelationId { + get { return correlationId_; } + set { + correlationId_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "result" field. + public const int ResultFieldNumber = 3; + private int result_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Result { + get { return result_; } + set { + result_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as FinishedWorkResponse); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(FinishedWorkResponse other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(Pid, other.Pid)) return false; + if (CorrelationId != other.CorrelationId) return false; + if (Result != other.Result) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (pid_ != null) hash ^= Pid.GetHashCode(); + if (CorrelationId.Length != 0) hash ^= CorrelationId.GetHashCode(); + if (Result != 0) hash ^= Result.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (pid_ != null) { + output.WriteRawTag(10); + output.WriteMessage(Pid); + } + if (CorrelationId.Length != 0) { + output.WriteRawTag(18); + output.WriteString(CorrelationId); + } + if (Result != 0) { + output.WriteRawTag(24); + output.WriteInt32(Result); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (pid_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); + } + if (CorrelationId.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(CorrelationId); + } + if (Result != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Result); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(FinishedWorkResponse other) { + if (other == null) { + return; + } + if (other.pid_ != null) { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + Pid.MergeFrom(other.Pid); + } + if (other.CorrelationId.Length != 0) { + CorrelationId = other.CorrelationId; + } + if (other.Result != 0) { + Result = other.Result; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (pid_ == null) { + pid_ = new global::ProtoActorDemo.Messages.PID(); + } + input.ReadMessage(pid_); + break; + } + case 18: { + CorrelationId = input.ReadString(); + break; + } + case 24: { + Result = input.ReadInt32(); + break; + } + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/examples/example09masterworker/messages/build.sh b/examples/example09masterworker/messages/build.sh new file mode 100755 index 0000000..317eca3 --- /dev/null +++ b/examples/example09masterworker/messages/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +protoc messages.proto -I=. --csharp_out=. --csharp_opt=file_extension=.g.cs --grpc_out . --plugin=protoc-gen-grpc=/Users/tomasjansson/.nuget/packages/grpc.tools/1.6.1/tools/macosx_x64/grpc_csharp_plugin diff --git a/examples/example09masterworker/messages/messages.csproj b/examples/example09masterworker/messages/messages.csproj new file mode 100644 index 0000000..a203db6 --- /dev/null +++ b/examples/example09masterworker/messages/messages.csproj @@ -0,0 +1,9 @@ + + + netstandard2.0 + + + + + + diff --git a/examples/example09masterworker/messages/messages.proto b/examples/example09masterworker/messages/messages.proto new file mode 100644 index 0000000..ff1e9f3 --- /dev/null +++ b/examples/example09masterworker/messages/messages.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; +package messages; + +option csharp_namespace = "ProtoActorDemo.Messages"; + +message PID { + string Address = 1; + string Id = 2; +} + +message RequestWork { + PID pid = 1; +} + +message SubmitWorkRequest { + PID pid = 1; + string correlation_id = 2; + int32 data = 3; +} + +message FinishedWorkResponse { + PID pid = 1; + string correlation_id = 2; + int32 result = 3; +} \ No newline at end of file From 779c347831849ae9525fce9c01a7c1ed6ed94f04 Mon Sep 17 00:00:00 2001 From: Tomas Jansson Date: Tue, 23 Jan 2018 00:45:05 +0100 Subject: [PATCH 2/3] master worker with docker --- examples/example09masterworker/app/Cluster.fs | 12 + .../app}/Dockerfile | 5 +- .../app/HelloProtoActor.fs | 15 + examples/example09masterworker/app/Master.fs | 69 +++++ .../app/MessageHelpers.fs | 55 ++++ examples/example09masterworker/app/Program.fs | 146 ++------- examples/example09masterworker/app/Worker.fs | 64 ++++ examples/example09masterworker/app/app.fsproj | 6 + .../app/paket.references | 2 + examples/example09masterworker/build.sh | 5 + examples/example09masterworker/build_kube.sh | 10 + .../example09masterworker/docker-compose.yml | 23 ++ .../messages/Messages.g.cs | 287 +++++++++++------- .../messages/messages.proto | 14 +- 14 files changed, 462 insertions(+), 251 deletions(-) create mode 100644 examples/example09masterworker/app/Cluster.fs rename examples/{ => example09masterworker/app}/Dockerfile (53%) create mode 100644 examples/example09masterworker/app/HelloProtoActor.fs create mode 100644 examples/example09masterworker/app/Master.fs create mode 100644 examples/example09masterworker/app/MessageHelpers.fs create mode 100644 examples/example09masterworker/app/Worker.fs create mode 100644 examples/example09masterworker/app/paket.references create mode 100755 examples/example09masterworker/build.sh create mode 100755 examples/example09masterworker/build_kube.sh create mode 100644 examples/example09masterworker/docker-compose.yml diff --git a/examples/example09masterworker/app/Cluster.fs b/examples/example09masterworker/app/Cluster.fs new file mode 100644 index 0000000..b89654d --- /dev/null +++ b/examples/example09masterworker/app/Cluster.fs @@ -0,0 +1,12 @@ +module Cluster + +open Proto.Cluster +open Proto.Cluster.Consul +open System + +let startCluster nodeName = + let consulConfigure = Action(fun c -> c.Address <- Uri("http://consul:8500/")) + printfn "Connecting to cluster" + Cluster.Start("FiresideChatCluster", nodeName, 12000, ConsulProvider(ConsulProviderOptions(), consulConfigure)) + printfn "Connected to cluster" + diff --git a/examples/Dockerfile b/examples/example09masterworker/app/Dockerfile similarity index 53% rename from examples/Dockerfile rename to examples/example09masterworker/app/Dockerfile index a56c8c2..adb209d 100644 --- a/examples/Dockerfile +++ b/examples/example09masterworker/app/Dockerfile @@ -2,5 +2,6 @@ FROM gcr.io/google-appengine/aspnetcore:2.0 ADD ./bin/Release/netcoreapp2.0/publish/ /app WORKDIR /app - -ENTRYPOINT [ "dotnet", "Node1.dll" ] +EXPOSE 12000 +ENTRYPOINT [ "dotnet", "app.dll" ] +#CMD [ "dotnet", "app.dll", "m", "100" ] diff --git a/examples/example09masterworker/app/HelloProtoActor.fs b/examples/example09masterworker/app/HelloProtoActor.fs new file mode 100644 index 0000000..492e7ee --- /dev/null +++ b/examples/example09masterworker/app/HelloProtoActor.fs @@ -0,0 +1,15 @@ +module HelloProtoActor +open Proto.FSharp + +let hello() = + let pid1 = + Actor.create (printfn "Hello from actor1: %A") + |> Actor.initProps + |> Actor.spawn + let pid2 = + Actor.create (fun x -> printfn "Hello from actor2: %A" x; (sprintf "I was called with %A" x) >! pid1) + |> Actor.initProps + |> Actor.spawn + + [ 1 .. 100 ] + |> List.iter (tell pid2) diff --git a/examples/example09masterworker/app/Master.fs b/examples/example09masterworker/app/Master.fs new file mode 100644 index 0000000..bf4f5fb --- /dev/null +++ b/examples/example09masterworker/app/Master.fs @@ -0,0 +1,69 @@ +module Master +open MessageHelpers +open Proto.FSharp +open Proto.Remote + +module Sink = + type SinkState = { + ExpectedNumberOfResult: int option + NumberOfResult: int + TotalSum: float32 + } + + let sinkHandler message state = + let state' = + match message with + | SubmitExpectedResultCount m -> {state with ExpectedNumberOfResult = Some (m.Count)} + | SubmitResult m -> {state with TotalSum = state.TotalSum + m.Result; NumberOfResult = state.NumberOfResult + 1} + | _ -> state + match state'.ExpectedNumberOfResult with + | Some x when x = state'.NumberOfResult -> + printfn "Average is: %A" (state'.TotalSum / (float32)state'.NumberOfResult) + | None -> + printfn "Current sink state: %A" (state', message) + | Some _ -> () + state' + + let createSink() = + Actor.withState (mapMsg >> sinkHandler) {ExpectedNumberOfResult = None; NumberOfResult = 0; TotalSum = (float32)0.} + |> Actor.initProps + |> Actor.spawn + +type MasterState = { + RemainingWork: int list + SinkPid: Proto.PID option +} + +let setupSink expectedWorkLoad = + let sinkPid = Sink.createSink() + let expectedCountMsg = MessageHelpers.newSubmitExpectedResultCount expectedWorkLoad + expectedCountMsg >! sinkPid + sinkPid + +let handler (context: Proto.IContext) message state = + match message |> MessageHelpers.mapMsg with + | SystemMessage (SystemMessage.Started _) -> + printfn "Starting master" + let sinkPid = setupSink (state.RemainingWork.Length) + printfn "Sink started" + {state with SinkPid = Some sinkPid} + | RequestWork m -> + match state.RemainingWork with + | [] -> state + | x::rest -> + let sinkPid = state.SinkPid |> Option.get + let protoPid = m.Pid |> MessageHelpers.toProtoPid + newSubmitWork sinkPid x >! protoPid + {state with RemainingWork = rest} + +let createMasterProps numberOfWork = + Actor.withState2 handler {RemainingWork = [1 .. numberOfWork]; SinkPid = None} + |> Actor.initProps + +let createMaster = createMasterProps >> (Actor.spawn) + +let startMaster numberOfWork = + let props = createMasterProps numberOfWork + Remote.RegisterKnownKind("MasterKind", props) + createMaster numberOfWork |> ignore + Cluster.startCluster "master" diff --git a/examples/example09masterworker/app/MessageHelpers.fs b/examples/example09masterworker/app/MessageHelpers.fs new file mode 100644 index 0000000..c19fd48 --- /dev/null +++ b/examples/example09masterworker/app/MessageHelpers.fs @@ -0,0 +1,55 @@ +module MessageHelpers +open ProtoActorDemo.Messages +open Proto.FSharp + +let toMessagePid (pid: Proto.PID) = + let messagePid = PID() + messagePid.Address <- pid.Address + messagePid.Id <- pid.Id + messagePid + +let toProtoPid (pid: PID) = + Proto.PID(pid.Address, pid.Id) + +let newRequestWork (pid: Proto.PID) = + let requestWork = RequestWork() + requestWork.Pid <- toMessagePid pid + requestWork + +let newSubmitWork (pid: Proto.PID) data = + let submitWork = SubmitWork() + submitWork.Data <- data + submitWork.Pid <- toMessagePid pid + submitWork + +let newSubmitResult result = + let submitResult = SubmitResult() + submitResult.Result <- result + submitResult + +let newSubmitExpectedResultCount count = + let submitExpectedResultCount = new SubmitExpectedResultCount() + submitExpectedResultCount.Count <- count + submitExpectedResultCount + +type Message = + | RequestWork of RequestWork + | SubmitWork of SubmitWork + | SubmitExpectedResultCount of SubmitExpectedResultCount + | SubmitResult of SubmitResult + | SystemMessage of SystemMessage + | Unknown of obj + +let (|IsMessage|_|) (msg: obj) = + match msg with + | :? RequestWork as m -> Some(RequestWork m) + | :? SubmitWork as m -> Some(SubmitWork m) + | :? SubmitResult as m -> Some(SubmitResult m) + | :? SubmitExpectedResultCount as m -> Some(SubmitExpectedResultCount m) + | _ -> None + +let mapMsg (msg: obj) = + match msg with + | IsMessage m -> m + | IsSystemMessage m -> SystemMessage m + | _ -> Unknown msg diff --git a/examples/example09masterworker/app/Program.fs b/examples/example09masterworker/app/Program.fs index 8156a43..3ecc96e 100644 --- a/examples/example09masterworker/app/Program.fs +++ b/examples/example09masterworker/app/Program.fs @@ -2,143 +2,35 @@ open System open Proto.FSharp -open ProtoActorDemo.Messages open Proto.Mailbox open Proto.FSharp.Core +open Proto.Remote +open ProtoActorDemo.Messages -module MessageHelpers = - - let toMessagePid (pid: Proto.PID) = - let messagePid = PID() - messagePid.Address <- pid.Address - messagePid.Id <- pid.Id - messagePid - - let toProtoPid (pid: PID) = - Proto.PID(pid.Address, pid.Id) - - let newRequestWork (pid: Proto.PID) = - let requestWork = RequestWork() - requestWork.Pid <- toMessagePid pid - requestWork - - let newSubmitWorkRequest (pid: Proto.PID) (correlationId: Guid) data = - let submitWorkRequest = SubmitWorkRequest() - submitWorkRequest.CorrelationId <- correlationId.ToString() - submitWorkRequest.Data <- data - submitWorkRequest.Pid <- toMessagePid pid - submitWorkRequest - - let newFinishedWorkResponse (pid: Proto.PID) (correlationId: Guid) result = - let finishedWorkResponse = FinishedWorkResponse() - finishedWorkResponse.CorrelationId <- correlationId.ToString() - finishedWorkResponse.Result <- result - finishedWorkResponse.Pid <- toMessagePid pid - finishedWorkResponse - - type Messages = - | RequestWork of RequestWork - | SubmitWorkRequest of SubmitWorkRequest - | FinishedWorkResponse of FinishedWorkResponse - | SystemMessage of SystemMessage - | Unknown of obj - - let (|IsMessage|_|) (msg: obj) = - match msg with - | :? RequestWork as m -> Some(RequestWork m) - | :? SubmitWorkRequest as m -> Some(SubmitWorkRequest m) - | :? FinishedWorkResponse as m -> Some(FinishedWorkResponse m) - | _ -> None - - let mapMsg (msg: obj) = - match msg with - | IsMessage m -> m - | IsSystemMessage m -> SystemMessage m - | _ -> Unknown msg - -module Master = - open MessageHelpers - - type MasterState = { - JobQueue: int list - SubmittedJob: Map - FinishedJobs: Map - StartTime: DateTime - Sink: PID option - } - - let handler (context:Proto.IContext) message state = - let submitJob (pid: ProtoActorDemo.Messages.PID) state = - match state.JobQueue with - | x::rest -> - let correlationId = Guid.NewGuid() - let request = newSubmitWorkRequest (state.Sink |> Option.get |> toProtoPid) correlationId x - request >! (toProtoPid pid) - {state with JobQueue = rest; SubmittedJob = state.SubmittedJob |> Map.add correlationId x} - | [] -> state - - match mapMsg message with - | RequestWork m -> submitJob (m.Pid) state - | FinishedWorkResponse m -> - let key = Guid.Parse(m.CorrelationId) - if state.SubmittedJob |> Map.tryFind key |> Option.isSome - then - let state' = {state with FinishedJobs = state.FinishedJobs |> Map.add key m.Result} - if state'.FinishedJobs.Count = state'.SubmittedJob.Count && state.JobQueue |> List.isEmpty - then - let result = state'.FinishedJobs |> Map.toSeq |> Seq.sumBy snd - printfn "JobQueue finished with total result: %A" result - printfn "Duration %A" (DateTime.Now - state.StartTime) - state' - else - state' |> submitJob m.Pid - else state - | SystemMessage (SystemMessage.Started _) -> - printfn "YOLO" - let sinkProtoPid = Actor.create (printfn "Sink: %A") |> Actor.initProps |> Actor.spawn - let sinkPid = PID() - printfn "YOLO 2" - sinkPid.Address <- sinkProtoPid.Address - sinkPid.Id <- sinkProtoPid.Id - {state with Sink = Some sinkPid } - | _ -> state - - - let createMaster workLoad = - printfn "Creating master" - Actor.withState2 handler ({JobQueue = (List.init workLoad id); SubmittedJob = Map.empty; FinishedJobs = Map.empty; StartTime = DateTime.Now; Sink = None}) - |> Actor.initProps - |> Actor.spawn +let testLocal() = + let masterPid = Master.createMaster 100 -module Worker = - open MessageHelpers - let createWorker (masterPid: Proto.PID) workerId = - let handler (context: Proto.IContext) message = - match mapMsg message with - | SubmitWorkRequest m -> - printfn "%d Got some work: %A" workerId m.Data - System.Threading.Thread.Sleep 1000 - let result = (m.Data |> float |> Math.Sqrt |> int) - newFinishedWorkResponse context.Self (Guid.Parse(m.CorrelationId)) result - >! (toProtoPid m.Pid) - newRequestWork (context.Self) >! masterPid - | SystemMessage (SystemMessage.Started _) -> - printfn "Worker started: %A" workerId - newRequestWork (context.Self) >! masterPid - | _ -> printfn "Should never happen" + Worker.createWorkerMonitor 5 (Worker.requestWork masterPid) |> ignore - Actor.create2 handler - |> Actor.initProps - |> Actor.spawn -open MessageHelpers [] let main argv = + printfn "Registrating protos" + Serialization.RegisterFileDescriptor(ProtoActorDemo.Messages.MessagesReflection.Descriptor) + + let argList = argv |> Array.toList + + match argList with + | [] | "hello"::_ -> HelloProtoActor.hello() + | "local"::_ | "l" :: _ -> testLocal() + | "m"::count::_ -> Master.startMaster (Int32.Parse count) |> ignore + | "w"::count::_ -> Worker.startWorker (Int32.Parse count) |> ignore + | _ -> printfn "Unkown argument: %A" argList - let masterPid = Master.createMaster 20 - let workerPids = [1 .. 10] |> List.map (fun i -> Worker.createWorker masterPid i) + // let masterPid = Master.createMaster 30 + // let workerPids = [1 .. 10] |> List.map (fun i -> Worker.createWorker masterPid i) - printfn "Hello World from F#!" + // printfn "Hello World from F#!" System.Threading.Thread.Sleep Int32.MaxValue 0 // return an integer exit code diff --git a/examples/example09masterworker/app/Worker.fs b/examples/example09masterworker/app/Worker.fs new file mode 100644 index 0000000..9184074 --- /dev/null +++ b/examples/example09masterworker/app/Worker.fs @@ -0,0 +1,64 @@ +module Worker +open MessageHelpers +open Proto.FSharp +open Proto +open System +open Proto.Mailbox +open ProtoActorDemo.Messages + +// module Master = +// let sinkPid = Sink.createSink() + +let handler workerId requestWork (context: Proto.IContext) message = + match message |> MessageHelpers.mapMsg with + | SubmitWork m -> + printfn "%d Got some work: %A" workerId m + System.Threading.Thread.Sleep 1000 + let result = m.Data |> float |> Math.Sqrt |> float32 + let resultDto = MessageHelpers.newSubmitResult result + resultDto >! (m.Pid |> MessageHelpers.toProtoPid) + requestWork (context.Self) + | SystemMessage (SystemMessage.Started _) -> + requestWork (context.Self) + +let createWorkerKind requestWork workerId = + Actor.create2 (handler workerId requestWork) |> Actor.initProps + + +let createWorkerMonitor workerCount requestWork = + let handler (context: Proto.IContext) message = + match message |> MessageHelpers.mapMsg with + | SystemMessage (SystemMessage.Started _) -> + [ 1 .. workerCount ] |> List.iter (fun i -> i |> createWorkerKind requestWork |> context.Spawn |> ignore) + | _ -> () + + Actor.create2 handler + |> Actor.initProps + |> Actor.Spawn + +let requestWork masterPid workerPid = + MessageHelpers.newRequestWork workerPid >! masterPid + +let getMasterPid() = + let rec getPid() = + printfn "Trying to get pid again" + let (pid, sc) = Proto.Cluster.Cluster.GetAsync("FiresideChatCluster", "MasterKind").Result.ToTuple() + if sc <> Proto.Remote.ResponseStatusCode.OK then + printfn "Failed to get pid: %A" (pid, sc) + System.Threading.Thread.Sleep(4000) + getPid() + else pid + getPid() + +let startWorker workerCount = + let hostName = + match Environment.GetEnvironmentVariable("CUMPUTERNAME"), Environment.GetEnvironmentVariable("HOSTNAME") with + | null, h -> h + | h, _ -> h + printfn "Hostname: %A" hostName + + for entry in Environment.GetEnvironmentVariables() |> Seq.cast do printfn "%A: %A" entry.Key entry.Value + + Cluster.startCluster (hostName) // + (System.Guid.NewGuid().ToString())) + let masterPid = getMasterPid() + createWorkerMonitor workerCount (requestWork masterPid) diff --git a/examples/example09masterworker/app/app.fsproj b/examples/example09masterworker/app/app.fsproj index 9aa683f..f455e78 100644 --- a/examples/example09masterworker/app/app.fsproj +++ b/examples/example09masterworker/app/app.fsproj @@ -12,6 +12,12 @@ + + + + + + \ No newline at end of file diff --git a/examples/example09masterworker/app/paket.references b/examples/example09masterworker/app/paket.references new file mode 100644 index 0000000..4a10aca --- /dev/null +++ b/examples/example09masterworker/app/paket.references @@ -0,0 +1,2 @@ +Proto.Cluster.Consul +FSharp.Core \ No newline at end of file diff --git a/examples/example09masterworker/build.sh b/examples/example09masterworker/build.sh new file mode 100755 index 0000000..a528df1 --- /dev/null +++ b/examples/example09masterworker/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +dotnet publish -c Release app +#dotnet publish -c Release Node2 +docker-compose up --build --scale worker=3 diff --git a/examples/example09masterworker/build_kube.sh b/examples/example09masterworker/build_kube.sh new file mode 100755 index 0000000..83638d6 --- /dev/null +++ b/examples/example09masterworker/build_kube.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +dotnet publish -c Release app +#dotnet publish -c Release Node2 + +docker build -t gcr.io/${PROJECT_ID}/fireproto:v1 app/. +##docker build -t gcr.io/${PROJECT_ID}/protodemonode2:v1 Node2/. + +#gcloud docker -- push gcr.io/${PROJECT_ID}/f:v1 +gcloud docker -- push gcr.io/${PROJECT_ID}/fireproto:v1 \ No newline at end of file diff --git a/examples/example09masterworker/docker-compose.yml b/examples/example09masterworker/docker-compose.yml new file mode 100644 index 0000000..9328956 --- /dev/null +++ b/examples/example09masterworker/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3' +services: + master: + build: ./app + container_name: "master" + hostname: "master" + depends_on: + - consul + command: "m 200" + worker: + build: ./app + depends_on: + - master + command: "w 10" + consul: + image: "consul:latest" + container_name: "consul" + hostname: "consul" + ports: + - "8400:8400" + - "8500:8500" + - "8600:53" + command: "agent -server -bootstrap -ui -disable-host-node-id -client 0.0.0.0" \ No newline at end of file diff --git a/examples/example09masterworker/messages/Messages.g.cs b/examples/example09masterworker/messages/Messages.g.cs index 021d729..af1fc6b 100644 --- a/examples/example09masterworker/messages/Messages.g.cs +++ b/examples/example09masterworker/messages/Messages.g.cs @@ -24,19 +24,19 @@ static MessagesReflection() { string.Concat( "Cg5tZXNzYWdlcy5wcm90bxIIbWVzc2FnZXMiIgoDUElEEg8KB0FkZHJlc3MY", "ASABKAkSCgoCSWQYAiABKAkiKQoLUmVxdWVzdFdvcmsSGgoDcGlkGAEgASgL", - "Mg0ubWVzc2FnZXMuUElEIlUKEVN1Ym1pdFdvcmtSZXF1ZXN0EhoKA3BpZBgB", - "IAEoCzINLm1lc3NhZ2VzLlBJRBIWCg5jb3JyZWxhdGlvbl9pZBgCIAEoCRIM", - "CgRkYXRhGAMgASgFIloKFEZpbmlzaGVkV29ya1Jlc3BvbnNlEhoKA3BpZBgB", - "IAEoCzINLm1lc3NhZ2VzLlBJRBIWCg5jb3JyZWxhdGlvbl9pZBgCIAEoCRIO", - "CgZyZXN1bHQYAyABKAVCGqoCF1Byb3RvQWN0b3JEZW1vLk1lc3NhZ2VzYgZw", - "cm90bzM=")); + "Mg0ubWVzc2FnZXMuUElEIjYKClN1Ym1pdFdvcmsSGgoDcGlkGAEgASgLMg0u", + "bWVzc2FnZXMuUElEEgwKBGRhdGEYAyABKAUiKgoZU3VibWl0RXhwZWN0ZWRS", + "ZXN1bHRDb3VudBINCgVjb3VudBgBIAEoBSIsCgxTdWJtaXRSZXN1bHQSDAoE", + "ZGF0YRgDIAEoBRIOCgZyZXN1bHQYBCABKAJCGqoCF1Byb3RvQWN0b3JEZW1v", + "Lk1lc3NhZ2VzYgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.PID), global::ProtoActorDemo.Messages.PID.Parser, new[]{ "Address", "Id" }, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.RequestWork), global::ProtoActorDemo.Messages.RequestWork.Parser, new[]{ "Pid" }, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitWorkRequest), global::ProtoActorDemo.Messages.SubmitWorkRequest.Parser, new[]{ "Pid", "CorrelationId", "Data" }, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.FinishedWorkResponse), global::ProtoActorDemo.Messages.FinishedWorkResponse.Parser, new[]{ "Pid", "CorrelationId", "Result" }, null, null, null) + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitWork), global::ProtoActorDemo.Messages.SubmitWork.Parser, new[]{ "Pid", "Data" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitExpectedResultCount), global::ProtoActorDemo.Messages.SubmitExpectedResultCount.Parser, new[]{ "Count" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ProtoActorDemo.Messages.SubmitResult), global::ProtoActorDemo.Messages.SubmitResult.Parser, new[]{ "Data", "Result" }, null, null, null) })); } #endregion @@ -311,10 +311,10 @@ public void MergeFrom(pb::CodedInputStream input) { } - public sealed partial class SubmitWorkRequest : pb::IMessage { - private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitWorkRequest()); + public sealed partial class SubmitWork : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitWork()); [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pb::MessageParser Parser { get { return _parser; } } + public static pb::MessageParser Parser { get { return _parser; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { @@ -327,22 +327,21 @@ public sealed partial class SubmitWorkRequest : pb::IMessage } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public SubmitWorkRequest() { + public SubmitWork() { OnConstruction(); } partial void OnConstruction(); [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public SubmitWorkRequest(SubmitWorkRequest other) : this() { + public SubmitWork(SubmitWork other) : this() { Pid = other.pid_ != null ? other.Pid.Clone() : null; - correlationId_ = other.correlationId_; data_ = other.data_; } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public SubmitWorkRequest Clone() { - return new SubmitWorkRequest(this); + public SubmitWork Clone() { + return new SubmitWork(this); } /// Field number for the "pid" field. @@ -356,17 +355,6 @@ public SubmitWorkRequest Clone() { } } - /// Field number for the "correlation_id" field. - public const int CorrelationIdFieldNumber = 2; - private string correlationId_ = ""; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public string CorrelationId { - get { return correlationId_; } - set { - correlationId_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); - } - } - /// Field number for the "data" field. public const int DataFieldNumber = 3; private int data_; @@ -380,11 +368,11 @@ public int Data { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public override bool Equals(object other) { - return Equals(other as SubmitWorkRequest); + return Equals(other as SubmitWork); } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public bool Equals(SubmitWorkRequest other) { + public bool Equals(SubmitWork other) { if (ReferenceEquals(other, null)) { return false; } @@ -392,7 +380,6 @@ public bool Equals(SubmitWorkRequest other) { return true; } if (!object.Equals(Pid, other.Pid)) return false; - if (CorrelationId != other.CorrelationId) return false; if (Data != other.Data) return false; return true; } @@ -401,7 +388,6 @@ public bool Equals(SubmitWorkRequest other) { public override int GetHashCode() { int hash = 1; if (pid_ != null) hash ^= Pid.GetHashCode(); - if (CorrelationId.Length != 0) hash ^= CorrelationId.GetHashCode(); if (Data != 0) hash ^= Data.GetHashCode(); return hash; } @@ -417,10 +403,6 @@ public void WriteTo(pb::CodedOutputStream output) { output.WriteRawTag(10); output.WriteMessage(Pid); } - if (CorrelationId.Length != 0) { - output.WriteRawTag(18); - output.WriteString(CorrelationId); - } if (Data != 0) { output.WriteRawTag(24); output.WriteInt32(Data); @@ -433,9 +415,6 @@ public int CalculateSize() { if (pid_ != null) { size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); } - if (CorrelationId.Length != 0) { - size += 1 + pb::CodedOutputStream.ComputeStringSize(CorrelationId); - } if (Data != 0) { size += 1 + pb::CodedOutputStream.ComputeInt32Size(Data); } @@ -443,7 +422,7 @@ public int CalculateSize() { } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(SubmitWorkRequest other) { + public void MergeFrom(SubmitWork other) { if (other == null) { return; } @@ -453,9 +432,6 @@ public void MergeFrom(SubmitWorkRequest other) { } Pid.MergeFrom(other.Pid); } - if (other.CorrelationId.Length != 0) { - CorrelationId = other.CorrelationId; - } if (other.Data != 0) { Data = other.Data; } @@ -476,10 +452,6 @@ public void MergeFrom(pb::CodedInputStream input) { input.ReadMessage(pid_); break; } - case 18: { - CorrelationId = input.ReadString(); - break; - } case 24: { Data = input.ReadInt32(); break; @@ -490,10 +462,10 @@ public void MergeFrom(pb::CodedInputStream input) { } - public sealed partial class FinishedWorkResponse : pb::IMessage { - private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new FinishedWorkResponse()); + public sealed partial class SubmitExpectedResultCount : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitExpectedResultCount()); [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public static pb::MessageParser Parser { get { return _parser; } } + public static pb::MessageParser Parser { get { return _parser; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { @@ -506,51 +478,156 @@ public sealed partial class FinishedWorkResponse : pb::IMessageField number for the "pid" field. - public const int PidFieldNumber = 1; - private global::ProtoActorDemo.Messages.PID pid_; + /// Field number for the "count" field. + public const int CountFieldNumber = 1; + private int count_; [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public global::ProtoActorDemo.Messages.PID Pid { - get { return pid_; } + public int Count { + get { return count_; } set { - pid_ = value; + count_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as SubmitExpectedResultCount); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(SubmitExpectedResultCount other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Count != other.Count) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Count != 0) hash ^= Count.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Count != 0) { + output.WriteRawTag(8); + output.WriteInt32(Count); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Count != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Count); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(SubmitExpectedResultCount other) { + if (other == null) { + return; + } + if (other.Count != 0) { + Count = other.Count; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 8: { + Count = input.ReadInt32(); + break; + } + } } } - /// Field number for the "correlation_id" field. - public const int CorrelationIdFieldNumber = 2; - private string correlationId_ = ""; + } + + public sealed partial class SubmitResult : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new SubmitResult()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public string CorrelationId { - get { return correlationId_; } + public static pbr::MessageDescriptor Descriptor { + get { return global::ProtoActorDemo.Messages.MessagesReflection.Descriptor.MessageTypes[4]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult(SubmitResult other) : this() { + data_ = other.data_; + result_ = other.result_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public SubmitResult Clone() { + return new SubmitResult(this); + } + + /// Field number for the "data" field. + public const int DataFieldNumber = 3; + private int data_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Data { + get { return data_; } set { - correlationId_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + data_ = value; } } /// Field number for the "result" field. - public const int ResultFieldNumber = 3; - private int result_; + public const int ResultFieldNumber = 4; + private float result_; [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public int Result { + public float Result { get { return result_; } set { result_ = value; @@ -559,19 +636,18 @@ public int Result { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public override bool Equals(object other) { - return Equals(other as FinishedWorkResponse); + return Equals(other as SubmitResult); } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public bool Equals(FinishedWorkResponse other) { + public bool Equals(SubmitResult other) { if (ReferenceEquals(other, null)) { return false; } if (ReferenceEquals(other, this)) { return true; } - if (!object.Equals(Pid, other.Pid)) return false; - if (CorrelationId != other.CorrelationId) return false; + if (Data != other.Data) return false; if (Result != other.Result) return false; return true; } @@ -579,9 +655,8 @@ public bool Equals(FinishedWorkResponse other) { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public override int GetHashCode() { int hash = 1; - if (pid_ != null) hash ^= Pid.GetHashCode(); - if (CorrelationId.Length != 0) hash ^= CorrelationId.GetHashCode(); - if (Result != 0) hash ^= Result.GetHashCode(); + if (Data != 0) hash ^= Data.GetHashCode(); + if (Result != 0F) hash ^= Result.GetHashCode(); return hash; } @@ -592,50 +667,37 @@ public override string ToString() { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public void WriteTo(pb::CodedOutputStream output) { - if (pid_ != null) { - output.WriteRawTag(10); - output.WriteMessage(Pid); - } - if (CorrelationId.Length != 0) { - output.WriteRawTag(18); - output.WriteString(CorrelationId); - } - if (Result != 0) { + if (Data != 0) { output.WriteRawTag(24); - output.WriteInt32(Result); + output.WriteInt32(Data); + } + if (Result != 0F) { + output.WriteRawTag(37); + output.WriteFloat(Result); } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public int CalculateSize() { int size = 0; - if (pid_ != null) { - size += 1 + pb::CodedOutputStream.ComputeMessageSize(Pid); - } - if (CorrelationId.Length != 0) { - size += 1 + pb::CodedOutputStream.ComputeStringSize(CorrelationId); + if (Data != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Data); } - if (Result != 0) { - size += 1 + pb::CodedOutputStream.ComputeInt32Size(Result); + if (Result != 0F) { + size += 1 + 4; } return size; } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - public void MergeFrom(FinishedWorkResponse other) { + public void MergeFrom(SubmitResult other) { if (other == null) { return; } - if (other.pid_ != null) { - if (pid_ == null) { - pid_ = new global::ProtoActorDemo.Messages.PID(); - } - Pid.MergeFrom(other.Pid); - } - if (other.CorrelationId.Length != 0) { - CorrelationId = other.CorrelationId; + if (other.Data != 0) { + Data = other.Data; } - if (other.Result != 0) { + if (other.Result != 0F) { Result = other.Result; } } @@ -648,19 +710,12 @@ public void MergeFrom(pb::CodedInputStream input) { default: input.SkipLastField(); break; - case 10: { - if (pid_ == null) { - pid_ = new global::ProtoActorDemo.Messages.PID(); - } - input.ReadMessage(pid_); - break; - } - case 18: { - CorrelationId = input.ReadString(); + case 24: { + Data = input.ReadInt32(); break; } - case 24: { - Result = input.ReadInt32(); + case 37: { + Result = input.ReadFloat(); break; } } diff --git a/examples/example09masterworker/messages/messages.proto b/examples/example09masterworker/messages/messages.proto index ff1e9f3..7c8053d 100644 --- a/examples/example09masterworker/messages/messages.proto +++ b/examples/example09masterworker/messages/messages.proto @@ -12,14 +12,16 @@ message RequestWork { PID pid = 1; } -message SubmitWorkRequest { +message SubmitWork { PID pid = 1; - string correlation_id = 2; int32 data = 3; } -message FinishedWorkResponse { - PID pid = 1; - string correlation_id = 2; - int32 result = 3; +message SubmitExpectedResultCount { + int32 count = 1; +} + +message SubmitResult { + int32 data = 3; + float result = 4; } \ No newline at end of file From 01b4df3b686e151fe3c1d6cef100b0bb572c044a Mon Sep 17 00:00:00 2001 From: Tomas Jansson Date: Tue, 23 Jan 2018 20:55:01 +0100 Subject: [PATCH 3/3] added supervision to the sample app --- examples/example09masterworker/app/Program.fs | 1 + .../example09masterworker/app/Supervision.fs | 30 +++++++++++++++++++ examples/example09masterworker/app/app.fsproj | 1 + .../buildAndRunDocker.sh | 4 +++ .../example09masterworker/docker-compose.yml | 2 +- examples/example09masterworker/run_docker.sh | 3 ++ 6 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 examples/example09masterworker/app/Supervision.fs create mode 100755 examples/example09masterworker/buildAndRunDocker.sh create mode 100755 examples/example09masterworker/run_docker.sh diff --git a/examples/example09masterworker/app/Program.fs b/examples/example09masterworker/app/Program.fs index 3ecc96e..e02997d 100644 --- a/examples/example09masterworker/app/Program.fs +++ b/examples/example09masterworker/app/Program.fs @@ -22,6 +22,7 @@ let main argv = match argList with | [] | "hello"::_ -> HelloProtoActor.hello() + | "supervision"::_ | "s"::_ -> Supervision.run() | "local"::_ | "l" :: _ -> testLocal() | "m"::count::_ -> Master.startMaster (Int32.Parse count) |> ignore | "w"::count::_ -> Worker.startWorker (Int32.Parse count) |> ignore diff --git a/examples/example09masterworker/app/Supervision.fs b/examples/example09masterworker/app/Supervision.fs new file mode 100644 index 0000000..992e93c --- /dev/null +++ b/examples/example09masterworker/app/Supervision.fs @@ -0,0 +1,30 @@ +module Supervision +open Proto.FSharp +open Proto + +let run() = + let rand = new System.Random() + + let createChild childId = + let handler _ = + let rec inner cnt = + if cnt = 5 then + childId + |> sprintf "Child %d: I don't want to play anymore" + |> exn + |> raise + else + printfn "Child %d: Playing with you %d" childId cnt + System.Threading.Thread.Sleep(rand.Next(2000)) + inner (cnt+1) + inner 0 + + Actor.create handler |> Actor.initProps + + let masterHandler (context:IContext) (msg:obj) = + match msg with + | :? Started -> + [1 .. 2] |> List.iter (createChild >> context.Spawn >> ignore) + | x -> printfn "Master: %A" x + + Actor.create2 masterHandler |> Actor.initProps |> Actor.spawn |> ignore \ No newline at end of file diff --git a/examples/example09masterworker/app/app.fsproj b/examples/example09masterworker/app/app.fsproj index f455e78..7aa6619 100644 --- a/examples/example09masterworker/app/app.fsproj +++ b/examples/example09masterworker/app/app.fsproj @@ -13,6 +13,7 @@ + diff --git a/examples/example09masterworker/buildAndRunDocker.sh b/examples/example09masterworker/buildAndRunDocker.sh new file mode 100755 index 0000000..1b2dfab --- /dev/null +++ b/examples/example09masterworker/buildAndRunDocker.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +./build.sh +./run_docker.sh \ No newline at end of file diff --git a/examples/example09masterworker/docker-compose.yml b/examples/example09masterworker/docker-compose.yml index 9328956..7abd398 100644 --- a/examples/example09masterworker/docker-compose.yml +++ b/examples/example09masterworker/docker-compose.yml @@ -11,7 +11,7 @@ services: build: ./app depends_on: - master - command: "w 10" + command: "w 1" consul: image: "consul:latest" container_name: "consul" diff --git a/examples/example09masterworker/run_docker.sh b/examples/example09masterworker/run_docker.sh new file mode 100755 index 0000000..fc521fd --- /dev/null +++ b/examples/example09masterworker/run_docker.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker-compose up --build --scale worker=1