Skip to content

Commit

Permalink
Server added, signaling, stats. Work in progress
Browse files Browse the repository at this point in the history
WIP Fix typos


Work in progress aligning nxsugar-py to nxsugar-go 2
  • Loading branch information
rogerzr committed Sep 22, 2016
1 parent 48af722 commit 6d364af
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 181 deletions.
3 changes: 3 additions & 0 deletions nxsugarpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@
##############################################################################

from .service import *
from .server import *
from .errors import *
from .log import *
10 changes: 7 additions & 3 deletions nxsugarpy/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@
}

def newJsonRpcErr(code, message, data):
return formatAsJsonRpcErr({"code": code, "message": message, "data": data})
return {"code": code, "message": message, "data": data}

def formatAsJsonRpcErr(err):
if not isinstance(err, dict):
return {"code": 0, "message": "", "data": None}
try:
msg = str(err)
return {"code": 0, "message": msg, "data": None}
except:
return {"code": 0, "message": "", "data": err}
code = 0
if "code" in err:
code = err["code"]
Expand All @@ -95,5 +99,5 @@ def errToStr(err):
return "[{0}] {1}".format(code, message)

def isNexusErrCode(err, code):
formatAsJsonRpcErr(err)
err = formatAsJsonRpcErr(err)
return err["code"] == code
57 changes: 1 addition & 56 deletions nxsugarpy/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#
##############################################################################

import threading

def secondsToStr(secs):
out = ""
if secs < 1:
Expand All @@ -43,57 +41,4 @@ def secondsToStr(secs):
out = "{0}{1:.0f}m".format(out, rmins)
if rsecs > 0:
out = "{0}{1:.0f}s".format(out, rsecs)
return out


class Stats(object):
def __init__(self):
self._lock = threading.Lock()
self.taskPullsDone = 0
self.taskPullTimeouts = 0
self.tasksPulled = 0
self.tasksPanic = 0
self.tasksServed = 0
self.tasksMethodNotFound = 0
self.tasksRunning = 0
self.threadsUsed = 0

def addTaskPullsDone(self, n):
self._lock.acquire()
self.taskPullsDone += n
self._lock.release()

def addTaskPullsTimeouts(self, n):
self._lock.acquire()
self.taskPullTimeouts += n
self._lock.release()

def addTasksPulled(self, n):
self._lock.acquire()
self.tasksPulled += n
self._lock.release()

def addTaskPanic(self, n):
self._lock.acquire()
self.tasksPanic += n
self._lock.release()

def addTasksServed(self, n):
self._lock.acquire()
self.tasksServed += n
self._lock.release()

def addTasksMethodNotFound(self, n):
self._lock.acquire()
self.tasksMethodNotFound += n
self._lock.release()

def addTasksRunning(self, n):
self._lock.acquire()
self.tasksRunning += n
self._lock.release()

def addThreadsUsed(self, n):
self._lock.acquire()
self.threadsUsed += n
self._lock.release()
return out
190 changes: 190 additions & 0 deletions nxsugarpy/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# nxsugarpy, a Python library for building nexus services with python
# Copyright (C) 2016 by the nxsugarpy team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

from nxsugarpy.log import *
from nxsugarpy.service import *
from nxsugarpy.service import _populateOpts

import threading
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty

class Server(object):
def __init__(self, url):
self.url = url
self.user = ""
self.password = ""
self.pulls = 1
self.pullTimeout = 3600
self.maxThreads = 4
self.logLevel = InfoLevel
self.statsPeriod = 300
self.gracefulExit = 20
self.testing = False
self.version = "0.0.0"

self.connState = None
self._nc = None
self._services = {}
self._addedAsStoppable = False

def getConn(self):
return self._nc

def setUrl(self, url):
self.url = url

def setUser(self, user):
self.user = user

def setPassword(self, password):
self.password = password

def setLogLevel(self, l):
self.logLevel = l

