diff --git a/services/marathon/marathon.go b/services/marathon/marathon.go index 459934d..c6e9216 100644 --- a/services/marathon/marathon.go +++ b/services/marathon/marathon.go @@ -2,13 +2,17 @@ package marathon import ( "encoding/json" - "github.com/QubitProducts/bamboo/configuration" "io/ioutil" + "log" "net/http" "sort" "strings" + + "github.com/QubitProducts/bamboo/configuration" ) +const taskStateRunning = "TASK_RUNNING" + // Describes an app process running type Task struct { Id string @@ -16,6 +20,8 @@ type Task struct { Port int Ports []int Alive bool + State string + Ready bool } // A health check on the application @@ -36,6 +42,7 @@ type App struct { HealthCheckPath string HealthCheckProtocol string HealthChecks []HealthCheck + ReadinessCheckPath string Tasks []Task ServicePort int ServicePorts []int @@ -58,12 +65,6 @@ func (slice AppList) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } -type marathonTaskList []marathonTask - -type marathonTasks struct { - Tasks marathonTaskList `json:"tasks"` -} - type HealthCheckResult struct { Alive bool } @@ -74,12 +75,15 @@ type marathonTask struct { Host string Ports []int ServicePorts []int + State string StartedAt string StagedAt string Version string HealthCheckResults []HealthCheckResult } +type marathonTaskList []marathonTask + func (slice marathonTaskList) Len() int { return len(slice) } @@ -97,11 +101,15 @@ type marathonApps struct { } type marathonApp struct { - Id string `json:"id"` - HealthChecks []marathonHealthCheck `json:"healthChecks"` - Ports []int `json:"ports"` - Env map[string]string `json:"env"` - Labels map[string]string `json:"labels"` + Id string `json:"id"` + HealthChecks []marathonHealthCheck `json:"healthChecks"` + Ports []int `json:"ports"` + Env map[string]string `json:"env"` + Labels map[string]string `json:"labels"` + Deployments []deployment `json:"deployments"` + Tasks marathonTaskList `json:"tasks"` + ReadinessChecks []marathonReadinessCheck `json:"readinessChecks"` + ReadinessCheckResults []readinessCheckResult `json:"readinessCheckResults"` } type marathonHealthCheck struct { @@ -110,110 +118,92 @@ type marathonHealthCheck struct { PortIndex int `json:"portIndex"` } -func fetchMarathonApps(endpoint string, conf *configuration.Configuration) (map[string]marathonApp, error) { - client := &http.Client{} - req, _ := http.NewRequest("GET", endpoint+"/v2/apps", nil) - req.Header.Add("Accept", "application/json") - req.Header.Add("Content-Type", "application/json") - if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 { - req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password) - } - response, err := client.Do(req) +type marathonReadinessCheck struct { + Path string `json:"path"` +} - if err != nil { - return nil, err - } +type deployment struct { + ID string `json:"id"` +} - defer response.Body.Close() - var appResponse marathonApps +type readinessCheckResult struct { + TaskID string `json:"taskId"` + Ready bool `json:"ready"` +} - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, err - } +/* + Apps returns a struct that describes Marathon current app and their + sub tasks information. - err = json.Unmarshal(contents, &appResponse) - if err != nil { - return nil, err - } + Parameters: + endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080 +*/ +func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) { + var marathonApps []marathonApp + var err error - dataById := map[string]marathonApp{} + // Try all configured endpoints until one succeeds or we exhaust the list, + // whichever comes first. + for _, url := range maraconf.Endpoints() { + marathonApps, err = fetchMarathonApps(url, conf) + if err == nil { + for _, marathonApp := range marathonApps { + sort.Sort(marathonApp.Tasks) + } + apps := createApps(marathonApps) + sort.Sort(apps) + return apps, nil + } + } + // return last error + return nil, err +} - for _, appConfig := range appResponse.Apps { - dataById[appConfig.Id] = appConfig +func fetchMarathonApps(endpoint string, conf *configuration.Configuration) ([]marathonApp, error) { + var appResponse marathonApps + if err := parseJSON(endpoint+"/v2/apps?embed=app.tasks&embed=app.deployments&embed=app.readiness", conf, &appResponse); err != nil { + return nil, err } - return dataById, nil + return appResponse.Apps, nil } -func fetchTasks(endpoint string, conf *configuration.Configuration) (map[string]marathonTaskList, error) { +func parseJSON(url string, conf *configuration.Configuration, out interface{}) error { client := &http.Client{} - req, _ := http.NewRequest("GET", endpoint+"/v2/tasks", nil) + req, _ := http.NewRequest("GET", url, nil) req.Header.Add("Accept", "application/json") req.Header.Add("Content-Type", "application/json") if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 { req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password) } - response, err := client.Do(req) - - var tasks marathonTasks + response, err := client.Do(req) if err != nil { - return nil, err + return err } - contents, err := ioutil.ReadAll(response.Body) defer response.Body.Close() - if err != nil { - return nil, err - } - err = json.Unmarshal(contents, &tasks) + contents, err := ioutil.ReadAll(response.Body) if err != nil { - return nil, err - } - - taskList := tasks.Tasks - sort.Sort(taskList) - - tasksById := map[string]marathonTaskList{} - for _, task := range taskList { - if tasksById[task.AppId] == nil { - tasksById[task.AppId] = marathonTaskList{} - } - tasksById[task.AppId] = append(tasksById[task.AppId], task) + return err } - for _, task_list := range tasksById { - sort.Sort(task_list) + err = json.Unmarshal(contents, &out) + if err != nil { + return err } - return tasksById, nil + return nil } -func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool { - //If we don't even have health check results for every health check, don't count the task as healthy - if len(healthChecks) > len(healthCheckResults) { - return false - } - for _, healthCheck := range healthCheckResults { - if !healthCheck.Alive { - return false - } - } - return true -} - -func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]marathonApp) AppList { +func createApps(marathonApps []marathonApp) AppList { apps := AppList{} - for appId, mApp := range marathonApps { - + for _, mApp := range marathonApps { + appId := mApp.Id // Try to handle old app id format without slashes - appPath := appId - if !strings.HasPrefix(appId, "/") { - appPath = "/" + appId - } + appPath := "/" + strings.TrimPrefix(mApp.Id, "/") // build App from marathonApp app := App{ @@ -222,6 +212,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m EscapedId: strings.Replace(appId, "/", "::", -1), HealthCheckPath: parseHealthCheckPath(mApp.HealthChecks), HealthCheckProtocol: parseHealthCheckProtocol(mApp.HealthChecks), + ReadinessCheckPath: parseReadinessCheckPath(mApp.ReadinessChecks), Env: mApp.Env, Labels: mApp.Labels, SplitId: strings.Split(appId, "/"), @@ -244,7 +235,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m // build Tasks for this App tasks := []Task{} - for _, mTask := range tasksById[appId] { + for _, mTask := range mApp.Tasks { if len(mTask.Ports) > 0 { t := Task{ Id: mTask.Id, @@ -252,6 +243,8 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m Port: mTask.Ports[0], Ports: mTask.Ports, Alive: calculateTaskHealth(mTask.HealthCheckResults, mApp.HealthChecks), + State: mTask.State, + Ready: calculateReadiness(mTask, mApp), } tasks = append(tasks, t) } @@ -297,41 +290,82 @@ func parseHealthCheckProtocol(checks []marathonHealthCheck) string { return "" } -/* - Apps returns a struct that describes Marathon current app and their - sub tasks information. - - Parameters: - endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080 -*/ -func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) { +func parseReadinessCheckPath(checks []marathonReadinessCheck) string { + if len(checks) > 0 { + return checks[0].Path + } - var applist AppList - var err error + return "" +} - // try all configured endpoints until one succeeds - for _, url := range maraconf.Endpoints() { - applist, err = _fetchApps(url, conf) - if err == nil { - return applist, err +func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool { + // If we don't even have health check results for every health check, don't + // count the task as healthy. + if len(healthChecks) > len(healthCheckResults) { + return false + } + for _, healthCheck := range healthCheckResults { + if !healthCheck.Alive { + return false } } - // return last error - return nil, err + return true } -func _fetchApps(url string, conf *configuration.Configuration) (AppList, error) { - tasks, err := fetchTasks(url, conf) - if err != nil { - return nil, err +func calculateReadiness(task marathonTask, maraApp marathonApp) bool { + switch { + case task.State != taskStateRunning: + // By definition, a task not running cannot be ready. + log.Printf("task %s app %s: ready = false [task state %s != required state %s]", task.Id, maraApp.Id, task.State, taskStateRunning) + return false + + case len(maraApp.Deployments) == 0: + // We only care about readiness during deployments; post-deployment readiness + // should be covered by a separate HAProxy health check definition. + log.Printf("task %s app %s: ready = true [no deployment ongoing]", task.Id, maraApp.Id) + return true + + case len(maraApp.ReadinessChecks) == 0: + // Applications without configured readiness checks are always considered + // ready. + log.Printf("task %s app %s: ready = true [no readiness checks on app]", task.Id, maraApp.Id) + return true } - marathonApps, err := fetchMarathonApps(url, conf) - if err != nil { - return nil, err + // Loop through all readiness check results and return the results for + // matching task IDs. + for _, readinessCheckResult := range maraApp.ReadinessCheckResults { + if readinessCheckResult.TaskID == task.Id { + log.Printf("task %s app %s: ready = %t [evaluating readiness check ready state]", task.Id, maraApp.Id, readinessCheckResult.Ready) + return readinessCheckResult.Ready + } } - apps := createApps(tasks, marathonApps) - sort.Sort(apps) - return apps, nil + // There's a corner case sometimes hit where the first new task of a + // deployment goes from TASK_STAGING to TASK_RUNNING without a corresponding + // health check result being included in the API response. This only happens + // in a very short (yet unlucky) time frame and does not repeat for subsequent + // tasks of the same deployment. + // We identify this situation by checking that we are looking at a part of the + // deployment representing a new task (i.e., it has the most recent version + // timestamp while other timestamps exist as well). If that's the case, we + // err on the side of caution and mark it as non-ready. + versions := map[string]bool{} + var maxVersion string + for _, task := range maraApp.Tasks { + versions[task.Version] = true + if maxVersion == "" || maxVersion < task.Version { + maxVersion = task.Version + } + } + if len(versions) > 1 && task.Version == maxVersion { + log.Printf("task %s app %s: ready = false [new task with version %s not included in readiness check results yet]", task.Id, maraApp.Id, maxVersion) + return false + } + + // Finally, we can be certain this task is not part of the deployment (i.e., + // it's an old task that's going to transition into the TASK_KILLING and/or + // TASK_KILLED state as new tasks' readiness checks gradually turn green.) + log.Printf("task %s app %s: ready = true [task not involved in deployment]", task.Id, maraApp.Id) + return true } diff --git a/services/marathon/marathon_test.go b/services/marathon/marathon_test.go index b29040d..cefc5fe 100644 --- a/services/marathon/marathon_test.go +++ b/services/marathon/marathon_test.go @@ -1,8 +1,14 @@ package marathon import ( - . "github.com/smartystreets/goconvey/convey" + "fmt" "testing" + + "net/http" + "net/http/httptest" + + "github.com/QubitProducts/bamboo/configuration" + . "github.com/smartystreets/goconvey/convey" ) func TestGetMesosDnsId_Simple(t *testing.T) { @@ -63,3 +69,352 @@ func TestParseHealthCheckPathMixed(t *testing.T) { }) }) } + +func TestParseJSONRequest(t *testing.T) { + tests := []struct { + user string + password string + wantBasicAuth bool + }{ + { + wantBasicAuth: false, + }, + { + user: "user", + wantBasicAuth: false, + }, + { + password: "password", + wantBasicAuth: false, + }, + { + user: "user", + password: "password", + wantBasicAuth: true, + }, + } + + for _, test := range tests { + test := test + t.Run(fmt.Sprintf("user='%s' password='%s'", test.user, test.password), func(t *testing.T) { + t.Parallel() + conf := configuration.Configuration{ + Marathon: configuration.Marathon{ + User: test.user, + Password: test.password, + }, + } + + var req *http.Request + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req = r + fmt.Fprint(w, "{}") + })) + defer ts.Close() + + var res interface{} + err := parseJSON(ts.URL, &conf, &res) + if err != nil { + t.Fatalf("parseJSON returned error: %s", err) + } + + if req.Method != http.MethodGet { + t.Errorf("got method '%s', want '%s'", req.Method, http.MethodGet) + } + + for _, hdrKey := range []string{"Accept", "Content-Type"} { + hdrValue := req.Header.Get(hdrKey) + switch { + case hdrValue == "": + t.Errorf("%s header missing", hdrKey) + case hdrValue != "application/json": + t.Errorf("got %s header value '%s', want 'application/json'", hdrKey, hdrValue) + } + } + + authHdrValue := req.Header.Get("Authorization") + if test.wantBasicAuth != (authHdrValue != "") { + t.Errorf("got Authorization header value '%s', wanted header: %t", authHdrValue, test.wantBasicAuth) + } + }) + } +} + +func TestParseJSONHandling(t *testing.T) { + tests := []struct { + desc string + handler http.Handler + shouldSucceed bool + }{ + { + desc: "request failed", + shouldSucceed: false, + }, + { + desc: "invalid JSON", + handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "{") + }), + shouldSucceed: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + var endpoint string + if test.handler != nil { + ts := httptest.NewServer(test.handler) + defer ts.Close() + endpoint = ts.URL + } + + conf := configuration.Configuration{} + var res interface{} + err := parseJSON(endpoint, &conf, res) + + if test.shouldSucceed != (err == nil) { + t.Errorf("got error '%s', wanted error: %t", err, !test.shouldSucceed) + } + }) + } +} + +func TestFetchApps(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprintf(w, `{ + "apps": [ + { + "id": "/app2WithSlash", + "tasks": [ + { + "id": "task2", + "ports": [8002] + }, + { + "id": "task1", + "ports": [8001] + } + ] + }, + { + "id": "app1WithoutSlash", + "tasks": [ + { + "id": "task1", + "ports": [8001] + }, + { + "id": "task2", + "ports": [8002] + } + ] + } + ] +}`) + })) + defer ts.Close() + + // First Marathon URL is invalid to verify failover behavior. + maraConf := configuration.Marathon{ + Endpoint: fmt.Sprintf("http://127.0.0.1:4242,%s", ts.URL), + } + + apps, err := FetchApps(maraConf, &configuration.Configuration{}) + + if err != nil { + t.Fatalf("FetchApps returned error: %s", err) + } + + if len(apps) < 1 { + t.Fatal("no apps fetched") + } + assertFetchedApp(t, 1, "/app1WithoutSlash", apps[0]) + + if len(apps) < 2 { + t.Fatal("missing second app") + } + assertFetchedApp(t, 2, "/app2WithSlash", apps[1]) + + if len(apps) > 2 { + t.Fatalf("got %d apps, want 2", len(apps)) + } +} + +func TestCalculateReadiness(t *testing.T) { + tests := []struct { + desc string + task marathonTask + app marathonApp + wantReady bool + }{ + { + desc: "non-running task", + task: marathonTask{ + State: "TASK_STAGED", + }, + wantReady: false, + }, + { + desc: "no deployment running for app", + task: marathonTask{ + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{}, + }, + wantReady: true, + }, + { + desc: "no readiness checks defined for app", + task: marathonTask{ + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{}, + }, + wantReady: true, + }, + { + desc: "readiness check result negative", + task: marathonTask{ + Id: "taskId", + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{ + readinessCheckResult{ + Ready: false, + TaskID: "taskId", + }, + }, + }, + wantReady: false, + }, + { + desc: "readiness check result positive", + task: marathonTask{ + Id: "taskId", + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{ + readinessCheckResult{ + Ready: false, + TaskID: "otherTaskId", + }, + readinessCheckResult{ + Ready: true, + TaskID: "taskId", + }, + }, + }, + wantReady: true, + }, + { + desc: "ready task's readiness check result outstanding", + task: marathonTask{ + Id: "newTaskId", + State: taskStateRunning, + Version: "2017-01-15T00:00:00.000Z", + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{}, + Tasks: marathonTaskList{ + marathonTask{ + Id: "newTaskId", + Version: "2017-01-15T00:00:00.000Z", + }, + marathonTask{ + Id: "oldTaskId", + Version: "2017-01-01T00:00:00.000Z", + }, + }, + }, + wantReady: false, + }, + { + desc: "task not involved in deployment", + task: marathonTask{ + Id: "oldTaskId", + State: taskStateRunning, + Version: "2017-01-01T00:00:00.000Z", + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{}, + Tasks: marathonTaskList{ + marathonTask{ + Id: "newTaskId", + Version: "2017-01-15T00:00:00.000Z", + }, + marathonTask{ + Id: "oldTaskId", + Version: "2017-01-01T00:00:00.000Z", + }, + }, + }, + wantReady: true, + }, + } + + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + gotReady := calculateReadiness(test.task, test.app) + if gotReady != test.wantReady { + t.Errorf("got ready = %t, want ready = %t", gotReady, test.wantReady) + } + }) + } +} + +func assertFetchedApp(t *testing.T, index int, id string, app App) { + if app.Id != id { + t.Errorf("app #%d: got app ID '%s', want '%s'", index, app.Id, id) + } + switch { + case len(app.Tasks) != 2: + t.Errorf("app #%d: got %d tasks, want 2", index, len(app.Tasks)) + case app.Tasks[0].Id != "task1": + t.Errorf("app #%d: got ID '%s' for task #1, want 'task1", index, app.Tasks[0].Id) + case app.Tasks[1].Id != "task2": + t.Errorf("app #%d: got ID '%s' for task #2, want 'task2", index, app.Tasks[1].Id) + } +}