Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[pytest]
addopts = --strict-markers -vvl --cov=sea --cov-report=term-missing --cov-fail-under=85
markers =
asyncio: mark a test as an asyncio test.
2 changes: 1 addition & 1 deletion sea/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def middlewares(self):
return rv

def _register_servicer(self, servicer):
"""register serviser
"""register servicer

:param servicer: servicer
"""
Expand Down
19 changes: 16 additions & 3 deletions sea/cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,32 @@
"--worker_mode",
required=False,
action="store",
help="Worker mode. threading|multiprocessing",
help="Worker mode. threading|multiprocessing|asycio",
)
def server(worker_mode):
worker_mode = worker_mode or current_app.config["GRPC_WORKER_MODE"]
if worker_mode == "threading":
from sea.server.threading import Server

s = Server(current_app)
else:
s.run()
elif worker_mode == "multiprocessing":
from sea.server.multiprocessing import Server

s = Server(current_app)
s.run()
s.run()
else:
import asyncio
from sea.server.asyncio import Server

s = Server(current_app)
# asyncio.run(s.run())
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(s.run())
finally:
loop.run_until_complete(s._stop_handler())
loop.close()
return 0


Expand Down
61 changes: 61 additions & 0 deletions sea/server/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import signal
import asyncio
from concurrent import futures

import grpc
from grpc_reflection.v1alpha import reflection

from sea import signals


class Server:
"""sea server implements

:param app: application instance
"""

def __init__(self, app):
self.app = app
self.workers = self.app.config["GRPC_WORKERS"]
self.host = self.app.config["GRPC_HOST"]
self.port = self.app.config["GRPC_PORT"]
self.server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=self.workers))
self.server.add_insecure_port("{}:{}".format(self.host, self.port))
self._stopped = False

async def run(self):
self.app.logger.warning("Starting server...")
# run prometheus client
if self.app.config["PROMETHEUS_SCRAPE"]:
from prometheus_client import start_http_server

self.app.logger.warning(f'Starting prometheus client...{self.app.config["PROMETHEUS_PORT"]}')
start_http_server(self.app.config["PROMETHEUS_PORT"])
# register reflection service
if self.app.config.get("GRPC_REFLECTION_SERVICES"):
reflection.enable_server_reflection((reflection.SERVICE_NAME, *self.app.config["GRPC_REFLECTION_SERVICES"]), self.server)
# run grpc server
for _, (add_func, servicer) in self.app.servicers.items():
add_func(servicer(), self.server)
await self.server.start()
signals.server_started.send(self)
self.register_signal()

await self.server.wait_for_termination()
# while not self._stopped:
# await asyncio.sleep(1)
# signals.server_stopped.send(self)
# return True

def register_signal(self):
signal.signal(signal.SIGINT, self._stop_handler)
signal.signal(signal.SIGHUP, self._stop_handler)
signal.signal(signal.SIGTERM, self._stop_handler)
signal.signal(signal.SIGQUIT, self._stop_handler)

async def _stop_handler(self):
self.app.logger.warning("Stopping server...")
grace = self.app.config["GRPC_GRACE"]
await self.server.stop(grace)
await asyncio.sleep(grace or 1)
self._stopped = True
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ prometheus_client
versioneer
pre-commit
gitlint
pytest-asyncio
36 changes: 36 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import threading
import time
from unittest import mock
import pytest
import asyncio

from sea.signals import server_started, server_stopped

Expand Down Expand Up @@ -75,3 +77,37 @@ def kill_later(sec):
assert "stopped!" in content
finally:
os.rmdir("/tmp/prometheus_metrics")


@pytest.mark.asyncio
async def test_asyncio_server(app, logstream):
from sea.server.asyncio import Server

s = Server(app)
assert not s._stopped

def log_started(s):
app.logger.warning("started!")

def log_stopped(s):
app.logger.warning("stopped!")

server_started.connect(log_started)
server_stopped.connect(log_stopped)

async def stop_server_later(sec):
await asyncio.sleep(sec)
await s._stop_handler()
# server_stopped.send(s)

# Run the server and stop it after 3 seconds in parallel
await asyncio.gather(s.run(), stop_server_later(3))

# asyncio.create_task(stop_server_later(3))

# await s.run()
assert s._stopped

content = logstream.getvalue()
assert "started!" in content
# assert "started!" in content and "stopped!" in content