Skip to content

Commit

Permalink
fix: Ensure hidden users see their own solves (CTFd#1840) (CTFd#1846)
Browse files Browse the repository at this point in the history
* Closes CTFd#1839

Co-authored-by: maybe-sybr <[email protected]>
  • Loading branch information
ColdHeat and maybe-sybr authored Mar 24, 2021
1 parent d0c9207 commit 7fa9f2f
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 17 deletions.
39 changes: 22 additions & 17 deletions CTFd/api/v1/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from flask import abort, render_template, request, url_for
from flask_restx import Namespace, Resource
from sqlalchemy import func as sa_func
from sqlalchemy import types as sa_types
from sqlalchemy.sql import and_, cast, false, true
from sqlalchemy.sql import and_, false, true

from CTFd.api.v1.helpers.request import validate_args
from CTFd.api.v1.helpers.schemas import sqlalchemy_to_pydantic
Expand Down Expand Up @@ -85,10 +84,16 @@ class ChallengeListSuccessResponse(APIListSuccessResponse):


def _build_solves_query(extra_filters=(), admin_view=False):
"""Returns queries and data that that are used for showing an account's solves.
It returns a tuple of
- SQLAlchemy query with (challenge_id, solve_count_for_challenge_id)
- Current user's solved challenge IDs
"""
# This can return None (unauth) if visibility is set to public
user = get_current_user()
# We only set a condition for matching user solves if there is a user and
# they have an account ID (user mode or in a team in teams mode)
AccountModel = get_model()
if user is not None and user.account_id is not None:
user_solved_cond = Solves.account_id == user.account_id
else:
Expand All @@ -102,24 +107,26 @@ def _build_solves_query(extra_filters=(), admin_view=False):
freeze_cond = true()
# Finally, we never count solves made by hidden or banned users/teams, even
# if we are an admin. This is to match the challenge detail API.
AccountModel = get_model()
exclude_solves_cond = and_(
AccountModel.banned == false(), AccountModel.hidden == false(),
)
# This query counts the number of solves per challenge, as well as the sum
# of correct solves made by the current user per the condition above (which
# should probably only be 0 or 1!)
solves_q = (
db.session.query(
Solves.challenge_id,
sa_func.count(Solves.challenge_id),
sa_func.sum(cast(user_solved_cond, sa_types.Integer)),
)
db.session.query(Solves.challenge_id, sa_func.count(Solves.challenge_id),)
.join(AccountModel)
.filter(*extra_filters, freeze_cond, exclude_solves_cond)
.group_by(Solves.challenge_id)
)
return solves_q
# Also gather the user's solve items which can be different from above query
# Even if we are a hidden user, we should see that we have solved the challenge
# But as a hidden user we are not included in the count
solve_ids = (
Solves.query.with_entities(Solves.challenge_id).filter(user_solved_cond).all()
)
solve_ids = {value for value, in solve_ids}
return solves_q, solve_ids


@challenges_namespace.route("")
Expand Down Expand Up @@ -181,20 +188,18 @@ def get(self, query_args):
# Admins get a shortcut to see all challenges despite pre-requisites
admin_view = is_admin() and request.args.get("view") == "admin"

solve_counts, user_solves = {}, set()
solve_counts = {}
# Build a query for to show challenge solve information. We only
# give an admin view if the request argument has been provided.
#
# NOTE: This is different behaviour to the challenge detail
# endpoint which only needs the current user to be an admin rather
# than also also having to provide `view=admin` as a query arg.
solves_q = _build_solves_query(admin_view=admin_view)
solves_q, user_solves = _build_solves_query(admin_view=admin_view)
# Aggregate the query results into the hashes defined at the top of
# this block for later use
for chal_id, solve_count, solved_by_user in solves_q:
for chal_id, solve_count in solves_q:
solve_counts[chal_id] = solve_count
if solved_by_user:
user_solves.add(chal_id)
if scores_visible() and accounts_visible():
solve_count_dfl = 0
else:
Expand Down Expand Up @@ -435,14 +440,14 @@ def get(self, challenge_id):

response = chal_class.read(challenge=chal)

solves_q = _build_solves_query(
solves_q, user_solves = _build_solves_query(
admin_view=is_admin(), extra_filters=(Solves.challenge_id == chal.id,)
)
# If there are no solves for this challenge ID then we have 0 rows
maybe_row = solves_q.first()
if maybe_row:
_, solve_count, solved_by_user = maybe_row
solved_by_user = bool(solved_by_user)
challenge_id, solve_count = maybe_row
solved_by_user = challenge_id in user_solves
else:
solve_count, solved_by_user = 0, False

Expand Down
111 changes: 111 additions & 0 deletions tests/api/v1/test_challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,13 @@ def test_api_challenges_get_solve_count_hidden_user():
assert r.status_code == 200
chal_data = r.get_json()["data"].pop()
assert chal_data["solves"] == 0
# We expect the admin to be able to see their own solve
with login_as_user(app, "admin") as admin:
r = admin.get("/api/v1/challenges")
assert r.status_code == 200
chal_data = r.get_json()["data"].pop()
assert chal_data["solves"] == 0
assert chal_data["solved_by_me"] is True
destroy_ctfd(app)


Expand Down Expand Up @@ -398,6 +405,110 @@ def test_api_challenges_get_solve_count_banned_user():
destroy_ctfd(app)


def test_api_challenges_challenge_with_requirements():
"""Does the challenge list API show challenges with requirements met?"""
app = create_ctfd()
with app.app_context():
prereq_id = gen_challenge(app.db).id
chal_obj = gen_challenge(app.db)
chal_obj.requirements = {"prerequisites": [prereq_id]}
chal_id = chal_obj.id
# Create a new user which will solve the prerequisite
register_user(app)
# Confirm that only the prerequisite challenge is listed initially
with login_as_user(app) as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
(chal_data,) = r.get_json()["data"]
assert chal_data["id"] == prereq_id
# Generate a solve and then confirm the second challenge is visible
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
with login_as_user(app) as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
data = r.get_json()["data"]
assert len(data) == 2
chal_ids = {c["id"] for c in r.get_json()["data"]}
assert chal_ids == {prereq_id, chal_id}
destroy_ctfd(app)


def test_api_challenges_challenge_with_requirements_hidden_user():
"""Does the challenge list API show gated challenges to a hidden user?"""
app = create_ctfd()
with app.app_context():
prereq_id = gen_challenge(app.db).id
chal_obj = gen_challenge(app.db)
chal_obj.requirements = {"prerequisites": [prereq_id]}
chal_id = chal_obj.id
# Create a new user which will solve the prerequisite and hide them
register_user(app)
Users.query.get(2).hidden = True
app.db.session.commit()
# Confirm that only the prerequisite challenge is listed initially
with login_as_user(app) as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
(chal_data,) = r.get_json()["data"]
assert chal_data["id"] == prereq_id
# Generate a solve and then confirm the second challenge is visible
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
with login_as_user(app) as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
data = r.get_json()["data"]
assert len(data) == 2
chal_ids = {c["id"] for c in r.get_json()["data"]}
assert chal_ids == {prereq_id, chal_id}
destroy_ctfd(app)


def test_api_challenges_challenge_with_requirements_banned_user():
"""Does the challenge list API show gated challenges to a banned user?"""
app = create_ctfd()
with app.app_context():
prereq_id = gen_challenge(app.db).id
chal_obj = gen_challenge(app.db)
chal_obj.requirements = {"prerequisites": [prereq_id]}
# Create a new user which will solve the prerequisite and ban them
register_user(app)
Users.query.get(2).banned = True
app.db.session.commit()
# Generate a solve just in case and confirm the API 403s
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
with login_as_user(app) as client:
assert client.get("/api/v1/challenges").status_code == 403
destroy_ctfd(app)


def test_api_challenges_challenge_with_requirements_no_user():
"""Does the challenge list API show gated challenges to the public?"""
app = create_ctfd()
with app.app_context():
set_config("challenge_visibility", "public")
prereq_id = gen_challenge(app.db).id
chal_obj = gen_challenge(app.db)
chal_obj.requirements = {"prerequisites": [prereq_id]}
# Create a new user which will solve the prerequisite
register_user(app)
# Confirm that only the prerequisite challenge is listed publicly
with app.test_client() as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
initial_data = r.get_json()["data"]
(chal_data,) = initial_data
assert chal_data["id"] == prereq_id
# Fix up the solve count for later comparison with `initial_data`
chal_data["solves"] += 1
# Generate a solve and then confirm the response is unchanged
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
with app.test_client() as client:
r = client.get("/api/v1/challenges")
assert r.status_code == 200
assert r.get_json()["data"] == initial_data
destroy_ctfd(app)


def test_api_challenges_post_admin():
"""Can a user post /api/v1/challenges if admin"""
app = create_ctfd()
Expand Down

0 comments on commit 7fa9f2f

Please sign in to comment.