diff --git a/src/txacme/challenges/__init__.py b/src/txacme/challenges/__init__.py index 61bb835..af6cb85 100644 --- a/src/txacme/challenges/__init__.py +++ b/src/txacme/challenges/__init__.py @@ -1,3 +1,4 @@ +from ._http import HTTP01Responder from ._tls import TLSSNI01Responder -__all__ = ['TLSSNI01Responder'] +__all__ = ['HTTP01Responder', 'TLSSNI01Responder'] diff --git a/src/txacme/challenges/_http.py b/src/txacme/challenges/_http.py new file mode 100644 index 0000000..70f4046 --- /dev/null +++ b/src/txacme/challenges/_http.py @@ -0,0 +1,39 @@ +""" +``http-01`` challenge implementation. +""" +from twisted.web.resource import Resource +from twisted.web.static import Data + +from zope.interface import implementer + +from txacme.interfaces import IResponder + + +@implementer(IResponder) +class HTTP01Responder(object): + """ + An ``http-01`` challenge responder for txsni. + """ + challenge_type = u'http-01' + + def __init__(self): + self.resource = Resource() + + def start_responding(self, server_name, challenge, response): + """ + Add the child resource. + """ + self.resource.putChild( + challenge.encode('token').encode('utf-8'), + Data(response.key_authorization.encode('utf-8'), 'text/plain')) + + def stop_responding(self, server_name, challenge, response): + """ + Remove the child resource. + """ + encoded_token = challenge.encode('token').encode('utf-8') + if self.resource.getStaticEntity(encoded_token) is not None: + self.resource.delEntity(encoded_token) + + +__all__ = ['HTTP01Responder'] diff --git a/src/txacme/newsfragments/65.feature b/src/txacme/newsfragments/65.feature new file mode 100644 index 0000000..4e7f84c --- /dev/null +++ b/src/txacme/newsfragments/65.feature @@ -0,0 +1 @@ +``txacme.challenges.HTTP01Responder``, an http-01 challenge responder that can be embedded into an existing twisted.web application. diff --git a/src/txacme/test/test_challenges.py b/src/txacme/test/test_challenges.py index a73a8ba..b5afd3f 100644 --- a/src/txacme/test/test_challenges.py +++ b/src/txacme/test/test_challenges.py @@ -1,15 +1,23 @@ """ Tests for `txacme.challenges`. """ +from operator import methodcaller + from acme import challenges from acme.jose import b64encode from hypothesis import strategies as s from hypothesis import example, given from testtools import TestCase -from testtools.matchers import Contains, Equals, Is, MatchesPredicate, Not +from testtools.matchers import ( + AfterPreprocessing, Contains, Equals, Is, MatchesAll, MatchesPredicate, + MatchesStructure, Not) +from testtools.twistedsupport import succeeded +from treq.testing import StubTreq +from twisted.python.url import URL +from twisted.web.resource import Resource from zope.interface.verify import verifyObject -from txacme.challenges import TLSSNI01Responder +from txacme.challenges import HTTP01Responder, TLSSNI01Responder from txacme.challenges._tls import _MergingMappingProxy from txacme.interfaces import IResponder from txacme.test.test_client import RSA_KEY_512, RSA_KEY_512_RAW @@ -172,4 +180,57 @@ def test_contains(self, underlay, overlay, key): Equals(proxy.get(key) is not None)) -__all__ = ['TLSResponderTests'] +class HTTPResponderTests(_CommonResponderTests, TestCase): + """ + `.HTTP01Responder` is a responder for http-01 challenges. + """ + _challenge_factory = challenges.HTTP01 + _responder_factory = HTTP01Responder + _challenge_type = u'http-01' + + @example(token=b'BWYcfxzmOha7-7LoxziqPZIUr99BCz3BfbN9kzSFnrU') + @given(token=s.binary(min_size=32, max_size=32).map(b64encode)) + def test_start_responding(self, token): + """ + Calling ``start_responding`` makes an appropriate resource available. + """ + challenge = challenges.HTTP01(token=token) + response = challenge.response(RSA_KEY_512) + + responder = HTTP01Responder() + + challenge_resource = Resource() + challenge_resource.putChild(b'acme-challenge', responder.resource) + root = Resource() + root.putChild(b'.well-known', challenge_resource) + client = StubTreq(root) + + encoded_token = challenge.encode('token') + challenge_url = URL(host=u'example.com', path=[ + u'.well-known', u'acme-challenge', encoded_token]).asText() + + self.assertThat(client.get(challenge_url), + succeeded(MatchesStructure(code=Equals(404)))) + + responder.start_responding(u'example.com', challenge, response) + self.assertThat(client.get(challenge_url), succeeded(MatchesAll( + MatchesStructure( + code=Equals(200), + headers=AfterPreprocessing( + methodcaller('getRawHeaders', b'content-type'), + Equals([b'text/plain']))), + AfterPreprocessing(methodcaller('content'), succeeded( + Equals(response.key_authorization.encode('utf-8')))) + ))) + + # Starting twice before stopping doesn't break things + responder.start_responding(u'example.com', challenge, response) + self.assertThat(client.get(challenge_url), + succeeded(MatchesStructure(code=Equals(200)))) + + responder.stop_responding(u'example.com', challenge, response) + self.assertThat(client.get(challenge_url), + succeeded(MatchesStructure(code=Equals(404)))) + + +__all__ = ['HTTPResponderTests', 'TLSResponderTests']