diff --git a/storm/src/main/storm/mesos/MesosNimbus.java b/storm/src/main/storm/mesos/MesosNimbus.java index edabaa386..bd755ed88 100644 --- a/storm/src/main/storm/mesos/MesosNimbus.java +++ b/storm/src/main/storm/mesos/MesosNimbus.java @@ -43,11 +43,11 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; import org.apache.mesos.Protos.Value.Range; import org.apache.mesos.Protos.Value.Ranges; import org.apache.mesos.Protos.Value.Scalar; -import org.apache.mesos.Protos.TaskState; import org.apache.mesos.SchedulerDriver; import org.json.simple.JSONValue; import org.slf4j.Logger; @@ -60,8 +60,8 @@ import storm.mesos.resources.ResourceEntry; import storm.mesos.resources.ResourceNotAvailableException; import storm.mesos.resources.ResourceType; -import storm.mesos.schedulers.StormSchedulerImpl; import storm.mesos.schedulers.IMesosStormScheduler; +import storm.mesos.schedulers.StormSchedulerImpl; import storm.mesos.shims.CommandLineShimFactory; import storm.mesos.shims.ICommandLineShim; import storm.mesos.shims.LocalStateShim; @@ -91,8 +91,8 @@ import java.util.TimerTask; import static storm.mesos.util.PrettyProtobuf.offerIDListToString; -import static storm.mesos.util.PrettyProtobuf.offerToString; import static storm.mesos.util.PrettyProtobuf.offerMapToString; +import static storm.mesos.util.PrettyProtobuf.offerToString; import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; import static storm.mesos.util.PrettyProtobuf.taskStatusListToTaskIDsString; @@ -222,22 +222,9 @@ void initializeMesosStormConf(Map conf, String localDir) { _disallowedHosts = listIntoSet((List) conf.get(CONF_MESOS_DISALLOWED_HOSTS)); _enabledLogviewerSidecar = MesosCommon.enabledLogviewerSidecar(conf); - if (_enabledLogviewerSidecar) { - Set zkServerSet = listIntoSet((List) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); - String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); - _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; - LOG.info("Logviewer information will be stored under {}", _logviewerZkDir); - - if (zkPort == null || zkServerSet == null) { - throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); - } else { - List zkConnectionList = new ArrayList<>(); - for (String server : zkServerSet) { - zkConnectionList.add(String.format("%s:%s", server, zkPort)); - } - _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); - } - } + initializeZkClient(conf); + _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; + LOG.info("Logviewer ZK path: {}", _logviewerZkDir); Boolean preferReservedResources = (Boolean) conf.get(CONF_MESOS_PREFER_RESERVED_RESOURCES); if (preferReservedResources != null) { @@ -245,7 +232,7 @@ void initializeMesosStormConf(Map conf, String localDir) { } _container = Optional.fromNullable((String) conf.get(CONF_MESOS_CONTAINER_DOCKER_IMAGE)); - _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir); + _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir, _enabledLogviewerSidecar); // Generate YAML to be served up to clients _generatedConfPath = Paths.get( @@ -272,6 +259,21 @@ void initializeMesosStormConf(Map conf, String localDir) { } } + private void initializeZkClient(Map conf) { + Set zkServerSet = listIntoSet((List) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); + String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); + + if (zkPort == null || zkServerSet == null) { + throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); + } else { + List zkConnectionList = new ArrayList<>(); + for (String server : zkServerSet) { + zkConnectionList.add(String.format("%s:%s", server, zkPort)); + } + _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); + } + } + @SuppressWarnings("unchecked") protected void startLocalHttpServer() throws Exception { createLocalServerPort(); @@ -286,34 +288,32 @@ public void doRegistration(final SchedulerDriver driver, Protos.FrameworkID id) _state.put(FRAMEWORK_ID, id.getValue()); _offers = new HashMap(); - if (_enabledLogviewerSidecar) { - - _timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks - // in the framework scheduler's statusUpdate() method - List taskStatuses = new ArrayList(); - List logviewerPaths = _zkClient.getChildren(_logviewerZkDir); - if (logviewerPaths == null) { - _driver.reconcileTasks(taskStatuses); - return; - } - for (String path : logviewerPaths) { - TaskID logviewerTaskId = TaskID.newBuilder() - .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) - .build(); - TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() - .setTaskId(logviewerTaskId) - .setState(TaskState.TASK_RUNNING) - .build(); - taskStatuses.add(logviewerTaskStatus); - } + _timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks + // in the framework scheduler's statusUpdate() method + List taskStatuses = new ArrayList(); + List logviewerPaths = _zkClient.getChildren(_logviewerZkDir); + if (logviewerPaths == null || !_enabledLogviewerSidecar) { _driver.reconcileTasks(taskStatuses); - LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); + return; } - }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes - } + + for (String path : logviewerPaths) { + TaskID logviewerTaskId = TaskID.newBuilder() + .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) + .build(); + TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() + .setTaskId(logviewerTaskId) + .setState(TaskState.TASK_RUNNING) + .build(); + taskStatuses.add(logviewerTaskStatus); + } + _driver.reconcileTasks(taskStatuses); + LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); + } + }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes } public void shutdown() throws Exception { diff --git a/storm/src/main/storm/mesos/NimbusMesosScheduler.java b/storm/src/main/storm/mesos/NimbusMesosScheduler.java index 61c74a7e3..6eed505a8 100644 --- a/storm/src/main/storm/mesos/NimbusMesosScheduler.java +++ b/storm/src/main/storm/mesos/NimbusMesosScheduler.java @@ -42,12 +42,14 @@ public class NimbusMesosScheduler implements Scheduler { private ZKClient zkClient; private String logviewerZkDir; private CountDownLatch _registeredLatch = new CountDownLatch(1); + private boolean enableLogViewers; public static final Logger LOG = LoggerFactory.getLogger(MesosNimbus.class); - public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir) { + public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir, boolean enableLogViewers) { this.mesosNimbus = mesosNimbus; this.zkClient = zkClient; this.logviewerZkDir = logviewerZkDir; + this.enableLogViewers = enableLogViewers; } public void waitUntilRegistered() throws InterruptedException { @@ -127,23 +129,29 @@ private void updateLogviewerState(TaskStatus status) { } String nodeId = taskId.split("\\" + MesosCommon.MESOS_COMPONENT_ID_DELIMITER)[1]; String logviewerZKPath = String.format("%s/%s", logviewerZkDir, nodeId); + if (!enableLogViewers) { + LOG.info("Logviewers are disabled. Reaping existing logviewer task {}", taskId); + reapLogviewerTask(logviewerZKPath, status); + return; + } switch (status.getState()) { case TASK_STAGING: - checkRunningLogviewerState(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_STARTING: - checkRunningLogviewerState(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_RUNNING: - checkRunningLogviewerState(logviewerZKPath); + ensureLogviewerZNodeExists(logviewerZKPath); return; case TASK_LOST: // this status update can be triggered by the explicit kill and isn't terminal, do not kill again break; default: - // explicitly kill the logviewer task to ensure logviewer is terminated + // explicitly kill the logviewer task to ensure it is terminated mesosNimbus._driver.killTask(status.getTaskId()); } + // if it gets to this point it means logviewer terminated; update ZK with new logviewer state if (zkClient.nodeExists(logviewerZKPath)) { LOG.info("updateLogviewerState: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); @@ -154,13 +162,34 @@ private void updateLogviewerState(TaskStatus status) { } } - private void checkRunningLogviewerState(String logviewerZKPath) { + private void ensureLogviewerZNodeExists(String logviewerZKPath) { if (!zkClient.nodeExists(logviewerZKPath)) { - LOG.error("checkRunningLogviewerState: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); + LOG.warn("ensureLogviewerZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); zkClient.createNode(logviewerZKPath); } } + private void reapLogviewerTask(String logviewerZKPath, TaskStatus status) { + String taskId = status.getTaskId().getValue(); + if (zkClient.nodeExists(logviewerZKPath)) { + LOG.info("reapLogviewerTask: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); + zkClient.deleteNode(logviewerZKPath); + } + + switch (status.getState()) { + case TASK_FAILED: + case TASK_FINISHED: + case TASK_KILLED: + case TASK_LOST: + // terminal states + break; + default: + // explicitly kill the logviewer task to ensure it is terminated + LOG.info("reapLogviewerTask: Killing logviewer mesos task {}", logviewerZKPath, taskId); + mesosNimbus._driver.killTask(status.getTaskId()); + } + } + @Override public void frameworkMessage(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data) { }