From bc94bde26f1c1f2fb39efedfe7692285ba87c614 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Tue, 10 Feb 2015 18:12:24 -0800 Subject: [PATCH 1/5] updated dependencies (TODO: Godeps) --- go/README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/go/README.md b/go/README.md index 9bc2998..23cc866 100644 --- a/go/README.md +++ b/go/README.md @@ -6,10 +6,14 @@ Once you are ssh'd into the `mesos-demo` VM: ```bash cd hostfiles/go export GOPATH=$PWD -go get code.google.com/p/goprotobuf/{proto,protoc-gen-go} -go get github.com/mesosphere/mesos-go/mesos +go get golang.org/x/net/context +go get code.google.com/p/go-uuid/uuid +go get github.com/golang/glog +go get github.com/gogo/protobuf/proto +go get github.com/stretchr/testify/mock +go get github.com/mesos/mesos-go/mesos go install github.com/mesosphere/rendler/scheduler -./bin/scheduler -seed http://mesosphere.io -master 127.0.1.1:5050 -local +./bin/scheduler -seed http://mesosphere.com -master 127.0.1.1:5050 -local ``` ### Generate graph From ed3592423d5af693845bd8606497d6825dbdc798 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Tue, 10 Feb 2015 18:13:29 -0800 Subject: [PATCH 2/5] updated skeleton with new mesos-go bindings --- .../rendler/scheduler_skeleton/main.go | 356 +++++++++--------- 1 file changed, 177 insertions(+), 179 deletions(-) diff --git a/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go b/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go index 5f3ec64..112f670 100644 --- a/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go +++ b/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go @@ -1,76 +1,174 @@ package main import ( - "code.google.com/p/goprotobuf/proto" "container/list" "encoding/json" "flag" "fmt" - "github.com/mesosphere/mesos-go/mesos" "github.com/mesosphere/rendler" "log" "os" "os/signal" + "time" "path/filepath" ) +import ( + "github.com/gogo/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + util "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" +) + const TASK_CPUS = 0.1 const TASK_MEM = 32.0 +const SHUTDOWN_TIMEOUT = 30 // in seconds + +// ----------------- util --------------- + +func maxTasksForOffer(offer *mesos.Offer) int { + // TODO(nnielsen): Parse offer resources. + count := 0 + + var cpus float64 = 0 + var mem float64 = 0 + + for _, resource := range offer.Resources { + if resource.GetName() == "cpus" { + cpus = *resource.GetScalar().Value + } + + if resource.GetName() == "mem" { + mem = *resource.GetScalar().Value + } + } + + for cpus >= TASK_CPUS && mem >= TASK_MEM { + count++ + cpus -= TASK_CPUS + mem -= TASK_MEM + } + + return count +} + +func printQueueStatistics() { + // TODO(nnielsen): Print queue lengths. +} + +func makeTaskPrototype(offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + taskId := sched.tasksLaunched + sched.tasksLaunched++ + return &mesos.TaskInfo{ + TaskId: &mesos.TaskID{ + Value: proto.String(fmt.Sprintf("RENDLER-%d", taskId)), + }, + SlaveId: offer.SlaveId, + Resources: []*mesos.Resource{ + util.NewScalarResource("cpus", TASK_CPUS), + util.NewScalarResource("mem", TASK_MEM), + }, + } + } + +func makeCrawlTask(url string, offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + task := makeTaskPrototype(offer, sched) + task.Name = proto.String("CRAWL_" + *task.TaskId.Value) + task.Executor = sched.crawlExecutor + task.Data = []byte(url) + return task +} + +func makeRenderTask(url string, offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + task := makeTaskPrototype(offer, sched) + task.Name = proto.String("RENDER_" + *task.TaskId.Value) + task.Executor = sched.renderExecutor + task.Data = []byte(url) + return task +} + +//--------------- scheduler ---------------- + +type RendlerScheduler struct { + crawlExecutor *mesos.ExecutorInfo + renderExecutor *mesos.ExecutorInfo + tasksLaunched int + tasksFinished int + shuttingDown bool + crawlQueue *list.List + renderQueue *list.List + processedURLs *list.List + crawlResults *list.List + renderResults map[string]string + seedUrl string +} + +func newRendlerScheduler(crawlExecutor *mesos.ExecutorInfo, renderExecutor *mesos.ExecutorInfo, seedUrl string) *RendlerScheduler { -// See the Mesos Framework Development Guide: -// http://mesos.apache.org/documentation/latest/app-framework-development-guide -// -// Scheduler, scheduler driver, executor, and executor driver definitions: -// https://github.com/apache/mesos/blob/master/src/python/src/mesos.py -// https://github.com/apache/mesos/blob/master/include/mesos/scheduler.hpp -// -// Mesos protocol buffer definitions for Python: -// https://github.com/mesosphere/deimos/blob/master/deimos/mesos_pb2.py -// https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto -// -// NOTE: Feel free to strip out "_ = variable" stubs. They are in place to -// silence the Go compiler. -func main() { crawlQueue := list.New() // list of string renderQueue := list.New() // list of string - _ = renderQueue processedURLs := list.New() // list of string - _ = processedURLs - - crawlResults := list.New() // list of CrawlEdge + crawlResults := list.New() // list of CrawlEdge renderResults := make(map[string]string) - seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") - master := flag.String("master", "127.0.1.1:5050", "Location of leading Mesos master") - localMode := flag.Bool("local", true, "If true, saves rendered web pages on local disk") - // TODO(nnielsen): Add flag for artifacts. + crawlQueue.PushBack(seedUrl) + + return &RendlerScheduler{ + crawlExecutor: crawlExecutor, + renderExecutor: renderExecutor, + tasksLaunched: 0, + tasksFinished: 0, + shuttingDown: false, + crawlQueue: crawlQueue, + renderQueue: renderQueue, + processedURLs: processedURLs, + crawlResults: crawlResults, + renderResults: renderResults, + seedUrl: seedUrl, + } +} - flag.Parse() +func (sched *RendlerScheduler) Registered(driver sched.SchedulerDriver, frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) { + log.Printf("Registered") +} - crawlQueue.PushBack(*seedUrl) +func (sched *RendlerScheduler) Reregistered(driver sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework Re-Registered with Master ", masterInfo) +} - tasksCreated := 0 - tasksRunning := 0 +func (sched *RendlerScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + printQueueStatistics() +} - // TODO(nnielsen): based on `tasksRunning`, do - // graceful shutdown of framework (allow ongoing render tasks to - // finish). - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - go func(c chan os.Signal) { - s := <-c - fmt.Println("Got signal:", s) +func (sched *RendlerScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", rendler.NameFor(status.State), *status.TaskId.Value) +} - if s == os.Interrupt { - rendler.WriteDOTFile(crawlResults, renderResults) - } - os.Exit(1) - }(c) +func (sched *RendlerScheduler) FrameworkMessage(driver sched.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { + +} + +func (sched *RendlerScheduler) Error(driver sched.SchedulerDriver, err string) { + log.Printf("Scheduler received error:", err) +} +func (sched *RendlerScheduler) Disconnected(sched.SchedulerDriver) {} +func (sched *RendlerScheduler) OfferRescinded(sched.SchedulerDriver, *mesos.OfferID) {} +func (sched *RendlerScheduler) SlaveLost(sched.SchedulerDriver, *mesos.SlaveID) {} +func (sched *RendlerScheduler) ExecutorLost(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, int) {} + +func main() { crawlCommand := "python crawl_executor.py" renderCommand := "python render_executor.py" + seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") + master := flag.String("master", "127.0.1.1:5050", "Location of leading Mesos master") + localMode := flag.Bool("local", true, "If true, saves rendered web pages on local disk") + // TODO(nnielsen): Add flag for artifacts. + + flag.Parse() + if *localMode { renderCommand += " --local" } @@ -96,151 +194,51 @@ func main() { Name: proto.String("Renderer"), } - makeTaskPrototype := func(offer mesos.Offer) *mesos.TaskInfo { - taskId := tasksCreated - tasksCreated++ - return &mesos.TaskInfo{ - TaskId: &mesos.TaskID{ - Value: proto.String(fmt.Sprintf("RENDLER-%d", taskId)), - }, - SlaveId: offer.SlaveId, - Resources: []*mesos.Resource{ - mesos.ScalarResource("cpus", TASK_CPUS), - mesos.ScalarResource("mem", TASK_MEM), - }, - } - } + scheduler := newRendlerScheduler(crawlExecutor, renderExecutor, *seedUrl) - makeCrawlTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { - task := makeTaskPrototype(offer) - task.Name = proto.String("CRAWL_" + *task.TaskId.Value) - // - // TODO - // - return task - } - _ = makeCrawlTask - - makeRenderTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { - task := makeTaskPrototype(offer) - task.Name = proto.String("RENDER_" + *task.TaskId.Value) - // - // TODO - // - return task + master = master + + fwInfo := &mesos.FrameworkInfo{ + Name: proto.String("RENDLER"), + User: proto.String(""), } - _ = makeRenderTask - maxTasksForOffer := func(offer mesos.Offer) int { - // TODO(nnielsen): Parse offer resources. - count := 0 + driver, err := sched.NewMesosSchedulerDriver( + scheduler, + fwInfo, + *master, + (*mesos.Credential)(nil), + ) - var cpus float64 = 0 - _ = cpus + if err != nil { + log.Printf("Unable to create a SchedulerDriver ", err.Error()) + } - var mem float64 = 0 - _ = mem + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + go func(c chan os.Signal) { + s := <-c + fmt.Println("Got signal:", s) - for _, resource := range offer.Resources { - if resource.GetName() == "cpus" { - cpus = *resource.GetScalar().Value + if s == os.Interrupt { + fmt.Println("RENDLER is shutting down") + scheduler.shuttingDown = true + wait_started := time.Now() + for scheduler.tasksLaunched > 0 && SHUTDOWN_TIMEOUT > int(time.Since(wait_started).Seconds()) { + time.Sleep(time.Second) } - if resource.GetName() == "mem" { - mem = *resource.GetScalar().Value + if scheduler.tasksLaunched > 0 { + fmt.Println("Shutdown by timeout,", scheduler.tasksLaunched, "task(s) have not completed") } - } - - // - // TODO - // - - return count - } - _ = maxTasksForOffer - - printQueueStatistics := func() { - // TODO(nnielsen): Print queue lengths. - } - driver := mesos.SchedulerDriver{ - Master: *master, - Framework: mesos.FrameworkInfo{ - Name: proto.String("RENDLER"), - User: proto.String(""), - }, - - Scheduler: &mesos.Scheduler{ - - Registered: func( - driver *mesos.SchedulerDriver, - frameworkId mesos.FrameworkID, - masterInfo mesos.MasterInfo) { - log.Printf("Registered") - }, - - ResourceOffers: func(driver *mesos.SchedulerDriver, offers []mesos.Offer) { - printQueueStatistics() - - // - // TODO - // - }, - - StatusUpdate: func(driver *mesos.SchedulerDriver, status mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", rendler.NameFor(status.State), *status.TaskId.Value) - - if *status.State == mesos.TaskState_TASK_RUNNING { - tasksRunning++ - } else if rendler.IsTerminal(status.State) { - tasksRunning-- - } - }, - - FrameworkMessage: func( - driver *mesos.SchedulerDriver, - executorId mesos.ExecutorID, - slaveId mesos.SlaveID, - message string) { - - switch *executorId.Value { - case *crawlExecutor.ExecutorId.Value: - log.Print("Received framework message from crawler") - var result rendler.CrawlResult - err := json.Unmarshal([]byte(message), &result) - if err != nil { - log.Printf("Error deserializing CrawlResult: [%s]", err) - } else { - // - // TODO - // - } - - case *renderExecutor.ExecutorId.Value: - log.Printf("Received framework message from renderer") - var result rendler.RenderResult - err := json.Unmarshal([]byte(message), &result) - if err != nil { - log.Printf("Error deserializing RenderResult: [%s]", err) - } else { - // - // TODO - // - } - - default: - log.Printf("Received a framework message from some unknown source: %s", *executorId.Value) - } - }, - }, - } - - driver.Init() - defer driver.Destroy() + driver.Stop(false) + } + }(c) - driver.Start() - driver.Join() - driver.Stop(false) + driver.Run() + rendler.WriteDOTFile(scheduler.crawlResults, scheduler.renderResults) + os.Exit(0) } func executorURIs() []*mesos.CommandInfo_URI { @@ -258,10 +256,10 @@ func executorURIs() []*mesos.CommandInfo_URI { } return []*mesos.CommandInfo_URI{ - pathToURI(baseURI+"crawl_executor.py", false), pathToURI(baseURI+"render.js", false), - pathToURI(baseURI+"render_executor.py", false), - pathToURI(baseURI+"results.py", false), - pathToURI(baseURI+"task_state.py", false), + pathToURI(baseURI+"python/crawl_executor.py", false), + pathToURI(baseURI+"python/render_executor.py", false), + pathToURI(baseURI+"python/results.py", false), + pathToURI(baseURI+"python/task_state.py", false), } } From 36aaf91b204054f1b738c86024637e070ba91959 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Tue, 10 Feb 2015 18:13:37 -0800 Subject: [PATCH 3/5] updating scheduler with new bindings --- .../mesosphere/rendler/scheduler/main.go | 432 ++++++++++-------- 1 file changed, 239 insertions(+), 193 deletions(-) diff --git a/go/src/github.com/mesosphere/rendler/scheduler/main.go b/go/src/github.com/mesosphere/rendler/scheduler/main.go index 0127721..697b5d5 100644 --- a/go/src/github.com/mesosphere/rendler/scheduler/main.go +++ b/go/src/github.com/mesosphere/rendler/scheduler/main.go @@ -1,12 +1,10 @@ package main import ( - "code.google.com/p/goprotobuf/proto" "container/list" "encoding/json" "flag" "fmt" - "github.com/mesosphere/mesos-go/mesos" "github.com/mesosphere/rendler" "log" "os" @@ -15,235 +13,288 @@ import ( "path/filepath" ) +import ( + "github.com/gogo/protobuf/proto" + mesos "github.com/mesos/mesos-go/mesosproto" + util "github.com/mesos/mesos-go/mesosutil" + sched "github.com/mesos/mesos-go/scheduler" +) + const TASK_CPUS = 0.1 const TASK_MEM = 32.0 const SHUTDOWN_TIMEOUT = 30 // in seconds -func main() { - crawlQueue := list.New() // list of string - renderQueue := list.New() // list of string - - processedURLs := list.New() // list of string - crawlResults := list.New() // list of CrawlEdge - renderResults := make(map[string]string) - - seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") - master := flag.String("master", "127.0.1.1:5050", "Location of leading Mesos master") - localMode := flag.Bool("local", true, "If true, saves rendered web pages on local disk") - // TODO(nnielsen): Add flag for artifacts. - - flag.Parse() +// ----------------- util --------------- - crawlQueue.PushBack(*seedUrl) +func maxTasksForOffer(offer *mesos.Offer) int { + // TODO(nnielsen): Parse offer resources. + count := 0 - tasksCreated := 0 - tasksRunning := 0 - shuttingDown := false + var cpus float64 = 0 + var mem float64 = 0 - crawlCommand := "python crawl_executor.py" - renderCommand := "python render_executor.py" + for _, resource := range offer.Resources { + if resource.GetName() == "cpus" { + cpus = *resource.GetScalar().Value + } - if *localMode { - renderCommand += " --local" + if resource.GetName() == "mem" { + mem = *resource.GetScalar().Value + } } - // TODO(nnielsen): In local mode, verify artifact locations. - rendlerArtifacts := executorURIs() - - crawlExecutor := &mesos.ExecutorInfo{ - ExecutorId: &mesos.ExecutorID{Value: proto.String("crawl-executor")}, - Command: &mesos.CommandInfo{ - Value: proto.String(crawlCommand), - Uris: rendlerArtifacts, - }, - Name: proto.String("Crawler"), + for cpus >= TASK_CPUS && mem >= TASK_MEM { + count++ + cpus -= TASK_CPUS + mem -= TASK_MEM } - renderExecutor := &mesos.ExecutorInfo{ - ExecutorId: &mesos.ExecutorID{Value: proto.String("render-executor")}, - Command: &mesos.CommandInfo{ - Value: proto.String(renderCommand), - Uris: rendlerArtifacts, - }, - Name: proto.String("Renderer"), - } + return count +} + +func printQueueStatistics() { + // TODO(nnielsen): Print queue lengths. +} - makeTaskPrototype := func(offer mesos.Offer) *mesos.TaskInfo { - taskId := tasksCreated - tasksCreated++ +func makeTaskPrototype(offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + taskId := sched.tasksLaunched + sched.tasksLaunched++ return &mesos.TaskInfo{ TaskId: &mesos.TaskID{ Value: proto.String(fmt.Sprintf("RENDLER-%d", taskId)), }, SlaveId: offer.SlaveId, Resources: []*mesos.Resource{ - mesos.ScalarResource("cpus", TASK_CPUS), - mesos.ScalarResource("mem", TASK_MEM), + util.NewScalarResource("cpus", TASK_CPUS), + util.NewScalarResource("mem", TASK_MEM), }, } } - makeCrawlTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { - task := makeTaskPrototype(offer) - task.Name = proto.String("CRAWL_" + *task.TaskId.Value) - task.Executor = crawlExecutor - task.Data = []byte(url) - return task - } +func makeCrawlTask(url string, offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + task := makeTaskPrototype(offer, sched) + task.Name = proto.String("CRAWL_" + *task.TaskId.Value) + task.Executor = sched.crawlExecutor + task.Data = []byte(url) + return task +} + +func makeRenderTask(url string, offer *mesos.Offer, sched *RendlerScheduler) *mesos.TaskInfo { + task := makeTaskPrototype(offer, sched) + task.Name = proto.String("RENDER_" + *task.TaskId.Value) + task.Executor = sched.renderExecutor + task.Data = []byte(url) + return task +} + +//--------------- scheduler ---------------- + +type RendlerScheduler struct { + crawlExecutor *mesos.ExecutorInfo + renderExecutor *mesos.ExecutorInfo + tasksLaunched int + tasksFinished int + shuttingDown bool + crawlQueue *list.List + renderQueue *list.List + processedURLs *list.List + crawlResults *list.List + renderResults map[string]string + seedUrl string +} + +func newRendlerScheduler(crawlExecutor *mesos.ExecutorInfo, renderExecutor *mesos.ExecutorInfo, seedUrl string) *RendlerScheduler { + + crawlQueue := list.New() // list of string + renderQueue := list.New() // list of string + + processedURLs := list.New() // list of string + crawlResults := list.New() // list of CrawlEdge + renderResults := make(map[string]string) - makeRenderTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { - task := makeTaskPrototype(offer) - task.Name = proto.String("RENDER_" + *task.TaskId.Value) - task.Executor = renderExecutor - task.Data = []byte(url) - return task + crawlQueue.PushBack(seedUrl) + + return &RendlerScheduler{ + crawlExecutor: crawlExecutor, + renderExecutor: renderExecutor, + tasksLaunched: 0, + tasksFinished: 0, + shuttingDown: false, + crawlQueue: crawlQueue, + renderQueue: renderQueue, + processedURLs: processedURLs, + crawlResults: crawlResults, + renderResults: renderResults, + seedUrl: seedUrl, } +} - maxTasksForOffer := func(offer mesos.Offer) int { - // TODO(nnielsen): Parse offer resources. - count := 0 +func (sched *RendlerScheduler) Registered(driver sched.SchedulerDriver, frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) { + log.Printf("Registered") +} - var cpus float64 = 0 - var mem float64 = 0 +func (sched *RendlerScheduler) Reregistered(driver sched.SchedulerDriver, masterInfo *mesos.MasterInfo) { + log.Printf("Framework Re-Registered with Master ", masterInfo) +} - for _, resource := range offer.Resources { - if resource.GetName() == "cpus" { - cpus = *resource.GetScalar().Value - } +func (sched *RendlerScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) { + printQueueStatistics() - if resource.GetName() == "mem" { - mem = *resource.GetScalar().Value - } + for _, offer := range offers { + if sched.shuttingDown { + fmt.Println("Shutting down: declining offer on [", offer.Hostname, "]") + driver.DeclineOffer(offer.Id, &mesos.Filters{RefuseSeconds: proto.Float64(1)}) + continue } - for cpus >= TASK_CPUS && mem >= TASK_MEM { - count++ - cpus -= TASK_CPUS - mem -= TASK_MEM + var tasks []*mesos.TaskInfo + + for i := 0; i < maxTasksForOffer(offer)/2; i++ { + if sched.crawlQueue.Front() != nil { + url := sched.crawlQueue.Front().Value.(string) + sched.crawlQueue.Remove(sched.crawlQueue.Front()) + task := makeCrawlTask(url, offer, sched) + tasks = append(tasks, task) + } + if sched.renderQueue.Front() != nil { + url := sched.renderQueue.Front().Value.(string) + sched.renderQueue.Remove(sched.renderQueue.Front()) + task := makeRenderTask(url, offer, sched) + tasks = append(tasks, task) + } } - return count + if len(tasks) == 0 { + driver.DeclineOffer(offer.Id, &mesos.Filters{RefuseSeconds: proto.Float64(1)}) + } else { + driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)}) + } } +} + +func (sched *RendlerScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) { + log.Printf("Received task status [%s] for task [%s]", rendler.NameFor(status.State), *status.TaskId.Value) - printQueueStatistics := func() { - // TODO(nnielsen): Print queue lengths. + if *status.State == mesos.TaskState_TASK_RUNNING { + sched.tasksLaunched++ + } else if rendler.IsTerminal(status.State) { + sched.tasksLaunched-- } +} - driver := mesos.SchedulerDriver{ - Master: *master, - Framework: mesos.FrameworkInfo{ - Name: proto.String("RENDLER"), - User: proto.String(""), - }, +func (sched *RendlerScheduler) FrameworkMessage(driver sched.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { + + switch *executorId.Value { + case *sched.crawlExecutor.ExecutorId.Value: + log.Print("Received framework message from crawler") + var result rendler.CrawlResult + err := json.Unmarshal([]byte(message), &result) + if err != nil { + log.Printf("Error deserializing CrawlResult: [%s]", err) + } else { + for _, link := range result.Links { + edge := rendler.Edge{From: result.URL, To: link} + log.Printf("Appending [%s] to crawl results", edge) + sched.crawlResults.PushBack(edge) + + alreadyProcessed := false + for e := sched.processedURLs.Front(); e != nil && !alreadyProcessed; e = e.Next() { + processedURL := e.Value.(string) + if link == processedURL { + alreadyProcessed = true + } + } - Scheduler: &mesos.Scheduler{ + if !alreadyProcessed { + log.Printf("Enqueueing [%s]", link) + sched.crawlQueue.PushBack(link) + sched.renderQueue.PushBack(link) + sched.processedURLs.PushBack(link) + } + } + } - Registered: func( - driver *mesos.SchedulerDriver, - frameworkId mesos.FrameworkID, - masterInfo mesos.MasterInfo) { - log.Printf("Registered") - }, + case *sched.renderExecutor.ExecutorId.Value: + log.Printf("Received framework message from renderer") + var result rendler.RenderResult + err := json.Unmarshal([]byte(message), &result) + if err != nil { + log.Printf("Error deserializing RenderResult: [%s]", err) + } else { + log.Printf( + "Appending [%s] to render results", + rendler.Edge{From: result.URL, To: result.ImageURL}) + sched.renderResults[result.URL] = result.ImageURL + } - ResourceOffers: func(driver *mesos.SchedulerDriver, offers []mesos.Offer) { - printQueueStatistics() + default: + log.Printf("Received a framework message from some unknown source: %s", *executorId.Value) + } +} - for _, offer := range offers { - if shuttingDown { - fmt.Println("Shutting down: declining offer on [", offer.Hostname, "]") - driver.DeclineOffer(offer.Id) - continue - } +func (sched *RendlerScheduler) Error(driver sched.SchedulerDriver, err string) { + log.Printf("Scheduler received error:", err) +} +func (sched *RendlerScheduler) Disconnected(sched.SchedulerDriver) {} +func (sched *RendlerScheduler) OfferRescinded(sched.SchedulerDriver, *mesos.OfferID) {} +func (sched *RendlerScheduler) SlaveLost(sched.SchedulerDriver, *mesos.SlaveID) {} +func (sched *RendlerScheduler) ExecutorLost(sched.SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID, int) {} - tasks := []mesos.TaskInfo{} - - for i := 0; i < maxTasksForOffer(offer)/2; i++ { - if crawlQueue.Front() != nil { - url := crawlQueue.Front().Value.(string) - crawlQueue.Remove(crawlQueue.Front()) - task := makeCrawlTask(url, offer) - tasks = append(tasks, *task) - } - if renderQueue.Front() != nil { - url := renderQueue.Front().Value.(string) - renderQueue.Remove(renderQueue.Front()) - task := makeRenderTask(url, offer) - tasks = append(tasks, *task) - } - } +func main() { - if len(tasks) == 0 { - driver.DeclineOffer(offer.Id) - } else { - driver.LaunchTasks(offer.Id, tasks) - } - } - }, + crawlCommand := "python crawl_executor.py" + renderCommand := "python render_executor.py" - StatusUpdate: func(driver *mesos.SchedulerDriver, status mesos.TaskStatus) { - log.Printf("Received task status [%s] for task [%s]", rendler.NameFor(status.State), *status.TaskId.Value) + seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") + master := flag.String("master", "127.0.1.1:5050", "Location of leading Mesos master") + localMode := flag.Bool("local", true, "If true, saves rendered web pages on local disk") + // TODO(nnielsen): Add flag for artifacts. - if *status.State == mesos.TaskState_TASK_RUNNING { - tasksRunning++ - } else if rendler.IsTerminal(status.State) { - tasksRunning-- - } - }, + flag.Parse() - FrameworkMessage: func( - driver *mesos.SchedulerDriver, - executorId mesos.ExecutorID, - slaveId mesos.SlaveID, - message string) { - - switch *executorId.Value { - case *crawlExecutor.ExecutorId.Value: - log.Print("Received framework message from crawler") - var result rendler.CrawlResult - err := json.Unmarshal([]byte(message), &result) - if err != nil { - log.Printf("Error deserializing CrawlResult: [%s]", err) - } else { - for _, link := range result.Links { - edge := rendler.Edge{From: result.URL, To: link} - log.Printf("Appending [%s] to crawl results", edge) - crawlResults.PushBack(edge) - - alreadyProcessed := false - for e := processedURLs.Front(); e != nil && !alreadyProcessed; e = e.Next() { - processedURL := e.Value.(string) - if link == processedURL { - alreadyProcessed = true - } - } - - if !alreadyProcessed { - log.Printf("Enqueueing [%s]", link) - crawlQueue.PushBack(link) - renderQueue.PushBack(link) - processedURLs.PushBack(link) - } - } - } + if *localMode { + renderCommand += " --local" + } - case *renderExecutor.ExecutorId.Value: - log.Printf("Received framework message from renderer") - var result rendler.RenderResult - err := json.Unmarshal([]byte(message), &result) - if err != nil { - log.Printf("Error deserializing RenderResult: [%s]", err) - } else { - log.Printf( - "Appending [%s] to render results", - rendler.Edge{From: result.URL, To: result.ImageURL}) - renderResults[result.URL] = result.ImageURL - } + // TODO(nnielsen): In local mode, verify artifact locations. + rendlerArtifacts := executorURIs() - default: - log.Printf("Received a framework message from some unknown source: %s", *executorId.Value) - } - }, + crawlExecutor := &mesos.ExecutorInfo{ + ExecutorId: &mesos.ExecutorID{Value: proto.String("crawl-executor")}, + Command: &mesos.CommandInfo{ + Value: proto.String(crawlCommand), + Uris: rendlerArtifacts, + }, + Name: proto.String("Crawler"), + } + + renderExecutor := &mesos.ExecutorInfo{ + ExecutorId: &mesos.ExecutorID{Value: proto.String("render-executor")}, + Command: &mesos.CommandInfo{ + Value: proto.String(renderCommand), + Uris: rendlerArtifacts, }, + Name: proto.String("Renderer"), + } + + scheduler := newRendlerScheduler(crawlExecutor, renderExecutor, *seedUrl) + + master = master + + fwInfo := &mesos.FrameworkInfo{ + Name: proto.String("RENDLER"), + User: proto.String(""), + } + + driver, err := sched.NewMesosSchedulerDriver( + scheduler, + fwInfo, + *master, + (*mesos.Credential)(nil), + ) + + if err != nil { + log.Printf("Unable to create a SchedulerDriver ", err.Error()) } c := make(chan os.Signal, 1) @@ -254,27 +305,22 @@ func main() { if s == os.Interrupt { fmt.Println("RENDLER is shutting down") - shuttingDown = true + scheduler.shuttingDown = true wait_started := time.Now() - for tasksRunning > 0 && SHUTDOWN_TIMEOUT > int(time.Since(wait_started).Seconds()) { + for scheduler.tasksLaunched > 0 && SHUTDOWN_TIMEOUT > int(time.Since(wait_started).Seconds()) { time.Sleep(time.Second) } - if tasksRunning > 0 { - fmt.Println("Shutdown by timeout,", tasksRunning, "task(s) have not completed") + if scheduler.tasksLaunched > 0 { + fmt.Println("Shutdown by timeout,", scheduler.tasksLaunched, "task(s) have not completed") } driver.Stop(false) } }(c) - driver.Init() - defer driver.Destroy() - - driver.Start() - driver.Join() - driver.Stop(false) - rendler.WriteDOTFile(crawlResults, renderResults) + driver.Run() + rendler.WriteDOTFile(scheduler.crawlResults, scheduler.renderResults) os.Exit(0) } From 68a64780a132eabf02cf1090c4ded2900bc6c292 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Tue, 10 Feb 2015 18:13:47 -0800 Subject: [PATCH 4/5] using new mesos-go bindings --- go/src/github.com/mesosphere/rendler/task_state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/src/github.com/mesosphere/rendler/task_state.go b/go/src/github.com/mesosphere/rendler/task_state.go index 13376a6..f8689dd 100644 --- a/go/src/github.com/mesosphere/rendler/task_state.go +++ b/go/src/github.com/mesosphere/rendler/task_state.go @@ -1,7 +1,7 @@ package rendler import ( - "github.com/mesosphere/mesos-go/mesos" + mesos "github.com/mesos/mesos-go/mesosproto" ) func NameFor(state *mesos.TaskState) string { From ab1a1d76e79fe827463b8486c9cceb3a40324bf7 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Tue, 10 Feb 2015 18:14:07 -0800 Subject: [PATCH 5/5] uncommenting vm box in Vagrantfile --- Vagrantfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Vagrantfile b/Vagrantfile index 8299ded..51b776a 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -7,7 +7,7 @@ VAGRANTFILE_API_VERSION = "2" Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.box = "mesos-demo" - # config.vm.box_url = "http://downloads.mesosphere.io/demo/mesos-demo.box" + config.vm.box_url = "http://downloads.mesosphere.io/demo/mesos-demo.box" # Create a private network, which allows host-only access to the machine # using a specific IP.