
DataFrame.cache() does not work in distributed environments #17297

@milenkovicm

Description


Is your feature request related to a problem or challenge?

DataFrame.cache() does not work in distributed environments, as it materialises the whole table in memory on the local node:

pub async fn cache(self) -> Result<DataFrame> {
    let context = SessionContext::new_with_state((*self.session_state).clone());
    // The schema is consistent with the output
    let plan = self.clone().create_physical_plan().await?;
    let schema = plan.schema();
    let task_ctx = Arc::new(self.task_ctx());
    let partitions = collect_partitioned(plan, task_ctx).await?;
    let mem_table = MemTable::try_new(schema, partitions)?;
    context.read_table(Arc::new(mem_table))
}

This does not work in distributed environments such as Ballista.

Also note that cache() materialises eagerly, even when there are no downstream consumers (which may not be a big problem), but it does not follow the semantics of spark.cache(..).
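For illustration, a minimal sketch of the current semantics (the file name data.csv is just a placeholder):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // placeholder input path; any table provider behaves the same
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // cache() executes the whole plan and collects every partition into a
    // MemTable at this call, before anything reads from `cached`. Spark's
    // cache() would only mark the plan and materialise on the first action.
    let cached = df.cache().await?;

    let _ = cached; // no downstream consumer, yet the work is already done
    Ok(())
}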

Describe the solution you'd like

  • Ideally, cache would be represented by a logical plan node, resolved the first time the cached result is actually needed. This approach may be a bit complicated to implement, and datafusion may not really benefit from it.

  • As an easier-to-implement alternative, we could provide a cache factory (at session_state, maybe) which would keep the current behaviour if not overridden, or run user-specified logic, such as returning LogicalPlan::Extension or similar, leaving the query planner/user to deal with the cache materialisation decision.

  • EDIT: Another alternative would be a configuration option, like datafusion.execution.local_cache, which would decide whether to use the current cache logic or to return a new logical plan element like LogicalPlan::Cache { id: String, lineage: LogicalPlan }:

pub async fn cache(self) -> Result<DataFrame> {
    // `datafusion.execution.local_cache` is the proposed (not yet existing) option
    if self.session_state.config_options().execution.local_cache {
        // current behaviour: execute the plan and pin the result in a MemTable
        let context = SessionContext::new_with_state((*self.session_state).clone());
        // The schema is consistent with the output
        let plan = self.clone().create_physical_plan().await?;
        let schema = plan.schema();
        let task_ctx = Arc::new(self.task_ctx());
        let partitions = collect_partitioned(plan, task_ctx).await?;
        let mem_table = MemTable::try_new(schema, partitions)?;
        context.read_table(Arc::new(mem_table))
    } else {
        // defer materialisation: record the lineage in a new plan node and
        // leave the caching decision to the (distributed) query planner
        let lineage = self.logical_plan().clone();
        let plan = LogicalPlan::Cache(Cache {
            id: uuid(), // placeholder for id generation
            lineage: Arc::new(lineage),
        });
        Ok(DataFrame::new((*self.session_state).clone(), plan))
    }
}
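For completeness, the new plan element used above could be declared roughly as follows. This is entirely hypothetical: LogicalPlan has no Cache variant today, and a real version would need the usual plumbing (tree traversal, proto serialisation for Ballista):

/// Hypothetical: a deferred-cache marker carrying the plan it shadows.
pub struct Cache {
    /// stable identity so a scheduler can locate or reuse the materialised result
    pub id: String,
    /// the plan whose output should be cached
    pub lineage: Arc<LogicalPlan>,
}

together with a new Cache(Cache) arm on the LogicalPlan enum.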

Describe alternatives you've considered

Users could create a DataFrameExt trait providing a new method like distributed_cache, which would return a LogicalPlan::Extension wrapping a DistributedCacheExtension node. This alternative is the simplest to implement (it does not affect datafusion at all), but users would need to change their code when moving from datafusion to ballista.
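A rough sketch of that alternative, assuming a recent DataFusion (UserDefinedLogicalNodeCore method names have changed across versions) and the uuid crate for id generation; DistributedCacheExtension and distributed_cache are made-up names, not existing APIs:

use std::sync::Arc;

use datafusion::common::DFSchemaRef;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;
use datafusion::logical_expr::{
    Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
};

/// Hypothetical marker node: "cache this subtree's result somewhere
/// the distributed scheduler can find it".
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash)]
struct DistributedCacheExtension {
    id: String,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for DistributedCacheExtension {
    fn name(&self) -> &str {
        "DistributedCache"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        // caching does not change the shape of the data
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "DistributedCache: id={}", self.id)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        mut inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(Self {
            id: self.id.clone(),
            input: inputs.swap_remove(0),
        })
    }
}

/// Hypothetical extension trait providing the new method.
trait DataFrameExt {
    fn distributed_cache(self) -> Result<DataFrame>;
}

impl DataFrameExt for DataFrame {
    fn distributed_cache(self) -> Result<DataFrame> {
        let (state, plan) = self.into_parts();
        let node: Arc<dyn UserDefinedLogicalNode> = Arc::new(DistributedCacheExtension {
            id: uuid::Uuid::new_v4().to_string(),
            input: plan,
        });
        // lazy: nothing runs until a planner that understands the node plans it
        Ok(DataFrame::new(state, LogicalPlan::Extension(Extension { node })))
    }
}

Ballista's query planner could then match on the extension node and route the subtree to its own caching machinery, while plain datafusion would fail (or fall back) when it meets a node it cannot plan.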

Additional context

No response
