Skip to content

chore: disallow password resets to unverified email #18088

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def pyramid_request(pyramid_services, jinja):
dummy_request.log = pretend.stub(
bind=pretend.call_recorder(lambda *args, **kwargs: dummy_request.log),
info=pretend.call_recorder(lambda *args, **kwargs: None),
warning=pretend.call_recorder(lambda *args, **kwargs: None),
error=pretend.call_recorder(lambda *args, **kwargs: None),
)

Expand Down
126 changes: 86 additions & 40 deletions tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1845,26 +1845,30 @@ def test_get(self, pyramid_request, user_service):
]

def test_request_password_reset(
self, monkeypatch, pyramid_request, pyramid_config, user_service, token_service
self,
monkeypatch,
pyramid_request,
user_service,
token_service,
mocker,
):
stub_user = pretend.stub(
id=pretend.stub(),
username=pretend.stub(),
emails=[pretend.stub(email="[email protected]")],
can_reset_password=True,
record_event=pretend.call_recorder(lambda *a, **kw: None),
user = UserFactory.create(with_verified_primary_email=True)
mock_record_event = mocker.patch(
"warehouse.accounts.models.HasEvents.record_event",
autospec=True,
return_value=True,
)
pyramid_request.method = "POST"
token_service.dumps = pretend.call_recorder(lambda a: "TOK")
user_service.get_user_by_username = pretend.call_recorder(lambda a: stub_user)
user_service.get_user_by_username = pretend.call_recorder(lambda a: user)
pyramid_request.find_service = pretend.call_recorder(
lambda interface, **kw: {
IUserService: user_service,
ITokenService: token_service,
}[interface]
)
form_obj = pretend.stub(
username_or_email=pretend.stub(data=stub_user.username),
username_or_email=pretend.stub(data=user.username),
validate=pretend.call_recorder(lambda: True),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
Expand All @@ -1879,9 +1883,7 @@ def test_request_password_reset(
result = views.request_password_reset(pyramid_request, _form_class=form_class)

assert result == {"n_hours": n_hours}
assert user_service.get_user_by_username.calls == [
pretend.call(stub_user.username)
]
assert user_service.get_user_by_username.calls == [pretend.call(user.username)]
assert pyramid_request.find_service.calls == [
pretend.call(IUserService, context=None),
pretend.call(ITokenService, name="password"),
Expand All @@ -1891,22 +1893,21 @@ def test_request_password_reset(
pretend.call(pyramid_request.POST, user_service=user_service)
]
assert send_password_reset_email.calls == [
pretend.call(pyramid_request, (stub_user, None))
]
assert stub_user.record_event.calls == [
pretend.call(
tag=EventTag.Account.PasswordResetRequest,
request=pyramid_request,
)
pretend.call(pyramid_request, (user, user.primary_email))
]
mock_record_event.assert_called_once_with(
user,
tag=EventTag.Account.PasswordResetRequest,
request=pyramid_request,
)

def test_request_password_reset_with_email(
self, monkeypatch, pyramid_request, pyramid_config, user_service, token_service
):
stub_user = pretend.stub(
id=uuid.uuid4(),
email="[email protected]",
emails=[pretend.stub(email="[email protected]")],
emails=[pretend.stub(email="[email protected]", verified=True)],
can_reset_password=True,
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
Expand Down Expand Up @@ -1977,8 +1978,8 @@ def test_request_password_reset_with_non_primary_email(
id=uuid.uuid4(),
email="[email protected]",
emails=[
pretend.stub(email="[email protected]"),
pretend.stub(email="[email protected]"),
pretend.stub(email="[email protected]", verified=True),
pretend.stub(email="[email protected]", verified=True),
],
can_reset_password=True,
record_event=pretend.call_recorder(lambda *a, **kw: None),
Expand Down Expand Up @@ -2055,7 +2056,7 @@ def test_too_many_password_reset_requests(
stub_user = pretend.stub(
id=uuid.uuid4(),
email="[email protected]",
emails=[pretend.stub(email="[email protected]")],
emails=[pretend.stub(email="[email protected]", verified=True)],
can_reset_password=True,
record_event=pretend.call_recorder(lambda *a, **kw: None),
)
Expand Down Expand Up @@ -2100,26 +2101,26 @@ def test_too_many_password_reset_requests(
pretend.call(stub_user.id)
]

def test_password_reset_prohibited(
self, monkeypatch, pyramid_request, pyramid_config, user_service
):
stub_user = pretend.stub(
id=pretend.stub(),
username=pretend.stub(),
emails=[pretend.stub(email="[email protected]")],
can_reset_password=False,
record_event=pretend.call_recorder(lambda *a, **kw: None),
def test_password_reset_prohibited(self, pyramid_request, user_service, mocker):
user = UserFactory.create(
with_verified_primary_email=True,
prohibit_password_reset=True,
)
mock_record_event = mocker.patch(
"warehouse.accounts.models.HasEvents.record_event",
autospec=True,
return_value=True,
)
pyramid_request.method = "POST"
pyramid_request.route_path = pretend.call_recorder(lambda a: "/the-redirect")
user_service.get_user_by_username = pretend.call_recorder(lambda a: stub_user)
user_service.get_user_by_username = pretend.call_recorder(lambda a: user)
pyramid_request.find_service = pretend.call_recorder(
lambda interface, **kw: {
IUserService: user_service,
}[interface]
)
form_obj = pretend.stub(
username_or_email=pretend.stub(data=stub_user.username),
username_or_email=pretend.stub(data=user.username),
validate=pretend.call_recorder(lambda: True),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
Expand All @@ -2132,12 +2133,11 @@ def test_password_reset_prohibited(
]
assert result.headers["Location"] == "/the-redirect"

assert stub_user.record_event.calls == [
pretend.call(
tag=EventTag.Account.PasswordResetAttempt,
request=pyramid_request,
)
]
mock_record_event.assert_called_once_with(
user,
tag=EventTag.Account.PasswordResetAttempt,
request=pyramid_request,
)

def test_password_reset_with_nonexistent_email(
self, monkeypatch, pyramid_request, pyramid_config, user_service, token_service
Expand Down Expand Up @@ -2169,6 +2169,52 @@ def test_redirect_authenticated_user(self):
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

@pytest.mark.parametrize(
"user_input",
[
"email",
"username",
],
Comment on lines +2174 to +2177
Copy link
Member Author

@miketheman miketheman May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the material change to the unverified tests from the previous version - now this test takes either of these as an input.

)
def test_unverified_email_sends_alt_notice(self, db_request, mocker, user_input):
unverified_email = EmailFactory(verified=False)

form_input = {
"email": unverified_email.email,
"username": unverified_email.user.username,
}.get(user_input)

mock_send_email = mocker.patch(
"warehouse.accounts.views.send_password_reset_unverified_email",
autospec=True,
return_value=None,
)
# Prevent form's validation from checking deliverability
mock_form_validation = mocker.patch(
"warehouse.accounts.forms."
"RequestPasswordResetForm.validate_username_or_email",
autospec=True,
return_value=True,
)

db_request.method = "POST"
db_request.POST = MultiDict({"username_or_email": form_input})

result = views.request_password_reset(db_request)

assert result == {"n_hours": 6}
mock_form_validation.assert_called_once()
mock_send_email.assert_called_once_with(
db_request, (unverified_email.user, unverified_email)
)
assert db_request.log.warning.calls == [
pretend.call(
"User requested password reset for unverified email",
username=unverified_email.user.username,
email_address=unverified_email.email,
)
]


class TestResetPassword:
@pytest.mark.parametrize("dates_utc", [True, False])
Expand Down
40 changes: 31 additions & 9 deletions tests/unit/email/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,17 +535,14 @@ def retry(exc):

class TestSendPasswordResetEmail:
@pytest.mark.parametrize(
("verified", "email_addr"),
"email_addr",
[
(True, None),
(False, None),
(True, "[email protected]"),
(False, "[email protected]"),
None,
"[email protected]",
],
)
def test_send_password_reset_email(
self,
verified,
email_addr,
pyramid_request,
pyramid_config,
Expand All @@ -556,7 +553,7 @@ def test_send_password_reset_email(
stub_user = pretend.stub(
id="id",
email="[email protected]",
primary_email=pretend.stub(email="[email protected]", verified=verified),
primary_email=pretend.stub(email="[email protected]", verified=True),
username="username_value",
name="name_value",
last_login="last_login",
Expand All @@ -565,7 +562,7 @@ def test_send_password_reset_email(
if email_addr is None:
stub_email = None
else:
stub_email = pretend.stub(email=email_addr, verified=verified)
stub_email = pretend.stub(email=email_addr, verified=True)
pyramid_request.method = "POST"
token_service.dumps = pretend.call_recorder(lambda a: "TOKEN")

Expand Down Expand Up @@ -654,12 +651,37 @@ def test_send_password_reset_email(
"warehouse.emails.scheduled",
tags=[
"template_name:password-reset",
"allow_unverified:True",
"allow_unverified:False",
"repeat_window:none",
],
)
]

def test_unverified_email_sends_alt_notice(self, pyramid_config, db_request):
unverified_email = EmailFactory.create(verified=False)

subject_renderer = pyramid_config.testing_add_renderer(
"email/password-reset-unverified/subject.txt"
)
subject_renderer.string_response = "Email Subject"
body_renderer = pyramid_config.testing_add_renderer(
"email/password-reset-unverified/body.txt"
)
body_renderer.string_response = "Email Body"
html_renderer = pyramid_config.testing_add_renderer(
"email/password-reset-unverified/body.html"
)
html_renderer.string_response = "Email HTML Body"

result = email.send_password_reset_unverified_email(
db_request, (unverified_email.user, unverified_email)
)

assert result == {}
subject_renderer.assert_()
body_renderer.assert_()
html_renderer.assert_()


class TestEmailVerificationEmail:
def test_email_verification_email(
Expand Down
45 changes: 35 additions & 10 deletions warehouse/accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
send_organization_member_invite_declined_email,
send_password_change_email,
send_password_reset_email,
send_password_reset_unverified_email,
send_recovery_code_reminder_email,
)
from warehouse.events.tags import EventTag
Expand Down Expand Up @@ -795,19 +796,43 @@ def request_password_reset(request, _form_class=RequestPasswordResetForm):
user_service = request.find_service(IUserService, context=None)
form = _form_class(request.POST, user_service=user_service)
if request.method == "POST" and form.validate():
user = user_service.get_user_by_username(form.username_or_email.data)
form_field_input = form.username_or_email.data

if user is None:
user = user_service.get_user_by_email(form.username_or_email.data)
if user is not None:
email = first_true(
user.emails, pred=lambda e: e.email == form.username_or_email.data
)
requested_email = None
user = user_service.get_user_by_username(form_field_input)

if user:
requested_email = user.primary_email
else:
if user := user_service.get_user_by_email(form_field_input):
requested_email = first_true(
user.emails,
pred=lambda e: e.email == form_field_input,
)
else:
# We could not find the user by username nor email.
# Return a response as if we did, to avoid leaking registered emails.
token_service = request.find_service(ITokenService, name="password")
n_hours = token_service.max_age // 60 // 60
return {"n_hours": n_hours}

if requested_email and not requested_email.verified:
# No verified email, log the attempt, ping the rate limit,
# notify to the email as to why no reset, return generic response.
send_password_reset_unverified_email(request, (user, requested_email))
user.record_event(
tag=EventTag.Account.PasswordResetAttempt,
request=request,
)
request.log.warning(
"User requested password reset for unverified email",
username=user.username,
email_address=requested_email.email,
)
user_service.ratelimiters["password.reset"].hit(user.id)

token_service = request.find_service(ITokenService, name="password")
n_hours = token_service.max_age // 60 // 60
# We could not find the user by username nor email.
# Return a response as if we did, to avoid leaking registered emails.
return {"n_hours": n_hours}

if not user_service.ratelimiters["password.reset"].test(user.id):
Expand All @@ -816,7 +841,7 @@ def request_password_reset(request, _form_class=RequestPasswordResetForm):
)

if user.can_reset_password:
send_password_reset_email(request, (user, email))
send_password_reset_email(request, (user, requested_email))
user.record_event(
tag=EventTag.Account.PasswordResetRequest,
request=request,
Expand Down
13 changes: 12 additions & 1 deletion warehouse/email/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def wrapper(request, user_or_users, **kwargs):
return inner


@_email("password-reset", allow_unverified=True)
@_email("password-reset")
def send_password_reset_email(request, user_and_email):
user, _ = user_and_email
token_service = request.find_service(ITokenService, name="password")
Expand All @@ -246,6 +246,17 @@ def send_password_reset_email(request, user_and_email):
}


@_email("password-reset-unverified", allow_unverified=True)
def send_password_reset_unverified_email(_request, _user_and_email):
"""
This email is sent to users who have not verified their email address
when they request a password reset. It is sent to the email address
they provided, which may not be their primary email address.
"""
# No params are used in the template, return an empty dict
return {}


@_email("verify-email", allow_unverified=True)
def send_email_verification_email(request, user_and_email):
user, email = user_and_email
Expand Down
Loading