From d88d61bffb1102c6c5d8ac91ba4f956e78f2d0b2 Mon Sep 17 00:00:00 2001 From: Kevinz857 Date: Sat, 31 May 2025 18:33:21 +0800 Subject: [PATCH] feat: add switch control when watching events whether deserialization is required This PR adds an option to disable automatic deserialization in the Watch stream(). By allowing clients to opt-out of automatic deserialization when only basic JSON parsing is needed, we can significantly reduce time cost and improve event processing throughput. This is particularly important in scenarios with high event volumes or resource constraints. Key changes: - Added a 'deserialize' parameter to Watch.stream() method (defaults to True for backward compatibility) - When deserialize=False, events are only JSON parsed without model conversion - Maintains the original behavior when deserialize=True - Added test cases to verify both behaviors --- kubernetes/base/watch/watch.py | 7 +++++- kubernetes/base/watch/watch_test.py | 39 +++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 2ede8638c9..e8fe6c63e6 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -179,6 +179,7 @@ def stream(self, func, *args, **kwargs): # We want to ensure we are returning within that timeout. disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False + deserialize = kwargs.pop('deserialize', True) while True: resp = func(*args, **kwargs) try: @@ -186,7 +187,11 @@ def stream(self, func, *args, **kwargs): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log if watch_arg == "watch": - event = self.unmarshal_event(line, return_type) + if deserialize: + event = self.unmarshal_event(line, return_type) + else: + # Only do basic JSON parsing, no deserialize + event = json.loads(line) if isinstance(event, dict) \ and event['type'] == 'ERROR': obj = event['raw_object'] diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index f3880de7c1..c832f625fc 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -576,5 +576,44 @@ def test_pod_log_empty_lines(self): self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) +if __name__ == '__main__': +def test_watch_with_deserialize_param(self): + """test watch.stream() deserialize param""" + # prepare test data + test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock(return_value=[test_json + '\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # test case with deserialize=True + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=True): + self.assertEqual("ADDED", e['type']) + # Verify that the object is deserialized correctly + self.assertTrue(hasattr(e['object'], 'metadata')) + self.assertEqual("test1", e['object'].metadata.name) + self.assertEqual("1", e['object'].metadata.resource_version) + # Verify that the original object is saved + self.assertEqual(json.loads(test_json)['object'], e['raw_object']) + + # test case with deserialize=False + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=False): + self.assertEqual("ADDED", e['type']) + # The validation object remains in the original dictionary format + self.assertIsInstance(e['object'], dict) + self.assertEqual("test1", e['object']['metadata']['name']) + self.assertEqual("1", e['object']['metadata']['resourceVersion']) + + # verify the api is called twice + fake_api.get_namespaces.assert_has_calls([ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True) + ]) if __name__ == '__main__': unittest.main()