diff --git a/README.md b/README.md index ba1aeb5..8b217eb 100644 --- a/README.md +++ b/README.md @@ -39,12 +39,13 @@ You can see that 1 slave is registered and you've got some idle CPUs and Memory. ### Run RENDLER in the `mesos-demo` VM Check implementations of the RENDLER scheduler in the `python`, `go`, -`scala`, and `cpp` directories. Run instructions are here: +`scala`, `cpp` and `csharp` directories. Run instructions are here: - [Python RENDLER framework](python/README.md) - [Go RENDLER framework](go/README.md) - [Scala RENDLER framework](scala/README.md) - [C++ RENDLER framework](cpp/README.md) +- [C# RENDLER framework](csharp/README.md) Feel free to contribute your own! diff --git a/csharp/.gitignore b/csharp/.gitignore new file mode 100644 index 0000000..7e8b49f --- /dev/null +++ b/csharp/.gitignore @@ -0,0 +1,10 @@ +*.suo +*.user +*.sdf +*.userprefs +.vs +bin +obj +*.so +*.nupkg +*.lock.json diff --git a/csharp/NuGet.Config b/csharp/NuGet.Config new file mode 100644 index 0000000..a58fa55 --- /dev/null +++ b/csharp/NuGet.Config @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/csharp/README.md b/csharp/README.md new file mode 100644 index 0000000..ccdb887 --- /dev/null +++ b/csharp/README.md @@ -0,0 +1,14 @@ +## C# Rendler Framework + +### Preparation + +1. The implementation uses the [mesos-clr](https://github.com/bcrusu/mesos-clr) library, which needs to be built and placed in the `ext` directory before building the Rendler project. +2. Pack the Rendler binaries to `rendler.tar.gz` and place the archive inside the [frameworks_home](http://mesos.apache.org/documentation/latest/configuration/) directory. 
+ +### Running + +To start the Rendler framework, run the command: +```bash +mono rendler.exe -scheduler -master=MASTER_ADDRESS -output=RENDLER_OUTPUT_DIR [-starturl=CRAWL_START_URL] [-user=RUN_AS_USER] +``` + diff --git a/csharp/Rendler.sln b/csharp/Rendler.sln new file mode 100644 index 0000000..aed6a02 --- /dev/null +++ b/csharp/Rendler.sln @@ -0,0 +1,32 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 14 +VisualStudioVersion = 14.0.25123.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{5D5A1D30-E747-47E8-8C4F-AAC48FAFA9C4}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{1B43699B-D07A-403E-BA95-8F1EB4E4A590}" + ProjectSection(SolutionItems) = preProject + global.json = global.json + EndProjectSection +EndProject +Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Rendler", "src\Rendler\Rendler.xproj", "{36A62F5A-1F76-494D-9377-2595AE03C598}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {36A62F5A-1F76-494D-9377-2595AE03C598}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {36A62F5A-1F76-494D-9377-2595AE03C598}.Debug|Any CPU.Build.0 = Debug|Any CPU + {36A62F5A-1F76-494D-9377-2595AE03C598}.Release|Any CPU.ActiveCfg = Release|Any CPU + {36A62F5A-1F76-494D-9377-2595AE03C598}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {36A62F5A-1F76-494D-9377-2595AE03C598} = {5D5A1D30-E747-47E8-8C4F-AAC48FAFA9C4} + EndGlobalSection +EndGlobal diff --git a/csharp/dev_nuget_feed/readme b/csharp/dev_nuget_feed/readme new file mode 100644 index 0000000..0f775f1 --- /dev/null +++ 
b/csharp/dev_nuget_feed/readme @@ -0,0 +1 @@ +place 'mesosclr.x.x.x.nupkg' in this directory diff --git a/csharp/global.json b/csharp/global.json new file mode 100644 index 0000000..b51e28b --- /dev/null +++ b/csharp/global.json @@ -0,0 +1,6 @@ +{ + "projects": [ "src", "test" ], + "sdk": { + "version": "1.0.0-preview1-002702" + } +} diff --git a/csharp/src/Rendler/DotHelper.cs b/csharp/src/Rendler/DotHelper.cs new file mode 100644 index 0000000..5835e7b --- /dev/null +++ b/csharp/src/Rendler/DotHelper.cs @@ -0,0 +1,67 @@ +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace Rendler +{ + internal static class DotHelper + { + // Writes the crawl graph as a Graphviz digraph: one box node per key of nodeToChildNodes (showing the rendered image when nodeImageFileName has one, the URL otherwise), then one edge per parent/child pair. + public static void Write(string outputPath, IDictionary<string, List<string>> nodeToChildNodes, + IDictionary<string, string> nodeImageFileName) + { + var nodeNames = new Dictionary<string, string>(); + var nodeIdCounter = 0; + + // FileMode.Create (not CreateNew) so a result.dot left by an earlier run is overwritten instead of raising IOException. + using (var fs = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.Write)) + using (var writer = new StreamWriter(fs, Encoding.UTF8)) + { + writer.WriteLine("digraph G {"); + writer.WriteLine("\tnode [shape=box];"); + + foreach (var node in nodeToChildNodes) + { + var url = node.Key; + var nodeName = "url_" + (++nodeIdCounter); + nodeNames[url] = nodeName; + + writer.Write("\t"); + writer.Write(nodeName); + + string imageFileName; + if (nodeImageFileName.TryGetValue(url, out imageFileName)) + { + writer.Write(" [label=\"\" image=\""); + writer.Write(imageFileName); + } + else + { + writer.Write(" [label=\""); + writer.Write(url); + } + + writer.WriteLine("\"];"); + } + + writer.WriteLine(); + + foreach (var node in nodeToChildNodes) + { + var nodeName = nodeNames[node.Key]; + foreach (var childNode in node.Value) + { + var childNodeName = nodeNames[childNode]; + writer.Write("\t"); + writer.Write(nodeName); + writer.Write(" -> "); + writer.Write(childNodeName); + writer.WriteLine(";"); + } + } + + writer.WriteLine("}"); + } + } + } +} diff --git a/csharp/src/Rendler/Executors/CrawlExecutor.cs 
b/csharp/src/Rendler/Executors/CrawlExecutor.cs new file mode 100644 index 0000000..d79bcd9 --- /dev/null +++ b/csharp/src/Rendler/Executors/CrawlExecutor.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using mesos; +using mesosclr; +using Rendler.Executors.Messages; +using System.Net.Http; + +namespace Rendler.Executors +{ + internal class CrawlExecutor : ExecutorBase + { + private static readonly Regex ExtractLinksRegex = new Regex("]+href=[\"']?(?[^\"'>]+)[\"']?[^>]*>(.+?)", RegexOptions.Compiled | RegexOptions.IgnoreCase); + + public override void Registered(IExecutorDriver driver, ExecutorInfo executorInfo, FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) + { + Console.WriteLine($"Registered executor on '{slaveInfo.hostname}'."); + } + + public override void LaunchTask(IExecutorDriver driver, TaskInfo taskInfo) + { + Console.WriteLine($"Launching crawl task '{taskInfo.task_id.value}'..."); + + Task.Factory.StartNew(async () => + { + try + { + await RunTask(driver, taskInfo); + } + catch (Exception e) + { + Console.WriteLine($"Exception during crawl operation: {e}"); + driver.SendTaskErrorStatus(taskInfo.task_id); + } + }); + } + + private static async Task RunTask(IExecutorDriver driver, TaskInfo taskInfo) + { + driver.SendTaskRunningStatus(taskInfo.task_id); + + var url = Encoding.UTF8.GetString(taskInfo.data); + + var htmlContent = await GetUrlContent(url); + if (htmlContent != null) + { + var links = ExtractLinks(htmlContent); + links = links + .Select(x => x.ToLower()) + .Distinct(StringComparer.CurrentCultureIgnoreCase); + + if (links.Any()) + SendCrawlResultMessage(driver, url, links.ToArray()); + } + + driver.SendTaskFinishedStatus(taskInfo.task_id); + } + + private static IEnumerable ExtractLinks(string htmlContent) + { + var match = ExtractLinksRegex.Match(htmlContent); + while (match.Success) + { + 
yield return match.Groups["link"].Value.Trim(); + match = match.NextMatch(); + } + } + + private static async Task<string> GetUrlContent(string url) + { + using (var client = new HttpClient()) + { + client.DefaultRequestHeaders.Add("X-PoweredBy", "minions"); + + try + { + return await client.GetStringAsync(url); + } + catch (Exception e) when (e is HttpRequestException || e is WebException) // HttpClient reports failed requests as HttpRequestException; WebException is kept for older HTTP stacks + { + Console.WriteLine($"Error fetching url '{url}'; Error: {e}"); + return null; + } + } + } + + private static void SendCrawlResultMessage(IExecutorDriver driver, string url, string[] links) + { + var message = new Message + { + Type = "CrawlResult", + Body = JsonHelper.Serialize(new CrawlResultMessage + { + Url = url, + Links = links + }) + }; + + driver.SendFrameworkMessage(JsonHelper.Serialize(message)); + } + } +} diff --git a/csharp/src/Rendler/Executors/ExecutorBase.cs b/csharp/src/Rendler/Executors/ExecutorBase.cs new file mode 100644 index 0000000..9768d03 --- /dev/null +++ b/csharp/src/Rendler/Executors/ExecutorBase.cs @@ -0,0 +1,42 @@ +using System; +using mesos; +using mesosclr; + +namespace Rendler.Executors +{ + abstract class ExecutorBase : IExecutor + { + public virtual void Registered(IExecutorDriver driver, ExecutorInfo executorInfo, FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) + { + } + + public virtual void Reregistered(IExecutorDriver driver, SlaveInfo slaveInfo) + { + } + + public virtual void Disconnected(IExecutorDriver driver) + { + } + + public virtual void LaunchTask(IExecutorDriver driver, TaskInfo taskInfo) + { + } + + public virtual void KillTask(IExecutorDriver driver, TaskID taskId) + { + } + + public virtual void FrameworkMessage(IExecutorDriver driver, byte[] data) + { + } + + public virtual void Shutdown(IExecutorDriver driver) + { + } + + public virtual void Error(IExecutorDriver driver, string message) + { + Console.WriteLine($"Error: '{message}'."); + } + } +} diff --git a/csharp/src/Rendler/Executors/Messages/CrawlResultMessage.cs b/csharp/src/Rendler/Executors/Messages/CrawlResultMessage.cs 
new file mode 100644 index 0000000..b87601f --- /dev/null +++ b/csharp/src/Rendler/Executors/Messages/CrawlResultMessage.cs @@ -0,0 +1,14 @@ +using System.Runtime.Serialization; + +namespace Rendler.Executors.Messages +{ + [DataContract] + public class CrawlResultMessage + { + [DataMember] + public string Url { get; set; } + + [DataMember] + public string[] Links { get; set; } + } +} diff --git a/csharp/src/Rendler/Executors/Messages/Message.cs b/csharp/src/Rendler/Executors/Messages/Message.cs new file mode 100644 index 0000000..650fa80 --- /dev/null +++ b/csharp/src/Rendler/Executors/Messages/Message.cs @@ -0,0 +1,14 @@ +using System.Runtime.Serialization; + +namespace Rendler.Executors.Messages +{ + [DataContract] + internal class Message + { + [DataMember] + public string Type { get; set; } + + [DataMember] + public byte[] Body { get; set; } + } +} diff --git a/csharp/src/Rendler/Executors/Messages/RenderResultMessage.cs b/csharp/src/Rendler/Executors/Messages/RenderResultMessage.cs new file mode 100644 index 0000000..6e5f0fe --- /dev/null +++ b/csharp/src/Rendler/Executors/Messages/RenderResultMessage.cs @@ -0,0 +1,14 @@ +using System.Runtime.Serialization; + +namespace Rendler.Executors.Messages +{ + [DataContract] + public class RenderResultMessage + { + [DataMember] + public string Url { get; set; } + + [DataMember] + public string FileName { get; set; } + } +} diff --git a/csharp/src/Rendler/Executors/RenderExecutor.cs b/csharp/src/Rendler/Executors/RenderExecutor.cs new file mode 100644 index 0000000..dd9bfa5 --- /dev/null +++ b/csharp/src/Rendler/Executors/RenderExecutor.cs @@ -0,0 +1,77 @@ +using System; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using mesos; +using mesosclr; +using Rendler.Executors.Messages; + +namespace Rendler.Executors +{ + class RenderExecutor : ExecutorBase + { + private string _outputDir; + + public override void Registered(IExecutorDriver driver, ExecutorInfo executorInfo, 
FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) + { + _outputDir = Encoding.UTF8.GetString (executorInfo.data); + Console.WriteLine ($"Registered executor on host '{slaveInfo.hostname}'. Output dir is '{_outputDir}'."); + } + + public override void LaunchTask(IExecutorDriver driver, TaskInfo taskInfo) + { + Console.WriteLine($"Launching render task '{taskInfo.task_id.value}'..."); + + Task.Factory.StartNew (() => { + try { + RunTask (driver, taskInfo); + } catch (Exception e) { + Console.WriteLine ($"Exception during render operation: {e}"); + driver.SendTaskErrorStatus (taskInfo.task_id); + } + }); + } + + private void RunTask(IExecutorDriver driver, TaskInfo taskInfo) + { + driver.SendTaskRunningStatus(taskInfo.task_id); + + var url = Encoding.UTF8.GetString(taskInfo.data); + var imageFileName = RunRendering(taskInfo.task_id, url); + + SendRenderResultMessage(driver, url, imageFileName); + driver.SendTaskFinishedStatus(taskInfo.task_id); + } + + private string RunRendering(TaskID taskId, string url) + { + var imagePath = Path.Combine(_outputDir, $"{taskId.value}.png"); + + var startInfo = new ProcessStartInfo("phantomjs"); + startInfo.Arguments = $"render.js \"{url}\" \"{imagePath}\""; + startInfo.CreateNoWindow = true; + startInfo.UseShellExecute = false; + + var process = Process.Start(startInfo); + process.WaitForExit(); + + return imagePath; + } + + private static void SendRenderResultMessage(IExecutorDriver driver, string url, string fileName) + { + var message = new Message + { + Type = "RenderResult", + Body = JsonHelper.Serialize(new RenderResultMessage + { + Url = url, + FileName = fileName + }) + }; + + driver.SendFrameworkMessage(JsonHelper.Serialize(message)); + } + } +} diff --git a/csharp/src/Rendler/JsonHelper.cs b/csharp/src/Rendler/JsonHelper.cs new file mode 100644 index 0000000..9c6e93f --- /dev/null +++ b/csharp/src/Rendler/JsonHelper.cs @@ -0,0 +1,20 @@ +using Newtonsoft.Json; +using System.Text; + +namespace Rendler +{ + internal static 
class JsonHelper + { + public static byte[] Serialize(object obj) + { + var str = JsonConvert.SerializeObject(obj); + return Encoding.UTF8.GetBytes(str); + } + + public static T Deserialize<T>(byte[] bytes) + { + var str = Encoding.UTF8.GetString(bytes); + return JsonConvert.DeserializeObject<T>(str); + } + } +} diff --git a/csharp/src/Rendler/MesosExtensions.cs b/csharp/src/Rendler/MesosExtensions.cs new file mode 100644 index 0000000..de09c4b --- /dev/null +++ b/csharp/src/Rendler/MesosExtensions.cs @@ -0,0 +1,45 @@ +using mesos; +using mesosclr; + +namespace Rendler +{ + internal static class MesosExtensions + { + public static void SendTaskRunningStatus(this IExecutorDriver driver, TaskID taskId) + { + driver.SendStatusUpdate(new TaskStatus + { + task_id = taskId, + state = TaskState.TASK_RUNNING + }); + } + + public static void SendTaskFinishedStatus(this IExecutorDriver driver, TaskID taskId) + { + driver.SendStatusUpdate(new TaskStatus + { + task_id = taskId, + state = TaskState.TASK_FINISHED + }); + } + + public static void SendTaskErrorStatus(this IExecutorDriver driver, TaskID taskId) + { + driver.SendStatusUpdate(new TaskStatus + { + task_id = taskId, + state = TaskState.TASK_ERROR + }); + } + + public static bool IsTerminal(this TaskState state) + { + // TASK_KILLING is deliberately excluded: it is a transitional state (a TASK_KILLED update follows), so counting it would register one task as finished twice. + return state == TaskState.TASK_FINISHED || + state == TaskState.TASK_FAILED || + state == TaskState.TASK_KILLED || + state == TaskState.TASK_LOST || + state == TaskState.TASK_ERROR; + } + } +} diff --git a/csharp/src/Rendler/Program.cs b/csharp/src/Rendler/Program.cs new file mode 100644 index 0000000..fcb9614 --- /dev/null +++ b/csharp/src/Rendler/Program.cs @@ -0,0 +1,71 @@ +using System; +using mesos; +using mesosclr; +using Rendler.Executors; + +namespace Rendler +{ + class Program + { + static int Main(string[] args) + { + var arguments = Arguments.Parse (args); + if (arguments == null || !arguments.Validate ()) + return -1; + + switch (arguments.RunMode) { + case 
RunMode.Scheduler: + return RunScheduler (arguments.MesosMaster, arguments.StartUrl, arguments.OutputDir, arguments.RunAsUser); + case RunMode.Executor: + return RunExecutor (arguments.ExecutorName); + default: + return -1; + } + } + + private static int RunScheduler(string mesosMaster, string startUrl, string outputDir, string runAsUser) + { + var frameworkInfo = new FrameworkInfo { + id = new FrameworkID { + value = "Rendler" + }, + name = "Rendler (C#)", + failover_timeout = 5, //seconds + checkpoint = false, + user = runAsUser + }; + + var scheduler = new RendlerScheduler(startUrl ?? "https://mesosphere.com", outputDir, runAsUser); + var driver = new MesosSchedulerDriver(scheduler, frameworkInfo, mesosMaster); + + Console.WriteLine ("Running driver..."); + var result = driver.Run() == Status.DRIVER_STOPPED ? 0 : 1; + Console.WriteLine ($"Driver finished with status {result}."); + + return result; + } + + private static int RunExecutor(string executorName) + { + IExecutor executor; + + switch (executorName) + { + case "render": + executor = new RenderExecutor(); + break; + case "crawl": + executor = new CrawlExecutor(); + break; + default: + { + Console.WriteLine($"Invlaid executor provided: '{executorName}'."); + return -2; + } + } + + var driver = new MesosExecutorDriver(executor); + return driver.Run() == Status.DRIVER_STOPPED ? 
0 : 1; + } + } +} diff --git a/csharp/src/Rendler/ProgramArguments.cs b/csharp/src/Rendler/ProgramArguments.cs new file mode 100644 index 0000000..f21334c --- /dev/null +++ b/csharp/src/Rendler/ProgramArguments.cs @@ -0,0 +1,144 @@ +using System; +using System.IO; + +namespace Rendler +{ + internal class Arguments + { + public RunMode RunMode { get; private set; } + + public string MesosMaster { get; private set; } + + public string ExecutorName { get; private set; } + + public string StartUrl { get; private set; } + + public string OutputDir { get; private set; } + + public string RunAsUser { get; private set; } + + public static Arguments Parse(string[] args) + { + var runMode = RunMode.Default; + string mesosMaster = null; + string executor = null; + string outputDir = null; + string startUrl = null; + string runAsUser = null; + + foreach (var arg in args) + { + if (arg.StartsWith("-executor=")) + { + if (runMode == RunMode.Executor) { + Console.WriteLine("Executor option can be specified only once."); + return null; + } + if (runMode == RunMode.Scheduler) + { + Console.WriteLine("Scheduler and Executor run modes are mutually exclusive."); + return null; + } + + executor = arg.Substring("-executor=".Length); + runMode = RunMode.Executor; + } + else if (arg.Equals("-scheduler")) + { + if (runMode == RunMode.Scheduler) { + Console.WriteLine("Scheduler option can be specified only once."); + return null; + } + if (runMode == RunMode.Executor) + { + Console.WriteLine("Scheduler and Executor run modes are mutually exclusive."); + return null; + } + + runMode = RunMode.Scheduler; + } + else if (arg.StartsWith("-master=")) + { + if (mesosMaster != null) { + Console.WriteLine("Mesos master option can be specified only once."); + return null; + } + + mesosMaster = arg.Substring("-master=".Length); + } + else if (arg.StartsWith("-output=")) + { + if (outputDir != null) { + Console.WriteLine("Output directory option can be specified only once."); + return null; + } + + 
outputDir = arg.Substring("-output=".Length); + } + else if (arg.StartsWith("-starturl=")) + { + if (startUrl != null) { + Console.WriteLine("Start URL option can be specified only once."); + return null; + } + + startUrl = arg.Substring("-starturl=".Length); + } + else if (arg.StartsWith("-user=")) + { + if (runAsUser != null) { // must test runAsUser, not startUrl: otherwise "-starturl=... -user=..." is rejected and a repeated -user goes unnoticed + Console.WriteLine("User option can be specified only once."); + return null; + } + + runAsUser = arg.Substring("-user=".Length); + } + else + { + Console.WriteLine($"Unknown argument detected: '{arg}'."); + } + } + + return new Arguments + { + RunMode = runMode, + ExecutorName = executor, + MesosMaster = mesosMaster, + OutputDir = outputDir, + StartUrl = startUrl, + RunAsUser = runAsUser + }; + } + + public bool Validate () + { + switch (RunMode) { + case RunMode.Executor: + if (string.IsNullOrWhiteSpace (ExecutorName)) { + Console.WriteLine ("Invalid executor name."); + return false; + } + break; + case RunMode.Scheduler: + if (string.IsNullOrWhiteSpace (MesosMaster)) { + Console.WriteLine ("Invalid Mesos master address."); + return false; + } + if (string.IsNullOrWhiteSpace (OutputDir)) { + Console.WriteLine ("Invalid output directory."); + return false; + } + if (!Directory.Exists(OutputDir)){ + Console.WriteLine ("Output directory does not exist."); + return false; + } + break; + default: + Console.WriteLine ("Run mode was not specified."); + return false; + } + + return true; + } + } +} diff --git a/csharp/src/Rendler/Properties/AssemblyInfo.cs b/csharp/src/Rendler/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..ad431e6 --- /dev/null +++ b/csharp/src/Rendler/Properties/AssemblyInfo.cs @@ -0,0 +1,16 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("mesosclr.Rendler")] +[assembly: AssemblyDescription("")] +[assembly: ComVisible(false)] + +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("mesosclr.Rendler")] +[assembly: 
AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/csharp/src/Rendler/Rendler.xproj b/csharp/src/Rendler/Rendler.xproj new file mode 100644 index 0000000..cca238a --- /dev/null +++ b/csharp/src/Rendler/Rendler.xproj @@ -0,0 +1,19 @@ + + + + 14.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + 36a62f5a-1f76-494d-9377-2595ae03c598 + Rendler + .\obj + .\bin\ + v4.6.1 + + + 2.0 + + + \ No newline at end of file diff --git a/csharp/src/Rendler/RendlerScheduler.cs b/csharp/src/Rendler/RendlerScheduler.cs new file mode 100644 index 0000000..36f634f --- /dev/null +++ b/csharp/src/Rendler/RendlerScheduler.cs @@ -0,0 +1,291 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using mesos; +using mesosclr; +using Rendler.Executors.Messages; + +namespace Rendler +{ + internal class RendlerScheduler : IScheduler + { + private const int MaxTasksToRun = 256; // limit for demonstration purpose + + private const double RenderCpus = 1d; + private const double RenderMem = 128d; + private const double CrawlCpus = 0.5d; + private const double CrawlMem = 64d; + + private readonly string _outputDir; + private readonly string _runAsUser; + + + private int _launchedTasks; + private int _finishedTasksCount; + private readonly ConcurrentQueue _crawlQueue = new ConcurrentQueue(); + private readonly ConcurrentQueue _renderQueue = new ConcurrentQueue(); + private readonly ISet _crawled = new HashSet(); + + private readonly ConcurrentDictionary _renderResults = new ConcurrentDictionary(); + private readonly ConcurrentDictionary> _crawlResults = new ConcurrentDictionary>(); + + public RendlerScheduler(string startUrl, string outputDir, + string runAsUser = null) + { + if (startUrl == null) throw new 
ArgumentNullException(nameof(startUrl)); + if (outputDir == null) throw new ArgumentNullException(nameof(outputDir)); + _outputDir = outputDir; + _runAsUser = runAsUser; + + _crawlQueue.Enqueue(startUrl); + _renderQueue.Enqueue(startUrl); + } + + public void Registered(ISchedulerDriver driver, FrameworkID frameworkId, MasterInfo masterInfo) + { + Console.WriteLine($"Registered with Mesos master. FrameworkId='{frameworkId.value}'."); + } + + public void Reregistered(ISchedulerDriver driver, MasterInfo masterInfo) + { + } + + public void ResourceOffers(ISchedulerDriver driver, IEnumerable offers) + { + foreach (var offer in offers) + { + var tasks = new List(); + var resourcesCounter = new ResourcesCounter(offer); + bool done; + do + { + done = true; + + string renderUrl; + if (resourcesCounter.HasRenderTaskResources() && _renderQueue.TryDequeue(out renderUrl)) + { + tasks.Add(GetRenderTaskInfo(offer, ++_launchedTasks, renderUrl)); + resourcesCounter.SubstractRenderResources(); + done = false; + } + + string crawlUrl; + if (resourcesCounter.HasCrawlTaskResources() && _crawlQueue.TryDequeue(out crawlUrl)) + { + tasks.Add(GetCrawlTaskInfo(offer, ++_launchedTasks, crawlUrl)); + resourcesCounter.SubstractCrawlResources(); + _crawled.Add(crawlUrl); + done = false; + } + } while (!done); + + if (tasks.Any ()) { + driver.LaunchTasks (new[] { offer.id }, tasks); + } + else + driver.DeclineOffer(offer.id); + } + } + + public void OfferRescinded(ISchedulerDriver driver, OfferID offerId) + { + } + + public void StatusUpdate(ISchedulerDriver driver, TaskStatus status) + { + if (status.state.IsTerminal()) + { + Console.WriteLine($"Status update: task '{status.task_id.value}' has terminated with state '{status.state}'."); + var finishedTasksCount = Interlocked.Increment(ref _finishedTasksCount); + + if (finishedTasksCount == MaxTasksToRun) + { + Console.WriteLine("Reached the max number of tasks to run. 
Stopping..."); + + var dotWritePath = Path.Combine(_outputDir, "result.dot"); + DotHelper.Write(dotWritePath, _crawlResults, _renderResults); + driver.Stop(); + } + } + else + { + Console.WriteLine($"Status update: task '{status.task_id.value}' is in state '{status.state}'."); + } + } + + public void FrameworkMessage(ISchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data) + { + var message = JsonHelper.Deserialize (data); + switch (message.Type) { + case "CrawlResult": + var crawlResult = JsonHelper.Deserialize (message.Body); + Console.WriteLine ($"Framework message : got {crawlResult.Links.Length} links from url '{crawlResult.Url}'."); + + foreach (var link in crawlResult.Links) { + if (_crawled.Contains (link)) + continue; + + _crawlQueue.Enqueue (link); + _renderQueue.Enqueue (link); + } + + // update edges: url -> links + var edges = _crawlResults.GetOrAdd (crawlResult.Url, x => new List ()); + edges.AddRange (crawlResult.Links); + + // empty edge list for links + foreach (var link in crawlResult.Links) + _crawlResults.GetOrAdd (link, x => new List ()); + break; + case "RenderResult": + var renderResult = JsonHelper.Deserialize (message.Body); + Console.WriteLine ($"Framework message : saved '{renderResult.FileName}' for url '{renderResult.Url}'."); + + _renderResults [renderResult.Url] = renderResult.FileName; + break; + default: + Console.WriteLine ($"Unrecognized message type: '{message.Type}'"); + break; + } + } + + public void Disconnected(ISchedulerDriver driver) + { + } + + public void SlaveLost(ISchedulerDriver driver, SlaveID slaveId) + { + } + + public void ExecutorLost(ISchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, int status) + { + } + + public void Error(ISchedulerDriver driver, string message) + { + Console.WriteLine($"Error: '{message}'."); + } + + private TaskInfo GetRenderTaskInfo(Offer offer, int uniqueId, string url) + { + var result = new TaskInfo { + name = "Rendler.Render_" + uniqueId, + task_id = 
new TaskID { value = uniqueId.ToString () }, + slave_id = offer.slave_id, + resources = { + new Resource { + name = "cpus", + type = Value.Type.SCALAR, + scalar = new Value.Scalar { value = RenderCpus } + }, + new Resource { + name = "mem", + type = Value.Type.SCALAR, + scalar = new Value.Scalar { value = RenderMem } + } + }, + executor = new ExecutorInfo { + executor_id = new ExecutorID { value = "RenderExecutor" }, + command = new CommandInfo { + value = "mono rendler.exe -executor=render", + user = _runAsUser + }, + data = Encoding.UTF8.GetBytes (_outputDir) + }, + data = Encoding.UTF8.GetBytes (url) + }; + + result.executor.command.uris.Add (new CommandInfo.URI { + cache = false, + extract = true, + value = "./rendler.tar.gz", // relative to "frameworks_home" mesos-slave command argument + executable = false + }); + + return result; + } + + private TaskInfo GetCrawlTaskInfo(Offer offer, int uniqueId, string url) + { + var result = new TaskInfo { + name = "Rendler.Crawl_" + uniqueId, + task_id = new TaskID { value = uniqueId.ToString () }, + slave_id = offer.slave_id, + resources = { + new Resource { + name = "cpus", + type = Value.Type.SCALAR, + scalar = new Value.Scalar { value = CrawlCpus } + }, + new Resource { name = "mem", type = Value.Type.SCALAR, scalar = new Value.Scalar { value = CrawlMem } } + }, + executor = new ExecutorInfo { + executor_id = new ExecutorID { value = "CrawlExecutor" }, + command = new CommandInfo { + value = "mono rendler.exe -executor=crawl", + user = _runAsUser + }, + }, + data = Encoding.UTF8.GetBytes (url) + }; + + result.executor.command.uris.Add (new CommandInfo.URI { + cache = false, + extract = true, + value = "./rendler.tar.gz", // relative to "frameworks_home" mesos-slave command argument + executable = false + }); + + return result; + } + + private class ResourcesCounter + { + private double _cpus; + private double _mem; + + public ResourcesCounter(Offer offer) + { + var cpusResource = offer.resources.SingleOrDefault(x => 
x.name == "cpus"); + var memResource = offer.resources.SingleOrDefault(x => x.name == "mem"); + _cpus = cpusResource?.scalar.value ?? 0d; + _mem = memResource?.scalar.value ?? 0d; + } + + private void Substract(double cpus, double mem) + { + _cpus = _cpus - cpus; + _mem = _mem - mem; + } + + public bool HasRenderTaskResources() + { + return HasResources(RenderCpus, RenderMem); + } + + public bool HasCrawlTaskResources() + { + return HasResources(CrawlCpus, CrawlMem); + } + + public void SubstractRenderResources() + { + Substract(RenderCpus, RenderMem); + } + + public void SubstractCrawlResources() + { + Substract(CrawlCpus, CrawlMem); + } + + private bool HasResources(double cpus, double mem) + { + return _cpus >= cpus && _mem >= mem; + } + } + } +} diff --git a/csharp/src/Rendler/RunMode.cs b/csharp/src/Rendler/RunMode.cs new file mode 100644 index 0000000..d7fdc2c --- /dev/null +++ b/csharp/src/Rendler/RunMode.cs @@ -0,0 +1,9 @@ +namespace Rendler +{ + internal enum RunMode + { + Default, + Scheduler, + Executor + } +} diff --git a/csharp/src/Rendler/project.json b/csharp/src/Rendler/project.json new file mode 100644 index 0000000..381c288 --- /dev/null +++ b/csharp/src/Rendler/project.json @@ -0,0 +1,23 @@ +{ + "version": "1.0.0-*", + "buildOptions": { + "emitEntryPoint": true, + "copyToOutput": "libmesosclr.so" + }, + + "dependencies": { + "mesosclr": "1.0.0", + "Microsoft.NETCore.App": { + "type": "platform", + "version": "1.0.0-rc2-3002702" + }, + "Newtonsoft.Json": "8.0.3", + "System.Net.Http": "4.0.1-rc2-24027" + }, + + "frameworks": { + "netcoreapp1.0": { + "imports": "dnxcore50" + } + } +} diff --git a/nodejs/.gitignore b/nodejs/.gitignore new file mode 100644 index 0000000..b3ff1bb --- /dev/null +++ b/nodejs/.gitignore @@ -0,0 +1,4 @@ +/.idea +/node_modules +/support/work +*.iml diff --git a/nodejs/README.md b/nodejs/README.md new file mode 100644 index 0000000..f5622e8 --- /dev/null +++ b/nodejs/README.md @@ -0,0 +1,14 @@ +## Node.js Rendler Framework + 
+### Preparation + +1. The implementation uses the [mesos-api](https://github.com/bcrusu/mesos-node) package, which has to be linked manually using the [npm-link](https://docs.npmjs.com/cli/link) command (npmjs.org package to come). +2. Pack the Rendler binaries to `rendler.tar.gz` and place the archive inside the [frameworks_home](http://mesos.apache.org/documentation/latest/configuration/) directory. + +### Running + +To start the Rendler framework, run the following command: +```bash +node index.js -scheduler -master=MASTER_ADDRESS -output=RENDLER_OUTPUT_DIR [-starturl=CRAWL_START_URL] [-user=RUN_AS_USER] +``` + diff --git a/nodejs/index.js b/nodejs/index.js new file mode 100644 index 0000000..738f423 --- /dev/null +++ b/nodejs/index.js @@ -0,0 +1,2 @@ +var rendler = require("./src/rendler"); +rendler.main(); \ No newline at end of file diff --git a/nodejs/package.json b/nodejs/package.json new file mode 100644 index 0000000..3e6f7ee --- /dev/null +++ b/nodejs/package.json @@ -0,0 +1,9 @@ +{ + "name": "rendler", + "version": "0.0.1", + "private": true, + "dependencies": { + "mesos-api": ">0.0.0", + "bytebuffer": "~4 >=4.1" + } +} \ No newline at end of file diff --git a/nodejs/render.js b/nodejs/render.js new file mode 100644 index 0000000..6d9fe57 --- /dev/null +++ b/nodejs/render.js @@ -0,0 +1,23 @@ +var page = require('webpage').create(), + system = require('system'), + address, output, size; + +if (system.args.length < 3) { + console.log('Usage: render.js '); + phantom.exit(1); +} + +address = system.args[1]; +destination = system.args[2]; + +console.log('Rendering ' + address + ' to ' + destination); + +page.viewportSize = { + width: 1024, + height: 768 +}; + +page.open(address, function() { + page.render(destination); + phantom.exit(); +}); diff --git a/nodejs/src/arguments.js b/nodejs/src/arguments.js new file mode 100644 index 0000000..ce6bf3f --- /dev/null +++ b/nodejs/src/arguments.js @@ -0,0 +1,125 @@ +function parse(argsArray) { + var runMode = 
undefined; + var mesosMaster = undefined; + var executor = undefined; + var outputDir = undefined; + var startUrl = undefined; + var runAsUser = undefined; + + // Plain loop instead of forEach: inside a forEach callback "return null" only ends that callback, so parse() never rejected anything. + for (var i = 0; i < argsArray.length; i++) { + var arg = argsArray[i]; + if (arg.indexOf("-executor=") === 0) { + if (runMode !== undefined) { + console.log("Invalid run mode detected. Check the 'executor' argument!"); + return null; + } + + executor = arg.substring("-executor=".length); + runMode = "executor"; + } + else if (arg.indexOf("-scheduler") === 0) { + if (runMode !== undefined) { + console.log("Invalid run mode detected. Check the 'scheduler' argument!"); + return null; + } + + runMode = "scheduler"; + } + else if (arg.indexOf("-master=") === 0) { + if (mesosMaster !== undefined) { + console.log("Mesos master option can be specified only once."); + return null; + } + + mesosMaster = arg.substring("-master=".length); + } + else if (arg.indexOf("-output=") === 0) { + if (outputDir !== undefined) { + console.log("Output directory option can be specified only once."); + return null; + } + + outputDir = arg.substring("-output=".length); + } + else if (arg.indexOf("-starturl=") === 0) { + if (startUrl !== undefined) { + console.log("Start URL option can be specified only once."); + return null; + } + + startUrl = arg.substring("-starturl=".length); + } + else if (arg.indexOf("-user=") === 0) { + if (runAsUser !== undefined) { + console.log("User option can be specified only once."); + return null; + } + + runAsUser = arg.substring("-user=".length); + } + else { + console.log("Unknown argument detected: " + arg); + } + } + + return { + runMode: runMode, + mesosMaster: mesosMaster, + executorName: executor, + outputDir: outputDir, + startUrl: startUrl, + runAsUser: runAsUser + } +} + +function validate(arguments) { + if (!arguments) { // parse() returns null for a rejected command line + return false; + } + switch (arguments.runMode) { + case "executor": + if (!arguments.executorName) { + console.log("Invalid executor name."); + return false; + } + break; + case "scheduler" : + if (!arguments.mesosMaster) { + 
console.log("Invalid Mesos master address."); + return false; + } + if (!arguments.outputDir) { + console.log("Invalid output directory."); + return false; + } + + if (!directoryExists(arguments.outputDir)) { + console.log("Could not find output directory."); + return false; + } + break; + default : + console.log("Run mode was not specified."); + return false; + } + return true; +} + +function directoryExists(directory) { + var fs = require("fs"); + try { + var stats = fs.statSync(directory); + if (!stats.isDirectory()) { + return false; + } + } + catch (e) { + return false; + } + + return true; +} + +module.exports.parse = parse; +module.exports.validate = validate; \ No newline at end of file diff --git a/nodejs/src/dotUtil.js b/nodejs/src/dotUtil.js new file mode 100644 index 0000000..c764d45 --- /dev/null +++ b/nodejs/src/dotUtil.js @@ -0,0 +1,61 @@ +const fs = require('fs'); +const endOfLine = require('os').EOL; + +function write(outputPath, nodeToChildNodes, nodeImageFileName) { + var url, nodeName; + var nodeNames = {}; + var nodeIdCounter = 0; + + var stream = fs.createWriteStream(outputPath, { + flags: 'w', + defaultEncoding: 'utf8' + }); + + stream.write("digraph G {"); + stream.write(endOfLine); + stream.write("\tnode [shape=box];"); + stream.write(endOfLine); + + for (url in nodeToChildNodes) { + nodeName = "url_" + (++nodeIdCounter); + nodeNames[url] = nodeName; + + stream.write("\t"); + stream.write(nodeName); + + var imageFileName = nodeImageFileName[url]; + if (imageFileName) { + stream.write(" [label=\"\" image=\""); + stream.write(imageFileName); + } + else { + stream.write(" [label=\""); + stream.write(url); + } + + stream.write("\"];"); + stream.write(endOfLine); + } + + stream.write(endOfLine); + + for (url in nodeToChildNodes) { + nodeName = nodeNames[url]; + var childNodes = nodeToChildNodes[url]; + for (var i = 0; i < childNodes.length; i++) { + var childNode = childNodes[i]; + var childNodeName = nodeNames[childNode]; + stream.write("\t"); 
+ stream.write(nodeName); + stream.write(" -> "); + stream.write(childNodeName); + stream.write(";"); + stream.write(endOfLine); + } + } + + stream.write("}"); + stream.end(endOfLine); +} + +module.exports.write = write; \ No newline at end of file diff --git a/nodejs/src/executors/crawlExecutor.js b/nodejs/src/executors/crawlExecutor.js new file mode 100644 index 0000000..e8049d7 --- /dev/null +++ b/nodejs/src/executors/crawlExecutor.js @@ -0,0 +1,103 @@ +const util = require('util'); +const http = require('http'); +const https = require('https'); +const urlLib = require('url'); +const EventEmitter = require('events'); +const executorUtil = require('./executorUtil'); + +function CrawlExecutor() { + EventEmitter.call(this); + + function onRegistered(driver, executorInfo, frameworkInfo, slaveInfo) { + console.log("Registered executor on host " + slaveInfo.hostname); + } + + function onLaunchTask(driver, taskInfo) { + console.log("Launching crawl task '" + taskInfo.task_id.value + "'..."); + executorUtil.sendTaskRunningStatus(driver, taskInfo.task_id); + + var url = taskInfo.data.toUTF8(); + + var httpx = getHttpObject(url); + if (!httpx) { + console.log("Unrecognized protocol for url: " + url); + executorUtil.sendTaskFinishedStatus(driver, taskInfo.task_id); + return; + } + + httpx.get(url, function (response) { + var links = []; + response.setEncoding('utf8'); + + response.on('data', function (chunk) { + links = links.concat(parseLinks(chunk)); + }); + response.on('end', function () { + sendCrawlResultMessage(driver, url, links); + executorUtil.sendTaskFinishedStatus(driver, taskInfo.task_id); + }); + }).on('error', function (error) { + console.log("Error during crawl operation: " + error); + executorUtil.sendTaskErrorStatus(driver, taskInfo.task_id); + }); + } + + function onError(driver, message) { + console.log("Error: " + message); + } + + function sendCrawlResultMessage(driver, url, links) { + var message = { + type: "CrawlResult", + body: { + url: url, + 
links: links + } + }; + + driver.sendFrameworkMessage(JSON.stringify(message)); + } + + function getHttpObject(url) { + var parsed = urlLib.parse(url); + switch (parsed.protocol) { + case "https:": + return https; + case "http:": + return http; + default : + return null; + } + } + + function getIsValidLink(link) { + try { + var protocol = urlLib.parse(link).protocol; + return protocol === "https:" || protocol === "http:"; + } + catch (e) { + return false; + } + } + + function parseLinks(content) { + var regex = /]+href=[\"']?([^\"\'>]+)[\"\']?[^>]*>.+?<\/a>/gi; + var links = []; + var array; + while ((array = regex.exec(content)) !== null) { + var link = array[1]; + if (getIsValidLink(link)) + links.push(link.toLowerCase()); + } + + return links; + } + + this.on("registered", onRegistered); + this.on("launchTask", onLaunchTask); + this.on("error", onError); +} + +util.inherits(CrawlExecutor, EventEmitter); + +module.exports = CrawlExecutor; \ No newline at end of file diff --git a/nodejs/src/executors/executorUtil.js b/nodejs/src/executors/executorUtil.js new file mode 100644 index 0000000..b1bf0b5 --- /dev/null +++ b/nodejs/src/executors/executorUtil.js @@ -0,0 +1,23 @@ +const MesosApi = require('mesos-api'); +const Protos = MesosApi.protos.mesos; + +exports.sendTaskRunningStatus = function (driver, taskId) { + driver.sendStatusUpdate(new Protos.TaskStatus({ + task_id: taskId, + state: Protos.TaskState.TASK_RUNNING + })); +}; + +exports.sendTaskFinishedStatus = function (driver, taskId) { + driver.sendStatusUpdate(new Protos.TaskStatus({ + task_id: taskId, + state: Protos.TaskState.TASK_FINISHED + })); +}; + +exports.sendTaskErrorStatus = function (driver, taskId) { + driver.sendStatusUpdate(new Protos.TaskStatus({ + task_id: taskId, + state: Protos.TaskState.TASK_ERROR + })); +}; \ No newline at end of file diff --git a/nodejs/src/executors/renderExecutor.js b/nodejs/src/executors/renderExecutor.js new file mode 100644 index 0000000..6a7c893 --- /dev/null +++ 
b/nodejs/src/executors/renderExecutor.js @@ -0,0 +1,61 @@ +const util = require('util'); +const path = require('path'); +const child_process = require('child_process'); +const EventEmitter = require('events'); +const executorUtil = require('./executorUtil'); + +function RenderExecutor() { + EventEmitter.call(this); + + var _outputDir; + + function onRegistered(driver, executorInfo, frameworkInfo, slaveInfo) { + _outputDir = executorInfo.data.toUTF8(); + console.log("Registered executor on host " + slaveInfo.hostname + ". Output dir is '" + _outputDir + "'."); + } + + function onLaunchTask(driver, taskInfo) { + console.log("Launching render task '" + taskInfo.task_id.value + "'..."); + executorUtil.sendTaskRunningStatus(driver, taskInfo.task_id); + + var url = taskInfo.data.toUTF8(); + var fileName = path.join(_outputDir, taskInfo.task_id.value + ".png"); + var spawnOptions = { + timeout: 1000 * 30 + }; + + child_process.spawn("phantomjs", ["render.js", url, fileName], spawnOptions) + .on('close', function () { + sendRenderResultMessage(driver, url, fileName); + executorUtil.sendTaskFinishedStatus(driver, taskInfo.task_id); + }) + .on('error', function (err) { + console.log("Error during render operation: " + err); + executorUtil.sendTaskErrorStatus(driver, taskInfo.task_id); + }); + } + + function onError(driver, message) { + console.log("Error: " + message); + } + + function sendRenderResultMessage(driver, url, fileName) { + var message = { + type: "RenderResult", + body: { + url: url, + fileName: fileName + } + }; + + driver.sendFrameworkMessage(JSON.stringify(message)); + } + + this.on("registered", onRegistered); + this.on("launchTask", onLaunchTask); + this.on("error", onError); +} + +util.inherits(RenderExecutor, EventEmitter); + +module.exports = RenderExecutor; \ No newline at end of file diff --git a/nodejs/src/rendler.js b/nodejs/src/rendler.js new file mode 100644 index 0000000..29353a8 --- /dev/null +++ b/nodejs/src/rendler.js @@ -0,0 +1,82 @@ +var 
Arguments = require("./arguments"); +const MesosApi = require("mesos-api"); +const Protos = MesosApi.protos.mesos; +const RendlerScheduler = require("./rendlerScheduler"); +const CrawlExecutor = require("./executors/crawlExecutor"); +const RenderExecutor = require("./executors/renderExecutor"); + +function main() { + var args = Arguments.parse(process.argv.slice(2)); + if (!args || !Arguments.validate(args)) + process.exit(-1); + + switch (args.runMode) { + case "executor": + runExecutor(args.executorName); + break; + case "scheduler" : + runScheduler(args.mesosMaster, args.startUrl, args.outputDir, args.runAsUser); + break; + } +} + +function runScheduler(mesosMaster, startUrl, outputDir, runAsUser) { + var frameworkInfo = new Protos.FrameworkInfo({ + id: { + value: "Rendler" + }, + name: "Rendler (Node.js)", + failover_timeout: 5, //seconds + checkpoint: false, + user: runAsUser + }); + + if (!startUrl) + startUrl = "https://mesosphere.com"; + + var scheduler = new RendlerScheduler(startUrl, outputDir, runAsUser); + var driver = MesosApi.createSchedulerDriver(scheduler, frameworkInfo, mesosMaster); + + console.log("Running scheduler driver..."); + driver.run() + .then(function (status) { + console.log("Scheduler driver finished with status: " + status); + process.exit(0); + }) + .catch(function (error) { + console.log("Unexpected driver error: " + error); + process.exit(-2); + }); +} + +function runExecutor(executorName) { + var executor; + switch (executorName) { + case "render": + executor = new RenderExecutor(); + break; + case "crawl": + executor = new CrawlExecutor(); + break; + default: + { + console.log("Unrecognized executor: " + executorName); + process.exit(-1); + } + } + + var driver = MesosApi.createExecutorDriver(executor); + + console.log("Running executor driver..."); + driver.run() + .then(function (status) { + console.log("Executor driver finished with status: " + status); + process.exit(0); + }) + .catch(function (error) { + 
console.log("Unexpected driver error: " + error); + process.exit(-3); + }); +} + +module.exports.main = main; \ No newline at end of file diff --git a/nodejs/src/rendlerScheduler.js b/nodejs/src/rendlerScheduler.js new file mode 100644 index 0000000..ac4fe3a --- /dev/null +++ b/nodejs/src/rendlerScheduler.js @@ -0,0 +1,266 @@ +const util = require('util'); +const path = require('path'); +const MesosApi = require('mesos-api'); +const Protos = MesosApi.protos.mesos; +const EventEmitter = require('events'); +const ByteBuffer = require('bytebuffer'); +const dotUtil = require('./dotUtil'); + +const MaxTasksToRun = 256; // limit for demonstration purpose +const RenderCpus = 1; +const RenderMem = 128; +const CrawlCpus = 0.5; +const CrawlMem = 64; + +function RendlerScheduler(startUrl, outputDir, runAsUser) { + EventEmitter.call(this); + + var _renderQueue = [startUrl]; + var _crawlQueue = [startUrl]; + var _crawled = []; + var _launchedTasks = 0; + var _finishedTasksCount = 0; + + var _renderResults = {}; + var _crawlResults = {}; + + function onRegistered(driver, frameworkId, masterInfo) { + console.log("Registered with Mesos master. 
FrameworkId=" + frameworkId.value); + } + + function onResourceOffers(driver, offers) { + for (var i = 0; i < offers.length; i++) { + var offer = offers[i]; + var tasks = []; + var resourcesCounter = new ResourcesCounter(offer); + var done; + do + { + done = true; + + var renderUrl = _renderQueue.pop(); + if (renderUrl && resourcesCounter.hasRenderTaskResources()) { + tasks.push(getRenderTaskInfo(offer, ++_launchedTasks, renderUrl)); + resourcesCounter.subtractRenderResources(); + done = false; + } + + var crawlUrl = _crawlQueue.pop(); + if (crawlUrl && resourcesCounter.hasCrawlTaskResources()) { + tasks.push(getCrawlTaskInfo(offer, ++_launchedTasks, crawlUrl)); + resourcesCounter.subtractCrawlResources(); + _crawled.push(crawlUrl); + done = false; + } + } while (!done); + + if (tasks.length > 0) { + driver.launchTasks([offer.id], tasks); + } + else + driver.declineOffer(offer.id); + } + } + + function onStatusUpdate(driver, status) { + if (!isTerminalTaskState(status.state)) { + console.log("Status update: task " + status.task_id.value + " is in state " + status.state); + return; + } + + console.log("Status update: task " + status.task_id.value + " has terminated with state " + status.state); + + if (++_finishedTasksCount == MaxTasksToRun) { + console.log("Reached the max number of tasks to run. 
Stopping..."); + + var dotWritePath = path.join(outputDir, "result.dot"); + dotUtil.write(dotWritePath, _crawlResults, _renderResults); + + driver.stop(); + } + } + + function onFrameworkMessage(driver, executorId, slaveId, data) { + var message = JSON.parse(data); + var url = message.body.url; + + switch (message.type) { + case "CrawlResult": + var links = message.body.links; + console.log("Framework message 'CrawlResult': got " + links.length + " links from url " + url); + + links + .filter(function (link) { + return !_crawled.some(function (crawledLink) { + return crawledLink === link; + }); + }) + .forEach(function (link) { + _crawlQueue.push(link); + _renderQueue.push(link); + }); + + // update edges: url -> links + var edges = _crawlResults[url] || []; + _crawlResults[url] = edges.concat(links); + + // empty edge list for links + links.forEach(function (link) { + _crawlResults[link] = _crawlResults[link] || []; + }); + break; + case "RenderResult": + var fileName = message.body.fileName; + console.log("Framework message 'RenderResult': saved " + fileName + " for url " + url); + + _renderResults[url] = fileName; + break; + default: + console.log("Unrecognized message type: " + message.type); + break; + } + } + + function onError(driver, message) { + console.log("Error: " + message); + } + + function getRenderTaskInfo(offer, uniqueId, url) { + return new Protos.TaskInfo({ + name: "Rendler.Render_" + uniqueId, + task_id: new Protos.TaskID({value: uniqueId.toString()}), + slave_id: offer.slave_id, + resources: [ + new Protos.Resource({ + name: "cpus", + type: Protos.Value.Type.SCALAR, + scalar: new Protos.Value.Scalar({value: RenderCpus}) + }), + new Protos.Resource({ + name: "mem", + type: Protos.Value.Type.SCALAR, + scalar: new Protos.Value.Scalar({value: RenderMem}) + }) + ], + executor: new Protos.ExecutorInfo({ + executor_id: new Protos.ExecutorID({value: "RenderExecutor"}), + command: new Protos.CommandInfo({ + value: "node index.js -executor=render", + 
user: runAsUser, + uris: [ + new Protos.CommandInfo.URI({ + cache: false, + extract: true, + value: "./rendler.tar.gz", // relative to "frameworks_home" mesos-slave command argument + executable: false + }) + ] + }), + data: ByteBuffer.fromUTF8(outputDir) + }), + data: ByteBuffer.fromUTF8(url) + }); + } + + function getCrawlTaskInfo(offer, uniqueId, url) { + return new Protos.TaskInfo({ + name: "Rendler.Crawl_" + uniqueId, + task_id: new Protos.TaskID({value: uniqueId.toString()}), + slave_id: offer.slave_id, + resources: [ + new Protos.Resource({ + name: "cpus", + type: Protos.Value.Type.SCALAR, + scalar: new Protos.Value.Scalar({value: CrawlCpus}) + }), + new Protos.Resource({ + name: "mem", + type: Protos.Value.Type.SCALAR, + scalar: new Protos.Value.Scalar({value: CrawlMem}) + }) + ], + executor: new Protos.ExecutorInfo({ + executor_id: new Protos.ExecutorID({value: "CrawlExecutor"}), + command: new Protos.CommandInfo({ + value: "node index.js -executor=crawl", + user: runAsUser, + uris: [ + new Protos.CommandInfo.URI({ + cache: false, + extract: true, + value: "./rendler.tar.gz", // relative to "frameworks_home" mesos-slave command argument + executable: false + }) + ] + }) + }), + data: ByteBuffer.fromUTF8(url) + }); + } + + function ResourcesCounter(offer) { + var _cpus = 0; + var _mem = 0; + + var cpusResource = getResource("cpus"); + if (cpusResource) + _cpus = cpusResource.scalar.value; + + var memResource = getResource("mem"); + if (memResource) + _mem = memResource.scalar.value; + + function getResource(name) { + return offer.resources.find(function (r) { + return r.name === name; + }); + } + + function subtract(cpus, mem) { + _cpus = _cpus - cpus; + _mem = _mem - mem; + } + + function hasResources(cpus, mem) { + return _cpus >= cpus && _mem >= mem; + } + + var result = {}; + + result.hasRenderTaskResources = function () { + return hasResources(RenderCpus, RenderMem); + }; + + result.hasCrawlTaskResources = function () { + return hasResources(CrawlCpus, 
CrawlMem); + }; + + result.subtractRenderResources = function () { + subtract(RenderCpus, RenderMem); + }; + + result.subtractCrawlResources = function () { + subtract(CrawlCpus, CrawlMem); + }; + + return result; + } + + function isTerminalTaskState(taskState) { + return taskState === Protos.TaskState.TASK_FINISHED || + taskState === Protos.TaskState.TASK_FAILED || + taskState === Protos.TaskState.TASK_KILLED || + taskState === Protos.TaskState.TASK_LOST || + taskState === Protos.TaskState.TASK_ERROR; + } + + this.on("registered", onRegistered); + this.on("resourceOffers", onResourceOffers); + this.on("statusUpdate", onStatusUpdate); + this.on("frameworkMessage", onFrameworkMessage); + this.on("error", onError); +} + +util.inherits(RendlerScheduler, EventEmitter); + +module.exports = RendlerScheduler; \ No newline at end of file diff --git a/nodejs/support/cluster.sh b/nodejs/support/cluster.sh new file mode 100755 index 0000000..ae70b72 --- /dev/null +++ b/nodejs/support/cluster.sh @@ -0,0 +1,110 @@ +#!/bin/bash + +SCRIPTDIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd) + +CLUSTER_WORK_DIR=${SCRIPTDIR}/work +RENDLER_OUTPUT_DIR=/tmp/rendlerout +APPDIR=${SCRIPTDIR}/../ + +copy_files() { + if [ ! -d "$CLUSTER_WORK_DIR" ]; then + echo "Creating cluster work dir: $CLUSTER_WORK_DIR ..." + mkdir "$CLUSTER_WORK_DIR" + fi + + if [ ! -d "$RENDLER_OUTPUT_DIR" ]; then + echo "Creating rendler output dir: $RENDLER_OUTPUT_DIR ..." 
+ mkdir "$RENDLER_OUTPUT_DIR" + chmod 777 $RENDLER_OUTPUT_DIR + fi + + + cd $APPDIR + tar -czf ${CLUSTER_WORK_DIR}/rendler.tar.gz * --exclude='support*' + cd - +} + +start_cluster() { + copy_files + + export LD_LIBRARY_PATH=/usr/local/lib:${LD_LIBRARY_PATH} + + mesos-master --cluster=mesosclr --ip=127.0.0.50 --port=5050 --allocation_interval=1secs --registry=in_memory --quorum=1 --quiet \ + --log_dir=${CLUSTER_WORK_DIR}/master/logs --work_dir=${CLUSTER_WORK_DIR}/master/data & + + sleep 0.5s + + mesos-slave --master=127.0.0.50:5050 --ip=127.0.0.51 --port=5051 --resources="cpus:2;mem:512" --attributes=name:slave1 --quiet \ + --frameworks_home=${CLUSTER_WORK_DIR} --log_dir=${CLUSTER_WORK_DIR}/slave1/logs --work_dir=${CLUSTER_WORK_DIR}/slave1/data & + + sleep 0.2s + + mesos-slave --master=127.0.0.50:5050 --ip=127.0.0.52 --port=5052 --resources="cpus:2;mem:512" --attributes=name:slave2 --quiet \ + --frameworks_home=${CLUSTER_WORK_DIR} --log_dir=${CLUSTER_WORK_DIR}/slave2/logs --work_dir=${CLUSTER_WORK_DIR}/slave2/data & + + sleep 0.2s + + mesos-slave --master=127.0.0.50:5050 --ip=127.0.0.53 --port=5053 --resources="cpus:2;mem:512" --attributes=name:slave3 --quiet \ + --frameworks_home=${CLUSTER_WORK_DIR} --log_dir=${CLUSTER_WORK_DIR}/slave3/logs --work_dir=${CLUSTER_WORK_DIR}/slave3/data & +} + +stop_cluster() { + killall -q mesos-slave + killall -q mesos-master +} + +clean() { + echo "Removing cluster work dir at: $CLUSTER_WORK_DIR ..." + rm -rf "$CLUSTER_WORK_DIR" + + echo "Removing temp Mesos dir at: /tmp/mesos ..." + rm -rf /tmp/mesos + + echo "Removing Rendler output dir at: $RENDLER_OUTPUT_DIR ..." + rm -rf $RENDLER_OUTPUT_DIR +} + +if [ "$(id -u)" != "0" ]; then + echo "Mesos requires to be executed as root." + exit 1 +fi + +if [ -z "$SCRIPTDIR" ]; then + echo "Could not detect current script dir..." + exit 1 +fi + +case "$1" in + start) + echo "Starting..." + start_cluster + echo "Done." + ;; + stop) + echo "Stopping..." + stop_cluster + echo "Done." 
+ ;; + clean) + echo "Cleaning cluster dir..." + clean + echo "Done." + ;; + restart) + echo "Stopping..." + stop_cluster + + if [ "$2" = "-c" ]; then + echo "Cleaning cluster dir..." + clean + fi + + echo "Starting..." + start_cluster + echo "Done." + ;; + *) + echo "Usage: cluster {start|stop|restart|clean}" + exit 1 + ;; +esac