Skip to content

Commit 60d6ed6

Browse files
committed
Add unit tests to cover rbac unauthorized error on kvp lookup
1 parent b88580f commit 60d6ed6

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

st2common/tests/unit/test_keyvalue_lookup.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,39 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
18+
import mock
19+
20+
from oslo_config import cfg
21+
1722
from st2tests.base import CleanDbTestCase
1823
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, FULL_USER_SCOPE
1924
from st2common.constants.keyvalue import SYSTEM_SCOPE, USER_SCOPE
25+
from st2common.constants.types import ResourceType
26+
from st2common.exceptions.rbac import ResourceAccessDeniedError
27+
from st2common.models.db.auth import UserDB
2028
from st2common.models.db.keyvalue import KeyValuePairDB
2129
from st2common.persistence.keyvalue import KeyValuePair
30+
from st2common.rbac.backends.noop import NoOpRBACUtils
31+
from st2common.rbac.types import PermissionType
2232
from st2common.services.keyvalues import KeyValueLookup, UserKeyValueLookup
33+
from st2tests import config
34+
35+
USER = "stanley"
36+
RESOURCE_UUID = "%s:%s:%s" % (
37+
ResourceType.KEY_VALUE_PAIR,
38+
FULL_USER_SCOPE,
39+
"stanley:foobar",
40+
)
2341

2442

2543
class TestKeyValueLookup(CleanDbTestCase):
44+
@classmethod
45+
def setUpClass(cls):
46+
super(TestKeyValueLookup, cls).setUpClass()
47+
config.parse_args()
48+
cfg.CONF.set_override(name="backend", override="noop", group="rbac")
49+
2650
def test_lookup_with_key_prefix(self):
2751
KeyValuePair.add_or_update(
2852
KeyValuePairDB(
@@ -171,3 +195,27 @@ def test_lookup_cast(self):
171195
self.assertEqual(str(lookup.count), "5.5")
172196
self.assertEqual(float(lookup.count), 5.5)
173197
self.assertEqual(int(lookup.count), 5)
198+
199+
@mock.patch.object(
200+
NoOpRBACUtils,
201+
"assert_user_has_resource_db_permission",
202+
mock.MagicMock(
203+
side_effect=ResourceAccessDeniedError(
204+
user_db=UserDB(name=USER),
205+
resource_api_or_db=KeyValuePairDB(uid=RESOURCE_UUID),
206+
permission_type=PermissionType.KEY_VALUE_PAIR_VIEW,
207+
)
208+
),
209+
)
210+
def test_system_kvp_lookup_unauthorized(self):
211+
secret_value = (
212+
"0055A2D9A09E1071931925933744965EEA7E23DCF59A8D1D7A3"
213+
+ "64338294916D37E83C4796283C584751750E39844E2FD97A3727DB5D553F638"
214+
)
215+
216+
k1 = KeyValuePair.add_or_update(
217+
KeyValuePairDB(name="k1", value=secret_value, secret=True)
218+
)
219+
220+
lookup = KeyValueLookup()
221+
self.assertRaises(ResourceAccessDeniedError, getattr, lookup, "k1")

st2common/tests/unit/test_util_keyvalue.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,21 @@
2727
DATASTORE_PARENT_SCOPE,
2828
DATASTORE_SCOPE_SEPARATOR,
2929
)
30+
from st2common.constants.types import ResourceType
3031
from st2common.exceptions.rbac import AccessDeniedError
32+
from st2common.exceptions.rbac import ResourceAccessDeniedError
3133
from st2common.models.db import auth as auth_db
34+
from st2common.models.db.keyvalue import KeyValuePairDB
35+
from st2common.rbac.backends.noop import NoOpRBACUtils
36+
from st2common.rbac.types import PermissionType
3237
from st2tests import config
3338

3439
USER = "stanley"
40+
RESOURCE_UUID = "%s:%s:%s" % (
41+
ResourceType.KEY_VALUE_PAIR,
42+
FULL_USER_SCOPE,
43+
"stanley:foobar",
44+
)
3545

3646

3747
class TestKeyValueUtil(unittest2.TestCase):
@@ -133,3 +143,35 @@ def test_get_key(self, deseralize_key_value, KeyValuePair):
133143
def test_get_key_invalid_input(self):
134144
self.assertRaises(TypeError, kv_utl.get_key, key=1)
135145
self.assertRaises(TypeError, kv_utl.get_key, key="test", decrypt="yep")
146+
147+
@mock.patch("st2common.util.keyvalue.KeyValuePair")
148+
@mock.patch("st2common.util.keyvalue.deserialize_key_value")
149+
@mock.patch.object(
150+
NoOpRBACUtils,
151+
"assert_user_has_resource_db_permission",
152+
mock.MagicMock(
153+
side_effect=ResourceAccessDeniedError(
154+
user_db=auth_db.UserDB(name=USER),
155+
resource_api_or_db=KeyValuePairDB(uid=RESOURCE_UUID),
156+
permission_type=PermissionType.KEY_VALUE_PAIR_VIEW,
157+
)
158+
),
159+
)
160+
def test_get_key_unauthorized(self, deseralize_key_value, KeyValuePair):
161+
key, value = ("foobar", "fubar")
162+
decrypt = False
163+
164+
KeyValuePair.get_by_scope_and_name().value = value
165+
deseralize_key_value.return_value = value
166+
167+
self.assertRaises(
168+
ResourceAccessDeniedError,
169+
kv_utl.get_key,
170+
key=key,
171+
user_db=auth_db.UserDB(name=USER),
172+
decrypt=decrypt,
173+
)
174+
175+
KeyValuePair.get_by_scope_and_name.assert_called_with(
176+
FULL_USER_SCOPE, "stanley:%s" % key
177+
)

0 commit comments

Comments
 (0)