Sparkc 577 round three #1264
base: b3.0
Conversation
…ft involves fixing args passed to transitive dep classes
sealed trait ColumnSelector {
  def aliases: Map[String, String]
  def selectFrom(table: TableDef): IndexedSeq[ColumnRef]
  def selectFrom(table: TableMetadata): IndexedSeq[ColumnRef]
Instances of this trait are used widely throughout the code base, so in order to avoid broad breakage here I just overloaded selectFrom().
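For context, a minimal sketch of how one concrete selector could satisfy both overloads. AllColumns matches an existing selector name, but the TableMetadata-based body and the companion helpers it calls (TableDef.columns, ColumnDef.ref) are assumptions about how this PR would be used, not quotes from the diff.

```scala
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
import com.datastax.spark.connector.ColumnRef

// Hypothetical sketch only, shown as if it lived in the same file as the sealed trait.
case object AllColumns extends ColumnSelector {
  // Simplified; the real selector may handle aliases differently.
  override def aliases: Map[String, String] = Map.empty

  // Existing behavior, driven by the connector's own TableDef.
  override def selectFrom(table: TableDef): IndexedSeq[ColumnRef] =
    table.columns.map(_.ref)

  // New behavior, driven by driver metadata via the assumed companion helpers.
  override def selectFrom(table: TableMetadata): IndexedSeq[ColumnRef] =
    TableDef.columns(table).map(ColumnDef.ref(_)).toIndexedSeq
}
```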
private def missingColumns(table: TableMetadata, columnsToCheck: Seq[ColumnRef]): Seq[ColumnRef] =
  for (c <- columnsToCheck if !TableDef.containsColumn(c.columnName)(table)) yield c
Formerly in StructDef. Pulled out because it's only really used here, so it seemed better to localize the functionality where it's needed. If this becomes more broadly used it can always be moved back to StructDef.
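For reference, a minimal sketch of what the curried TableDef.containsColumn companion helper used above might look like against driver metadata; the shape is my assumption, not the PR's actual code.

```scala
import com.datastax.oss.driver.api.core.CqlIdentifier
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata

// Hypothetical companion helper; curried so call sites can partially apply the
// column name, as in TableDef.containsColumn(c.columnName)(table).
object TableDef {
  def containsColumn(columnName: String)(table: TableMetadata): Boolean =
    table.getColumn(CqlIdentifier.fromInternal(columnName)).isPresent
}
```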
readConf: ReadConf,
schema: StructType,
cqlQueryParts: ScanHelper.CqlQueryParts,
partition: InputPartition)
IntelliJ kept insisting on reformatting arg lists in this way and nothing I did could convince it to stop. This is a relatively minor thing that can be cleaned up later, but my apologies for the noise while reviewing.
sealed trait ClusteringOrder extends Serializable {
  private[connector] def toCql(tableDef: TableDef): String
  private[connector] def toCql(table: TableMetadata): String
Another case where overloading seemed to make sense, at least for now.
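As a sketch of what the overload could look like for one concrete order: this assumes companion helpers such as TableDef.clusteringColumns and ColumnDef.columnName exist in this PR, and the clause-building code here is illustrative rather than the connector's actual implementation.

```scala
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata

// Hypothetical sketch, shown as if it lived alongside the sealed trait: both overloads
// delegate to the same clause builder and differ only in how column names are obtained.
case object Ascending extends ClusteringOrder {
  private[connector] override def toCql(tableDef: TableDef): String =
    orderBy(tableDef.clusteringColumns.map(_.columnName))

  private[connector] override def toCql(table: TableMetadata): String =
    orderBy(TableDef.clusteringColumns(table).map(ColumnDef.columnName))

  private def orderBy(clusteringColumns: Seq[String]): String =
    clusteringColumns.headOption
      .map(name => s""" ORDER BY "$name" ASC""")
      .getOrElse("")
}
```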
The goal of this PR is to demonstrate what the proposed new solution would look like in code and to get general agreement that this is what we want to do to resolve SPARKC-577. Since the goal is demonstration and discussion, this is hardly a finished product; there's plenty of re-organization and optimization that could take place here.

One example: keeping the impls of functions from TableDef/ColumnDef/etc. for Java driver metadata types in companion objects may not be ideal. This starts to look very much like a type class, which it isn't, and that might be confusing. [1] I could see moving these functions to a utility class somewhere, perhaps.

Another example: I didn't fully leverage implicits and/or imports of object fns as broadly as possible. Partly this was done in an attempt to get something up as quickly as possible for discussion. It was also done because it wasn't immediately clear to me that this:

private[connector] def querySelectUsingOnlyPartitionKeys(table: TableMetadata): String = {
  implicit val tableMetadata = table
  def quotedColumnNames(columns: Seq[ColumnMetadata]) = columns.map(columnName(_)).map(quote)
  val whereClause = quotedColumnNames(partitionKey).map(c => s"$c = :$c").mkString(" AND ")
  s"SELECT * FROM ${quote(keyspaceName)}.${quote(tableName)} WHERE $whereClause"
}

is better than this:

private[connector] def querySelectUsingOnlyPartitionKeys(table: TableMetadata): String = {
  val partitionKeys = TableDef.partitionKey(table)
  def quotedColumnNames(columns: Seq[ColumnMetadata]) = columns.map(ColumnDef.columnName(_)).map(quote)
  val whereClause = quotedColumnNames(partitionKeys).map(c => s"$c = :$c").mkString(" AND ")
  s"SELECT * FROM ${quote(TableDef.keyspaceName(table))}.${quote(TableDef.tableName(table))} WHERE $whereClause"
}

It might be, but that wasn't something I needed to settle in order to satisfy the goals of this PR.

[1] A type class would be an interesting way to solve the problem of SPARKC-577, but it quickly starts to look quite a bit like the second solution (discrete *Def types for current + Java driver types). That said, the difference between such an approach and this one is not... huge. And it seems to me that a solution based on type classes would be subject to the same objections raised to the second PR.
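For the record, both variants above assume a small set of companion-object helpers over the driver's metadata types. A rough sketch of their shape is below; the signatures are my guesses for illustration, not settled API, and the implicit-based variant would additionally need versions that pick up an implicit TableMetadata.

```scala
import scala.collection.JavaConverters._
import com.datastax.oss.driver.api.core.metadata.schema.{ColumnMetadata, TableMetadata}

// Hypothetical shapes for the helpers referenced in the snippets above.
object ColumnDef {
  def columnName(column: ColumnMetadata): String = column.getName.asInternal
}

object TableDef {
  def keyspaceName(table: TableMetadata): String = table.getKeyspace.asInternal
  def tableName(table: TableMetadata): String = table.getName.asInternal
  def partitionKey(table: TableMetadata): Seq[ColumnMetadata] = table.getPartitionKey.asScala
}

object Quoting {
  // quote() as used in the snippets; the connector has its own quoting utility,
  // this just shows the idea.
  def quote(name: String): String = "\"" + name.replace("\"", "\"\"") + "\""
}
```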
Description
How did the Spark Cassandra Connector Work or Not Work Before this Patch
The connector was using internal serializable representations of keyspace/table metadata because the corresponding Java driver classes weren't serializable. This changed in Java driver v4.6.0.
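A quick way to sanity-check that premise (hedged: this assumes a reachable cluster and an existing keyspace/table, here placeholders "ks"/"tab", and relies on the driver's metadata implementations being Serializable from 4.6.0 onward):

```scala
import java.io._
import com.datastax.oss.driver.api.core.CqlSession

object MetadataSerializationCheck extends App {
  val session = CqlSession.builder().build()
  // "ks" and "tab" are placeholders for an existing keyspace/table.
  val table = session.getMetadata.getKeyspace("ks").get.getTable("tab").get

  // Plain Java serialization round trip; with driver >= 4.6.0 this should succeed.
  val out = new ByteArrayOutputStream()
  new ObjectOutputStream(out).writeObject(table)
  val copy = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray)).readObject()

  println(s"Round-tripped ${copy.getClass.getName}")
  session.close()
}
```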
General Design of the patch
The case classes TableDef, ColumnDef, etc. currently serve two distinct roles:
- a serializable, connector-local description of keyspaces, tables, and columns that already exist in Cassandra, which is what most read/write code paths consume, and
- a definition of tables and columns that the connector should create.

The majority of operations leverage the first role above. This PR begins the process of converting these usages to the types provided by the Java driver now that they're serializable. The existing TableDef, ColumnDef etc. case classes are preserved as-is to support future table creation.
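To make the two roles concrete, a hedged illustration; the constructor shapes and imports below are simplified assumptions based on the current case classes and may not match this branch exactly.

```scala
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
import com.datastax.spark.connector.cql.{ColumnDef, PartitionKeyColumn, RegularColumn, TableDef}
import com.datastax.spark.connector.types.{TextType, UUIDType}

object TwoRolesSketch {
  // Role 2 (stays on the existing case classes): defining a table that does not exist yet,
  // so there is no driver metadata to read. Constructor shape simplified.
  val tableToCreate: TableDef = TableDef(
    keyspaceName = "ks",
    tableName = "events",
    partitionKey = Seq(ColumnDef("id", PartitionKeyColumn, UUIDType)),
    clusteringColumns = Seq.empty,
    regularColumns = Seq(ColumnDef("payload", RegularColumn, TextType)))

  // Role 1 (what this PR migrates): describing a table that already exists, taken straight
  // from the now-serializable driver metadata. "ks"/"events" are placeholders.
  def existingTable(session: CqlSession): TableMetadata =
    session.getMetadata.getKeyspace("ks").get.getTable("events").get
}
```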
This PR reflects only the changes necessary to support conversion of one fairly high-level class (CassandraScanBuilder). It certainly isn't complete, and the changes to various method signatures broke other things in the code which will also need to be fixed. The intent of this PR is to demonstrate the basic process in order to get agreement on this approach going forward.
Fixes: SPARKC-577
How Has This Been Tested?
Still a WIP; it hasn't been tested meaningfully yet.
Checklist: