Skip to content

Commit

Permalink
address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Stan Iliev committed Feb 23, 2016
1 parent 20814b2 commit ad8d1e7
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 99 deletions.
10 changes: 5 additions & 5 deletions src/Agent.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public async Task ListenAsync()
}
Debug.Assert(_settings != null, "settings should not be null");
var taskServer = HostContext.GetService<ITaskServer>();
//TODO: Interaction with the WorkerManager is the responsibility of the caller. Listener just returns the message.
using (var workerManager = HostContext.GetService<IWorkerManager>())
{

Expand Down Expand Up @@ -120,12 +121,12 @@ public async Task ListenAsync()
}
catch (Exception ex)
{
Trace.Verbose("MessageListener.Listen - Exception received.");
Trace.Warning("MessageListener.Listen - Exception received.");
Trace.Error(ex);
// TODO: Throw a specific exception so the caller can control the flow appropriately.
return;
}

if (message == null)
{
Trace.Verbose("MessageListener.Listen - No message retrieved from session '{0}'.", this.Session.SessionId);
Expand All @@ -137,9 +138,8 @@ public async Task ListenAsync()
{
// Check if refresh is required.
if (String.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
// Throw a specific exception so the caller can control the flow appropriately.
return;
{
Trace.Warning("Referesh message received, but not yet handled by agent implementation.");
}
else if (String.Equals(message.MessageType, JobRequestMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
Expand Down
3 changes: 2 additions & 1 deletion src/Agent.Listener/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public interface IWorker : IDisposable, IAgentService
event EventHandler StateChanged;
Guid JobId { get; set; }
IProcessChannel ProcessChannel { get; set; }
//TODO: instead of LaunchProcess, do something like Task RunAsync(...) and make sure you take a cancellation token. The way the IWorkerManager can handle cancelling the worker is to simply signal the cancellation token that it handed to the IWorker.RunAsync method.
void LaunchProcess(String pipeHandleOut, String pipeHandleIn, string workingFolder);
}

Expand Down Expand Up @@ -56,7 +57,7 @@ public Worker()
{
State = WorkerState.New;
}

public void LaunchProcess(String pipeHandleOut, String pipeHandleIn, string workingFolder)
{
string workerFileName = Path.Combine(AssemblyUtil.AssemblyDirectory, WorkerProcessName);
Expand Down
8 changes: 4 additions & 4 deletions src/Agent.Listener/WorkerManager.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Microsoft.VisualStudio.Services.Agent.Util;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace Microsoft.VisualStudio.Services.Agent.Listener
Expand All @@ -22,13 +22,13 @@ public async Task Run(JobRequestMessage jobRequestMessage)
Trace.Info("Job request {0} received.", jobRequestMessage.JobId);
var worker = HostContext.GetService<IWorker>();
worker.JobId = jobRequestMessage.JobId;
//we should always create a IProcessChannel, and not use a singleton
worker.ProcessChannel = HostContext.GetService<IProcessChannel>();
worker.StateChanged += Worker_StateChanged;
_jobsInProgress[jobRequestMessage.JobId] = worker;
worker.ProcessChannel.StartServer( (p1, p2) =>
worker.ProcessChannel.StartServer( (pipeHandleOut, pipeHandleIn) =>
{
string workingFolder = ""; //TODO: pass working folder from the config to the worker process
worker.LaunchProcess(p1, p2, workingFolder);
worker.LaunchProcess(pipeHandleOut, pipeHandleIn, AssemblyUtil.AssemblyDirectory);
}
);
await worker.ProcessChannel.SendAsync(jobRequestMessage, HostContext.CancellationToken);
Expand Down
4 changes: 2 additions & 2 deletions src/Agent.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public async Task Run(JobRequestMessage message)
m_finishedSignal.Release();
}

public async Task WaitToFinish(IHostContext context)
public Task WaitToFinish(IHostContext context)
{
await m_finishedSignal.WaitAsync(context.CancellationToken);
return m_finishedSignal.WaitAsync(context.CancellationToken);
}

private IHostContext m_hostContext;
Expand Down
5 changes: 3 additions & 2 deletions src/Agent.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ public static async Task<Int32> RunAsync(string[] args)
m_trace.Error("Error Hello Worker!");
m_trace.Verbose("Verbos Hello Worker!");

//TODO: Consider removing events and use receive methods in ProcessChannel and StreamTransport
JobRunner jobRunner = null;
Func<CancellationToken, JobCancelMessage, Task> cancelHandler = (token, message) =>
Func<JobCancelMessage, CancellationToken, Task> cancelHandler = (message, token) =>
{
hc.CancellationTokenSource.Cancel();
return Task.CompletedTask;
};

Func<CancellationToken, JobRequestMessage, Task> newRequestHandler = async (token, message) =>
Func<JobRequestMessage, CancellationToken, Task> newRequestHandler = async (message, token) =>
{
await jobRunner.Run(message);
};
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.VisualStudio.Services.Agent/HostContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public CancellationToken CancellationToken
}
}

//TODO: hide somehow this variable
public CancellationTokenSource CancellationTokenSource
{
get
Expand Down
18 changes: 9 additions & 9 deletions src/Microsoft.VisualStudio.Services.Agent/ProcessChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Microsoft.VisualStudio.Services.Agent
[ServiceLocator(Default = typeof(ProcessChannel))]
public interface IProcessChannel : IDisposable, IAgentService
{
event Func<CancellationToken, JobRequestMessage, Task> JobRequestMessageReceived;
event Func<CancellationToken, JobCancelMessage, Task> JobCancelMessageReceived;
event Func<JobRequestMessage, CancellationToken, Task> JobRequestMessageReceived;
event Func<JobCancelMessage, CancellationToken, Task> JobCancelMessageReceived;

Task SendAsync(JobRequestMessage jobRequest, CancellationToken cancellationToken);
Task SendAsync(JobCancelMessage jobCancel, CancellationToken cancellationToken);
Expand Down Expand Up @@ -94,15 +94,15 @@ public async Task Stop()
}
}

private async Task Transport_PacketReceived(CancellationToken token, IPCPacket packet)
private async Task Transport_PacketReceived(IPCPacket packet, CancellationToken token)
{
token.ThrowIfCancellationRequested();
switch (packet.MessageType)
{
case 1:
{
var message = JsonUtility.FromString<JobRequestMessage>(packet.Body);
Func<CancellationToken, JobRequestMessage, Task> jobRequestMessageReceived = JobRequestMessageReceived;
Func<JobRequestMessage, CancellationToken, Task> jobRequestMessageReceived = JobRequestMessageReceived;
if (null == jobRequestMessageReceived)
{
return;
Expand All @@ -111,15 +111,15 @@ private async Task Transport_PacketReceived(CancellationToken token, IPCPacket p
Task[] handlerTasks = new Task[invocationList.Length];
for (int i = 0; i < invocationList.Length; i++)
{
handlerTasks[i] = ((Func<CancellationToken, JobRequestMessage, Task>)invocationList[i])(token, message);
handlerTasks[i] = ((Func<JobRequestMessage, CancellationToken, Task>)invocationList[i])(message, token);
}
await Task.WhenAll(handlerTasks);
}
break;
case 2:
{
var message = JsonUtility.FromString<JobCancelMessage>(packet.Body);
Func<CancellationToken, JobCancelMessage, Task> jobCancelMessageReceived = JobCancelMessageReceived;
Func<JobCancelMessage, CancellationToken, Task> jobCancelMessageReceived = JobCancelMessageReceived;
if (null == jobCancelMessageReceived)
{
return;
Expand All @@ -128,16 +128,16 @@ private async Task Transport_PacketReceived(CancellationToken token, IPCPacket p
Task[] handlerTasks = new Task[invocationList.Length];
for (int i = 0; i < invocationList.Length; i++)
{
handlerTasks[i] = ((Func<CancellationToken, JobCancelMessage, Task>)invocationList[i])(token, message);
handlerTasks[i] = ((Func<JobCancelMessage, CancellationToken, Task>)invocationList[i])(message, token);
}
await Task.WhenAll(handlerTasks);
}
break;
}
}

public event Func<CancellationToken, JobRequestMessage, Task> JobRequestMessageReceived;
public event Func<CancellationToken, JobCancelMessage, Task> JobCancelMessageReceived;
public event Func<JobRequestMessage, CancellationToken, Task> JobRequestMessageReceived;
public event Func<JobCancelMessage, CancellationToken, Task> JobCancelMessageReceived;

public async Task SendAsync(JobRequestMessage jobRequest, CancellationToken cancellationToken)
{
Expand Down
71 changes: 13 additions & 58 deletions src/Microsoft.VisualStudio.Services.Agent/StreamString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,28 @@ public StreamString(Stream ioStream)

public async Task<Int32> ReadInt32Async(CancellationToken cancellationToken)
{
byte[] readBytes = new byte[4];
byte[] readBytes = new byte[sizeof(Int32)];
int dataread = 0;
while (4 - dataread > 0 && (!cancellationToken.IsCancellationRequested))
while (sizeof(Int32) - dataread > 0 && (!cancellationToken.IsCancellationRequested))
{
Task<int> op = ioStream.ReadAsync(readBytes, dataread, 4 - dataread, cancellationToken);
Task<int> op = ioStream.ReadAsync(readBytes, dataread, sizeof(Int32) - dataread, cancellationToken);
int newData = 0;
try
{
newData = await op.WithCancellation(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
newData = await op.WithCancellation(cancellationToken);
dataread += newData;
if (0 == newData)
{
await Task.Delay(100, cancellationToken);
}
}
cancellationToken.ThrowIfCancellationRequested();
if (BitConverter.IsLittleEndian)
{
Array.Reverse(readBytes);
}
cancellationToken.ThrowIfCancellationRequested();
return BitConverter.ToInt32(readBytes, 0);
}

public async Task WriteInt32Async(Int32 value, CancellationToken cancellationToken)
{
byte[] int32Bytes = BitConverter.GetBytes(value);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(int32Bytes);
}
Task op = ioStream.WriteAsync(int32Bytes, 0, 4, cancellationToken);
try
{
await op.WithCancellation(cancellationToken);
}
catch (OperationCanceledException)
{
throw new TaskCanceledException();
}
Task op = ioStream.WriteAsync(int32Bytes, 0, sizeof(Int32), cancellationToken);
await op.WithCancellation(cancellationToken);
}

const Int32 MAX_STRING_SIZE = 50*1000000;
Expand All @@ -83,23 +61,14 @@ public async Task<string> ReadStringAsync(CancellationToken cancellationToken)
{
Task<int> op = ioStream.ReadAsync(inBuffer, dataread, len - dataread, cancellationToken);
int newData = 0;
try
{
newData = await op.WithCancellation(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
newData = await op.WithCancellation(cancellationToken);
dataread += newData;
if (0 == newData)
{
await Task.Delay(100, cancellationToken);
}
}

cancellationToken.ThrowIfCancellationRequested();

cancellationToken.ThrowIfCancellationRequested();
return streamEncoding.GetString(inBuffer);
}

Expand All @@ -114,25 +83,11 @@ public async Task<int> WriteStringAsync(string outString, CancellationToken canc
await WriteInt32Async(len, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
Task op = ioStream.WriteAsync(outBuffer, 0, len, cancellationToken);
try
{
await op.WithCancellation(cancellationToken);
}
catch (OperationCanceledException)
{
}
cancellationToken.ThrowIfCancellationRequested();
await op.WithCancellation(cancellationToken);
op = ioStream.FlushAsync(cancellationToken);
try
{
await op.WithCancellation(cancellationToken);
}
catch (OperationCanceledException)
{
throw new TaskCanceledException();
}
return outBuffer.Length + 4;
}
await op.WithCancellation(cancellationToken);
return outBuffer.Length + sizeof(Int32);
}
}

}
10 changes: 3 additions & 7 deletions src/Microsoft.VisualStudio.Services.Agent/StreamTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public IPCPacket(Int32 p1, string p2)

public class StreamTransport
{
public event Func<CancellationToken, IPCPacket, Task> PacketReceived;
public event Func<IPCPacket, CancellationToken, Task> PacketReceived;

public Stream ReadPipe
{
Expand All @@ -42,18 +42,14 @@ public Stream WritePipe
public async Task SendAsync(Int32 MessageType, string Body, CancellationToken cancellationToken)
{
await WriteStream.WriteInt32Async(MessageType, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await WriteStream.WriteStringAsync(Body, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
}

public async Task<IPCPacket> ReceiveAsync(CancellationToken cancellationToken)
{
IPCPacket result = new IPCPacket(-1, "");
result.MessageType = await ReadStream.ReadInt32Async(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
result.Body = await ReadStream.ReadStringAsync(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
return result;
}

Expand All @@ -62,7 +58,7 @@ public async Task Run(CancellationToken token)
while (!token.IsCancellationRequested)
{
var packet = await ReceiveAsync(token);
Func<CancellationToken, IPCPacket, Task> packetReceived = PacketReceived;
Func<IPCPacket, CancellationToken, Task> packetReceived = PacketReceived;
if (null == packetReceived)
{
continue;
Expand All @@ -71,7 +67,7 @@ public async Task Run(CancellationToken token)
Task[] handlerTasks = new Task[invocationList.Length];
for (int i = 0; i < invocationList.Length; i++)
{
handlerTasks[i] = ((Func<CancellationToken, IPCPacket, Task>)invocationList[i])(token, packet);
handlerTasks[i] = ((Func<IPCPacket, CancellationToken, Task>)invocationList[i])(packet, token);
}
await Task.WhenAll(handlerTasks);
}
Expand Down
14 changes: 3 additions & 11 deletions src/Test/L0/ProcessChannelL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static async Task RunAsync(string[] args)
using (var client = new ProcessChannel())
{
SemaphoreSlim signal = new SemaphoreSlim(0, 1);
Func<CancellationToken, JobRequestMessage, Task> echoFunc = async (ct, message) =>
Func<JobRequestMessage, CancellationToken, Task> echoFunc = async (message, ct) =>
{
var cs2 = new CancellationTokenSource();
await client.SendAsync(message, cs2.Token);
Expand Down Expand Up @@ -50,7 +50,7 @@ public async Task RunIPCEndToEnd()
List<TaskInstance> tasks = new List<TaskInstance>();
Guid JobId = Guid.NewGuid();
var jobRequest = new JobRequestMessage(plan, timeline, JobId, "someJob", environment, tasks);
Func<CancellationToken, JobRequestMessage, Task> verifyFunc = (ct, message) =>
Func<JobRequestMessage, CancellationToken, Task> verifyFunc = (message, ct) =>
{
result = message;
signal.Release();
Expand All @@ -61,16 +61,8 @@ public async Task RunIPCEndToEnd()
server.StartServer((p1, p2) =>
{
string clientFileName = "Test";
bool hasExeSuffix = clientFileName.EndsWith(".exe", StringComparison.OrdinalIgnoreCase);
#if OS_WINDOWS
if (!hasExeSuffix)
{
clientFileName += ".exe";
}
#else
if (hasExeSuffix) {
clientFileName = clientFileName.Substring(0, clientFileName.Length - 4);
}
clientFileName += ".exe";
#endif
jobProcess = new Process();
jobProcess.StartInfo.FileName = clientFileName;
Expand Down

0 comments on commit ad8d1e7

Please sign in to comment.