def setStatsPeriod(self, t):
self.statsPeriod = t
for _, svc in self._services.items():
svc.setStatsPeriod(t)

def setGracefulExit(self, t):
self.gracefulExit = t
for _, svc in self._services.items():
svc.setGracefulExit(t)

def setVersion(self, major, minor, patch):
self.version = "{0}.{1}.{2}".format(major, minor, patch)
for _, svc in self._services.items():
svc.version = self.version

def setTesting(self, t):
if t:
self.testing = True
else:
self.testing = False
for _, svc in self._services.items():
svc.setTesting(self.testing)

def isTesting(self):
return self.testing

def addService(self, name, path, opts=None):
svc = Service(self.url, path, {"pulls": self.pulls, "pullTimeout": self.pullTimeout, "maxThreads": self.maxThreads, "testing": self.testing})
svc.name = name
svc.logLevel = self.logLevel
svc.statsPeriod = self.statsPeriod
svc.gracefulExit = self.gracefulExit
svc.version = self.version
if opts != None:
opts = _populateOpts(opts)
svc.pulls = opts["pulls"]
svc.pullTimeout = opts["pullTimeout"]
svc.maxThreads = opts["maxThreads"]
svc.testing = opts["testing"]
svc._preaction = opts["preaction"]
svc._postaction = opts["postaction"]
self._services[name] = svc
return svc

def _setState(self, state):
if self.connState != None:
self.connState(self.getConn(), state)

def serve(self):
self._setState(StateInitializing)

# Check server
if len(self._services) == 0:
errs = "no services to serve"
logWithFields(ErrorLevel, "server", {"type": "no_services"}, errs)
return errs

# Dial and login
nxurl = urlparse(self.url)
if self.user == "" and nxurl.username != None:
self.user = nxurl.username
if self.password == "" and nxurl.password != None:
self.password = nxurl.password
self.url = "{0}://{1}:{2}".format(nxurl.scheme, nxurl.hostname, nxurl.port)
for _, svc in self._services.items():
svc.user = self.user
svc.password = self.password
svc.url = self.url
connurl = "{0}://{1}:{2}@{3}:{4}".format(nxurl.scheme, self.user, self.password, nxurl.hostname, nxurl.port)


self._setState(StateConnecting)
try:
self._nc = nxpy.Client(connurl)
except Exception as e:
errs = "can't connect to nexus server ({0}): {1}".format(connurl, str(e))
logWithFields(ErrorLevel, "server", {"type": "connection_error"}, errs)
return errs
if not self._nc.is_version_compatible:
logWithFields(WarnLevel, "server", {"type": "incompatible_version"}, "connecting to an incompatible version of nexus at ({0}): client ({1}) server ({2})", self.url, nxpy.__version__, self._nc.nexus_version)
if not self._nc.is_logged:
errs = "can't login to nexus server ({0}) as ({1}): {2}".format(connurl, self.user, errToStr(self._nc.login_error))
logWithFields(ErrorLevel, "server", {"type": "login_error"}, errs)
return errs

# Configure services
for _, svc in self._services.items():
svc.setLogLevel(self.logLevel)
svc._setConn(self._nc)

# Serve
errQ = Queue(len(self._services))
serviceWorkers = []
for _, svc in self._services.items():
worker = threading.Thread(target=svc.serve, kwargs={"errQueue":errQ})
worker.daemon = True
serviceWorkers.append(worker)
worker.start()

if not self._addedAsStoppable:
addStoppable(self)
self._addedAsStoppable = True

self._setState(StateServing)

for worker in serviceWorkers:
worker.join()

self._nc = None
self._setState(StateStopped)

try:
firstErr = errQ.get_nowait()
except Empty:
return None
else:
return firstErr

def gracefulStop(self):
for _, svc in self._services.items():
svc.gracefulStop()

def stop(self):
for _, svc in self._services.items():
svc.stop()
Loading

0 comments on commit 6d364af

Please sign in to comment.