46
46
from synapse .types import (
47
47
DeviceListUpdates ,
48
48
JsonDict ,
49
+ JsonMapping ,
49
50
PersistedEventPosition ,
50
51
Requester ,
51
52
RoomStreamToken ,
@@ -357,6 +358,7 @@ def __init__(self, hs: "HomeServer"):
357
358
self .event_sources = hs .get_event_sources ()
358
359
self .relations_handler = hs .get_relations_handler ()
359
360
self .device_handler = hs .get_device_handler ()
361
+ self .push_rules_handler = hs .get_push_rules_handler ()
360
362
self .rooms_to_exclude_globally = hs .config .server .rooms_to_exclude_from_sync
361
363
362
364
async def wait_for_sync_for_user (
@@ -628,6 +630,7 @@ async def handle_room(room_id: str) -> None:
628
630
629
631
extensions = await self .get_extensions_response (
630
632
sync_config = sync_config ,
633
+ lists = lists ,
631
634
from_token = from_token ,
632
635
to_token = to_token ,
633
636
)
@@ -1797,13 +1800,15 @@ async def get_room_sync_data(
1797
1800
async def get_extensions_response (
1798
1801
self ,
1799
1802
sync_config : SlidingSyncConfig ,
1803
+ lists : Dict [str , SlidingSyncResult .SlidingWindowList ],
1800
1804
to_token : StreamToken ,
1801
1805
from_token : Optional [SlidingSyncStreamToken ],
1802
1806
) -> SlidingSyncResult .Extensions :
1803
1807
"""Handle extension requests.
1804
1808
1805
1809
Args:
1806
1810
sync_config: Sync configuration
1811
+ lists: Sliding window API. A map of list key to list results.
1807
1812
to_token: The point in the stream to sync up to.
1808
1813
from_token: The point in the stream to sync from.
1809
1814
"""
@@ -1828,9 +1833,20 @@ async def get_extensions_response(
1828
1833
from_token = from_token ,
1829
1834
)
1830
1835
1836
+ account_data_response = None
1837
+ if sync_config .extensions .account_data is not None :
1838
+ account_data_response = await self .get_account_data_extension_response (
1839
+ sync_config = sync_config ,
1840
+ lists = lists ,
1841
+ account_data_request = sync_config .extensions .account_data ,
1842
+ to_token = to_token ,
1843
+ from_token = from_token ,
1844
+ )
1845
+
1831
1846
return SlidingSyncResult .Extensions (
1832
1847
to_device = to_device_response ,
1833
1848
e2ee = e2ee_response ,
1849
+ account_data = account_data_response ,
1834
1850
)
1835
1851
1836
1852
async def get_to_device_extension_response (
@@ -1956,3 +1972,125 @@ async def get_e2ee_extension_response(
1956
1972
device_one_time_keys_count = device_one_time_keys_count ,
1957
1973
device_unused_fallback_key_types = device_unused_fallback_key_types ,
1958
1974
)
1975
+
1976
+ async def get_account_data_extension_response (
1977
+ self ,
1978
+ sync_config : SlidingSyncConfig ,
1979
+ lists : Dict [str , SlidingSyncResult .SlidingWindowList ],
1980
+ account_data_request : SlidingSyncConfig .Extensions .AccountDataExtension ,
1981
+ to_token : StreamToken ,
1982
+ from_token : Optional [SlidingSyncStreamToken ],
1983
+ ) -> Optional [SlidingSyncResult .Extensions .AccountDataExtension ]:
1984
+ """Handle Account Data extension (MSC3959)
1985
+
1986
+ Args:
1987
+ sync_config: Sync configuration
1988
+ lists: Sliding window API. A map of list key to list results.
1989
+ account_data_request: The account_data extension from the request
1990
+ to_token: The point in the stream to sync up to.
1991
+ from_token: The point in the stream to sync from.
1992
+ """
1993
+ user_id = sync_config .user .to_string ()
1994
+
1995
+ # Skip if the extension is not enabled
1996
+ if not account_data_request .enabled :
1997
+ return None
1998
+
1999
+ global_account_data_map : Mapping [str , JsonMapping ] = {}
2000
+ if from_token is not None :
2001
+ global_account_data_map = (
2002
+ await self .store .get_updated_global_account_data_for_user (
2003
+ user_id , from_token .stream_token .account_data_key
2004
+ )
2005
+ )
2006
+
2007
+ have_push_rules_changed = await self .store .have_push_rules_changed_for_user (
2008
+ user_id , from_token .stream_token .push_rules_key
2009
+ )
2010
+ if have_push_rules_changed :
2011
+ global_account_data_map = dict (global_account_data_map )
2012
+ global_account_data_map [AccountDataTypes .PUSH_RULES ] = (
2013
+ await self .push_rules_handler .push_rules_for_user (sync_config .user )
2014
+ )
2015
+ else :
2016
+ all_global_account_data = await self .store .get_global_account_data_for_user (
2017
+ user_id
2018
+ )
2019
+
2020
+ global_account_data_map = dict (all_global_account_data )
2021
+ global_account_data_map [AccountDataTypes .PUSH_RULES ] = (
2022
+ await self .push_rules_handler .push_rules_for_user (sync_config .user )
2023
+ )
2024
+
2025
+ # We only want to include account data for rooms that are already in the sliding
2026
+ # sync response AND that were requested in the account data request.
2027
+ relevant_room_ids : Set [str ] = set ()
2028
+
2029
+ # See what rooms from the room subscriptions we should get account data for
2030
+ if (
2031
+ account_data_request .rooms is not None
2032
+ and sync_config .room_subscriptions is not None
2033
+ ):
2034
+ actual_room_ids = sync_config .room_subscriptions .keys ()
2035
+
2036
+ for room_id in account_data_request .rooms :
2037
+ # A wildcard means we process all rooms from the room subscriptions
2038
+ if room_id == "*" :
2039
+ relevant_room_ids .update (sync_config .room_subscriptions .keys ())
2040
+ break
2041
+
2042
+ if room_id in actual_room_ids :
2043
+ relevant_room_ids .add (room_id )
2044
+
2045
+ # See what rooms from the sliding window lists we should get account data for
2046
+ if account_data_request .lists is not None :
2047
+ for list_key in account_data_request .lists :
2048
+ # Just some typing because we share the variable name in multiple places
2049
+ actual_list : Optional [SlidingSyncResult .SlidingWindowList ] = None
2050
+
2051
+ # A wildcard means we process rooms from all lists
2052
+ if list_key == "*" :
2053
+ for actual_list in lists .values ():
2054
+ # We only expect a single SYNC operation for any list
2055
+ assert len (actual_list .ops ) == 1
2056
+ sync_op = actual_list .ops [0 ]
2057
+ assert sync_op .op == OperationType .SYNC
2058
+
2059
+ relevant_room_ids .update (sync_op .room_ids )
2060
+
2061
+ break
2062
+
2063
+ actual_list = lists .get (list_key )
2064
+ if actual_list is not None :
2065
+ # We only expect a single SYNC operation for any list
2066
+ assert len (actual_list .ops ) == 1
2067
+ sync_op = actual_list .ops [0 ]
2068
+ assert sync_op .op == OperationType .SYNC
2069
+
2070
+ relevant_room_ids .update (sync_op .room_ids )
2071
+
2072
+ # Fetch room account data
2073
+ account_data_by_room_map : Mapping [str , Mapping [str , JsonMapping ]] = {}
2074
+ if len (relevant_room_ids ) > 0 :
2075
+ if from_token is not None :
2076
+ account_data_by_room_map = (
2077
+ await self .store .get_updated_room_account_data_for_user (
2078
+ user_id , from_token .stream_token .account_data_key
2079
+ )
2080
+ )
2081
+ else :
2082
+ account_data_by_room_map = (
2083
+ await self .store .get_room_account_data_for_user (user_id )
2084
+ )
2085
+
2086
+ # Filter down to the relevant rooms
2087
+ account_data_by_room_map = {
2088
+ room_id : account_data_map
2089
+ for room_id , account_data_map in account_data_by_room_map .items ()
2090
+ if room_id in relevant_room_ids
2091
+ }
2092
+
2093
+ return SlidingSyncResult .Extensions .AccountDataExtension (
2094
+ global_account_data_map = global_account_data_map ,
2095
+ account_data_by_room_map = account_data_by_room_map ,
2096
+ )
0 commit comments