12
12
//! Persistent metadata storage for the coordinator.
13
13
14
14
use std:: borrow:: Cow ;
15
- use std:: collections:: { BTreeMap , BTreeSet } ;
15
+ use std:: collections:: { BTreeMap , BTreeSet , VecDeque } ;
16
16
use std:: convert;
17
17
use std:: sync:: Arc ;
18
18
19
- use futures:: Future ;
19
+ use futures:: future:: BoxFuture ;
20
+ use futures:: { Future , FutureExt } ;
20
21
use itertools:: Itertools ;
21
22
use mz_adapter_types:: connection:: ConnectionId ;
22
23
use mz_audit_log:: { EventType , FullNameV1 , ObjectType , VersionedStorageUsage } ;
@@ -29,6 +30,7 @@ use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Conf
29
30
#[ cfg( test) ]
30
31
use mz_catalog:: durable:: CatalogError ;
31
32
use mz_catalog:: durable:: { test_bootstrap_args, DurableCatalogState , TestCatalogStateBuilder } ;
33
+ use mz_catalog:: expr_cache:: { ExpressionCacheHandle , GlobalExpressions , LocalExpressions } ;
32
34
use mz_catalog:: memory:: error:: { Error , ErrorKind } ;
33
35
use mz_catalog:: memory:: objects:: {
34
36
CatalogEntry , Cluster , ClusterReplica , Database , NetworkPolicy , Role , Schema ,
@@ -37,6 +39,7 @@ use mz_compute_types::dataflows::DataflowDescription;
37
39
use mz_controller:: clusters:: ReplicaLocation ;
38
40
use mz_controller_types:: { ClusterId , ReplicaId } ;
39
41
use mz_expr:: OptimizedMirRelationExpr ;
42
+ use mz_ore:: collections:: HashSet ;
40
43
use mz_ore:: metrics:: MetricsRegistry ;
41
44
use mz_ore:: now:: { EpochMillis , NowFn , SYSTEM_TIME } ;
42
45
use mz_ore:: option:: FallibleMapExt ;
@@ -52,9 +55,9 @@ use mz_repr::{Diff, GlobalId, ScalarType};
52
55
use mz_secrets:: InMemorySecretsController ;
53
56
use mz_sql:: catalog:: {
54
57
CatalogCluster , CatalogClusterReplica , CatalogDatabase , CatalogError as SqlCatalogError ,
55
- CatalogItem as SqlCatalogItem , CatalogItemType as SqlCatalogItemType , CatalogNetworkPolicy ,
56
- CatalogRole , CatalogSchema , DefaultPrivilegeAclItem , DefaultPrivilegeObject , EnvironmentId ,
57
- SessionCatalog , SystemObjectType ,
58
+ CatalogItem as SqlCatalogItem , CatalogItemType as SqlCatalogItemType , CatalogItemType ,
59
+ CatalogNetworkPolicy , CatalogRole , CatalogSchema , DefaultPrivilegeAclItem ,
60
+ DefaultPrivilegeObject , EnvironmentId , SessionCatalog , SystemObjectType ,
58
61
} ;
59
62
use mz_sql:: names:: {
60
63
CommentObjectId , DatabaseId , FullItemName , FullSchemaName , ItemQualifiers , ObjectId ,
@@ -128,6 +131,7 @@ mod transact;
128
131
pub struct Catalog {
129
132
state : CatalogState ,
130
133
plans : CatalogPlans ,
134
+ expr_cache_handle : Option < ExpressionCacheHandle > ,
131
135
storage : Arc < tokio:: sync:: Mutex < Box < dyn mz_catalog:: durable:: DurableCatalogState > > > ,
132
136
transient_revision : u64 ,
133
137
}
@@ -139,6 +143,7 @@ impl Clone for Catalog {
139
143
Self {
140
144
state : self . state . clone ( ) ,
141
145
plans : self . plans . clone ( ) ,
146
+ expr_cache_handle : self . expr_cache_handle . clone ( ) ,
142
147
storage : Arc :: clone ( & self . storage ) ,
143
148
transient_revision : self . transient_revision ,
144
149
}
@@ -363,6 +368,49 @@ impl Catalog {
363
368
}
364
369
policies
365
370
}
371
+
372
+ /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
373
+ /// as a result of creating new indexes on the items in `ons`.
374
+ pub ( crate ) fn invalidate_for_index (
375
+ & self ,
376
+ ons : impl Iterator < Item = GlobalId > ,
377
+ ) -> BTreeSet < GlobalId > {
378
+ let mut dependencies = BTreeSet :: new ( ) ;
379
+ let mut queue = VecDeque :: new ( ) ;
380
+ let mut seen = HashSet :: new ( ) ;
381
+ for on in ons {
382
+ dependencies. insert ( on) ;
383
+ seen. insert ( on) ;
384
+ let entry = self . get_entry ( & on) ;
385
+ let uses = entry. uses ( ) ;
386
+ queue. extend ( uses. clone ( ) ) ;
387
+ seen. extend ( uses) ;
388
+ }
389
+
390
+ while let Some ( cur) = queue. pop_front ( ) {
391
+ if seen. insert ( cur) {
392
+ let entry = self . get_entry ( & cur) ;
393
+ match entry. item_type ( ) {
394
+ CatalogItemType :: Table
395
+ | CatalogItemType :: Source
396
+ | CatalogItemType :: MaterializedView
397
+ | CatalogItemType :: Sink
398
+ | CatalogItemType :: Index
399
+ | CatalogItemType :: Type
400
+ | CatalogItemType :: Func
401
+ | CatalogItemType :: Secret
402
+ | CatalogItemType :: Connection
403
+ | CatalogItemType :: ContinualTask => {
404
+ dependencies. insert ( cur) ;
405
+ }
406
+ CatalogItemType :: View => {
407
+ queue. extend ( entry. uses ( ) ) ;
408
+ }
409
+ }
410
+ }
411
+ }
412
+ dependencies
413
+ }
366
414
}
367
415
368
416
#[ derive( Debug ) ]
@@ -480,15 +528,21 @@ impl Catalog {
480
528
) -> Result < Catalog , anyhow:: Error > {
481
529
let now = SYSTEM_TIME . clone ( ) ;
482
530
let environment_id = None ;
483
- let openable_storage = TestCatalogStateBuilder :: new ( persist_client)
531
+ let openable_storage = TestCatalogStateBuilder :: new ( persist_client. clone ( ) )
484
532
. with_organization_id ( organization_id)
485
533
. with_default_deploy_generation ( )
486
534
. build ( )
487
535
. await ?;
488
536
let storage = openable_storage. open ( now ( ) , & test_bootstrap_args ( ) ) . await ?;
489
537
let system_parameter_defaults = BTreeMap :: default ( ) ;
490
- Self :: open_debug_catalog_inner ( storage, now, environment_id, system_parameter_defaults)
491
- . await
538
+ Self :: open_debug_catalog_inner (
539
+ persist_client,
540
+ storage,
541
+ now,
542
+ environment_id,
543
+ system_parameter_defaults,
544
+ )
545
+ . await
492
546
}
493
547
494
548
/// Opens a read only debug persist backed catalog defined by `persist_client` and
@@ -501,16 +555,22 @@ impl Catalog {
501
555
) -> Result < Catalog , anyhow:: Error > {
502
556
let now = SYSTEM_TIME . clone ( ) ;
503
557
let environment_id = None ;
504
- let openable_storage = TestCatalogStateBuilder :: new ( persist_client)
558
+ let openable_storage = TestCatalogStateBuilder :: new ( persist_client. clone ( ) )
505
559
. with_organization_id ( organization_id)
506
560
. build ( )
507
561
. await ?;
508
562
let storage = openable_storage
509
563
. open_read_only ( & test_bootstrap_args ( ) )
510
564
. await ?;
511
565
let system_parameter_defaults = BTreeMap :: default ( ) ;
512
- Self :: open_debug_catalog_inner ( storage, now, environment_id, system_parameter_defaults)
513
- . await
566
+ Self :: open_debug_catalog_inner (
567
+ persist_client,
568
+ storage,
569
+ now,
570
+ environment_id,
571
+ system_parameter_defaults,
572
+ )
573
+ . await
514
574
}
515
575
516
576
/// Opens a read only debug persist backed catalog defined by `persist_client` and
@@ -524,7 +584,7 @@ impl Catalog {
524
584
system_parameter_defaults : BTreeMap < String , String > ,
525
585
version : semver:: Version ,
526
586
) -> Result < Catalog , anyhow:: Error > {
527
- let openable_storage = TestCatalogStateBuilder :: new ( persist_client)
587
+ let openable_storage = TestCatalogStateBuilder :: new ( persist_client. clone ( ) )
528
588
. with_organization_id ( environment_id. organization_id ( ) )
529
589
. with_version ( version)
530
590
. build ( )
@@ -533,6 +593,7 @@ impl Catalog {
533
593
. open_read_only ( & test_bootstrap_args ( ) )
534
594
. await ?;
535
595
Self :: open_debug_catalog_inner (
596
+ persist_client,
536
597
storage,
537
598
now,
538
599
Some ( environment_id) ,
@@ -542,6 +603,7 @@ impl Catalog {
542
603
}
543
604
544
605
async fn open_debug_catalog_inner (
606
+ persist_client : PersistClient ,
545
607
storage : Box < dyn DurableCatalogState > ,
546
608
now : NowFn ,
547
609
environment_id : Option < EnvironmentId > ,
@@ -559,6 +621,8 @@ impl Catalog {
559
621
migrated_storage_collections_0dt : _,
560
622
new_builtins : _,
561
623
builtin_table_updates : _,
624
+ cached_global_exprs : _,
625
+ uncached_local_exprs : _,
562
626
} = Catalog :: open ( Config {
563
627
storage,
564
628
metrics_registry,
@@ -567,6 +631,8 @@ impl Catalog {
567
631
all_features : false ,
568
632
build_info : & DUMMY_BUILD_INFO ,
569
633
environment_id : environment_id. unwrap_or ( EnvironmentId :: for_tests ( ) ) ,
634
+ deploy_generation : 0 ,
635
+ read_only : false ,
570
636
now,
571
637
boot_ts : previous_ts,
572
638
skip_migrations : true ,
@@ -586,6 +652,7 @@ impl Catalog {
586
652
connection_context : ConnectionContext :: for_tests ( secrets_reader) ,
587
653
active_connection_count,
588
654
builtin_item_migration_config : BuiltinItemMigrationConfig :: Legacy ,
655
+ persist_client,
589
656
helm_chart_version : None ,
590
657
} ,
591
658
} )
@@ -1260,6 +1327,31 @@ impl Catalog {
1260
1327
. deserialize_plan_with_enable_for_item_parsing ( create_sql, force_if_exists_skip)
1261
1328
}
1262
1329
1330
+ pub ( crate ) fn update_expression_cache < ' a , ' b > (
1331
+ & ' a self ,
1332
+ new_local_expressions : Vec < ( GlobalId , LocalExpressions ) > ,
1333
+ new_global_expressions : Vec < ( GlobalId , GlobalExpressions ) > ,
1334
+ ) -> BoxFuture < ' b , ( ) > {
1335
+ if let Some ( expr_cache) = & self . expr_cache_handle {
1336
+ let ons = new_local_expressions
1337
+ . iter ( )
1338
+ . map ( |( id, _) | id)
1339
+ . chain ( new_global_expressions. iter ( ) . map ( |( id, _) | id) )
1340
+ . filter_map ( |id| self . get_entry ( id) . index ( ) )
1341
+ . map ( |index| index. on ) ;
1342
+ let invalidate_ids = self . invalidate_for_index ( ons) ;
1343
+ expr_cache
1344
+ . update (
1345
+ new_local_expressions,
1346
+ new_global_expressions,
1347
+ invalidate_ids,
1348
+ )
1349
+ . boxed ( )
1350
+ } else {
1351
+ async { } . boxed ( )
1352
+ }
1353
+ }
1354
+
1263
1355
/// Listen for and apply all unconsumed updates to the durable catalog state.
1264
1356
// TODO(jkosh44) When this method is actually used outside of a test we can remove the
1265
1357
// `#[cfg(test)]` annotation.
@@ -2060,6 +2152,7 @@ mod tests {
2060
2152
use mz_sql:: session:: user:: MZ_SYSTEM_ROLE_ID ;
2061
2153
use mz_sql:: session:: vars:: VarInput ;
2062
2154
2155
+ use crate :: catalog:: state:: LocalExpressionCache ;
2063
2156
use crate :: catalog:: { Catalog , Op } ;
2064
2157
use crate :: optimize:: dataflows:: { prep_scalar_expr, EvalTime , ExprPrepStyle } ;
2065
2158
use crate :: session:: Session ;
@@ -2378,7 +2471,7 @@ mod tests {
2378
2471
. expect ( "unable to open debug catalog" ) ;
2379
2472
let item = catalog
2380
2473
. state ( )
2381
- . deserialize_item ( id, & create_sql)
2474
+ . deserialize_item ( id, & create_sql, & mut LocalExpressionCache :: Closed )
2382
2475
. expect ( "unable to parse view" ) ;
2383
2476
catalog
2384
2477
. transact (
@@ -3274,7 +3367,7 @@ mod tests {
3274
3367
. state ( )
3275
3368
. deserialize_item ( mv_id, & format ! (
3276
3369
"CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"
3277
- ) )
3370
+ ) , & mut LocalExpressionCache :: Closed )
3278
3371
. expect ( "unable to deserialize item" ) ;
3279
3372
catalog
3280
3373
. transact (
0 commit comments