@@ -625,6 +625,117 @@ def _get_downstream_features(
625625
626626 return downstream
627627
def _compute_field_version_from_spec(
    self,
    feature_key: str,
    field_key: str,
    field_code_version: str,
    field_deps: list[dict[str, Any]],
    all_specs: dict[str, dict[str, Any]],
    computed_versions: dict[tuple[str, str], str],
) -> str:
    """Compute a field version from snapshot specs without loading Feature classes.

    The version is a truncated SHA-256 over, in this order: the fully
    qualified field key (``"<feature>/<field>"``), the field's code version,
    and the (recursively computed) versions of every upstream field it
    depends on. Upstream features that are absent from ``all_specs``, and
    requested upstream fields that are not found in their feature spec,
    contribute nothing to the hash.

    Args:
        feature_key: Feature key string.
        field_key: Field key string.
        field_code_version: Code version of the field.
        field_deps: Field dependencies from the spec. Each entry names the
            upstream feature under ``"feature"`` (or ``"key"``) and may list
            specific upstream fields under ``"fields"``; an empty or missing
            ``"fields"`` means "all fields of the upstream feature".
        all_specs: All feature specs in the snapshot, keyed by feature key
            string; each value must carry a ``"feature_spec"`` entry.
        computed_versions: Cache of already computed field versions, keyed by
            ``(feature_key, field_key)``. Updated in place.

    Returns:
        Computed field version hash.

    Raises:
        RecursionError: If the field dependencies form a cycle.
        KeyError: If an upstream entry in ``all_specs`` has no
            ``"feature_spec"``.
    """
    import hashlib

    def as_path(key: Any) -> Any:
        # Keys are stored either as a list of parts or as a plain string.
        return "/".join(key) if isinstance(key, list) else key

    # Fields whose version is currently being computed further up the call
    # chain; meeting one of them again means the dependency graph is cyclic.
    in_progress: set[tuple[str, str]] = set()

    def resolve(
        feature: str, field: str, code_version: Any, deps: list[dict[str, Any]]
    ) -> str:
        cache_key = (feature, field)
        if cache_key in computed_versions:
            return computed_versions[cache_key]
        if cache_key in in_progress:
            # Fail fast with a useful message instead of exhausting the stack.
            raise RecursionError(
                "Cyclic field dependency detected while computing version "
                f"of {feature}/{field}"
            )
        in_progress.add(cache_key)

        hasher = hashlib.sha256()
        hasher.update(f"{feature}/{field}".encode())
        hasher.update(str(code_version).encode())

        for dep in deps:
            upstream = as_path(dep.get("feature") or dep.get("key", []))
            if upstream not in all_specs:
                # Unknown upstream feature: nothing to fold into the hash.
                continue
            upstream_fields = all_specs[upstream]["feature_spec"].get("fields", [])

            requested = dep.get("fields", [])
            if requested:
                # Specific fields: keep the requested order and take the
                # first spec entry whose key matches each requested name.
                selected = []
                for wanted in requested:
                    wanted_name = as_path(wanted)
                    match = next(
                        (
                            spec
                            for spec in upstream_fields
                            if as_path(spec.get("key", [])) == wanted_name
                        ),
                        None,
                    )
                    if match is not None:
                        selected.append(match)
            else:
                # No specific fields: depend on every upstream field.
                selected = upstream_fields

            for spec in selected:
                upstream_version = resolve(
                    upstream,
                    as_path(spec.get("key", [])),
                    spec.get("code_version", "__metaxy_initial__"),
                    spec.get("deps", []),
                )
                hasher.update(upstream_version.encode())

        # Imported lazily, only once a hash actually has to be produced.
        from metaxy.utils.hashing import truncate_hash

        version = truncate_hash(hasher.hexdigest())
        computed_versions[cache_key] = version
        in_progress.discard(cache_key)
        return version

    return resolve(feature_key, field_key, field_code_version, field_deps)
738+
628739 def load_snapshot_data (
629740 self , store : MetadataStore , snapshot_version : str , project : str | None = None
630741 ) -> Mapping [str , Mapping [str , Any ]]:
@@ -696,66 +807,34 @@ def load_snapshot_data(
696807 ), # Fallback for backward compatibility
697808 }
698809
699- # Try to reconstruct FeatureGraph from snapshot to compute field versions
700- # This may fail if features have been removed/moved, so we handle that gracefully
701- graph : FeatureGraph | None = None
702- try :
703- graph = FeatureGraph .from_snapshot (snapshot_dict )
704- graph_available = True
705- except ImportError :
706- # Some features can't be imported (likely removed) - proceed without graph
707- # For diff purposes, we can still show feature-level changes
708- # We'll use feature_version as a fallback for all field versions
709- graph_available = False
710- warnings .warn (
711- "Using feature_version as field_version fallback for features that cannot be imported. "
712- "This may occur when features have been removed or moved." ,
713- UserWarning ,
714- stacklevel = 2 ,
715- )
716-
717- # Compute field versions using the reconstructed graph (if available)
718- from metaxy .models .plan import FQFieldKey
719-
810+ # Compute field versions directly from snapshot specs without loading Feature classes
811+ # This ensures we use the actual snapshot's field specs, not the current code
720812 snapshot_data = {}
813+ computed_versions : dict [tuple [str , str ], str ] = {}
814+
721815 for feature_key_str in snapshot_dict .keys ():
722816 feature_version = snapshot_dict [feature_key_str ]["metaxy_feature_version" ]
723817 feature_spec = snapshot_dict [feature_key_str ]["feature_spec" ]
724- feature_key_obj = FeatureKey (feature_key_str .split ("/" ))
725818
726- # Compute field versions using graph (if available)
819+ # Compute field versions from the snapshot's specs
727820 fields_data = {}
728- if (
729- graph_available
730- and graph is not None
731- and feature_key_obj in graph .features_by_key
732- ):
733- # Feature exists in reconstructed graph - compute precise field versions
734- for field_dict in feature_spec .get ("fields" , []):
735- field_key_list = field_dict .get ("key" )
736- if isinstance (field_key_list , list ):
737- field_key = FieldKey (field_key_list )
738- field_key_str_normalized = "/" .join (field_key_list )
739- else :
740- field_key = FieldKey ([field_key_list ])
741- field_key_str_normalized = field_key_list
742-
743- # Compute field version using the graph
744- fq_key = FQFieldKey (feature = feature_key_obj , field = field_key )
745- field_version = graph .get_field_version (fq_key )
746- fields_data [field_key_str_normalized ] = field_version
747- else :
748- # Feature doesn't exist in graph (removed/moved) - use feature_version as fallback
749- # All fields get the same version (the feature version)
750- for field_dict in feature_spec .get ("fields" , []):
751- field_key_list = field_dict .get ("key" )
752- if isinstance (field_key_list , list ):
753- field_key_str_normalized = "/" .join (field_key_list )
754- else :
755- field_key_str_normalized = field_key_list
756-
757- # Use feature_version directly as fallback
758- fields_data [field_key_str_normalized ] = feature_version
821+ for field_dict in feature_spec .get ("fields" , []):
822+ field_key_list = field_dict .get ("key" )
823+ if isinstance (field_key_list , list ):
824+ field_key_str_normalized = "/" .join (field_key_list )
825+ else :
826+ field_key_str_normalized = field_key_list
827+
828+ # Compute field version using the snapshot's specs
829+ field_version = self ._compute_field_version_from_spec (
830+ feature_key_str ,
831+ field_key_str_normalized ,
832+ field_dict .get ("code_version" , "__metaxy_initial__" ),
833+ field_dict .get ("deps" , []),
834+ snapshot_dict ,
835+ computed_versions ,
836+ )
837+ fields_data [field_key_str_normalized ] = field_version
759838
760839 snapshot_data [feature_key_str ] = {
761840 "metaxy_feature_version" : feature_version ,
0 commit comments