Skip to content

Commit 260a1c4

Browse files
authored
feat(isthmus): support limited processing of DDL statements(#432)
1 parent abdd39a commit 260a1c4

13 files changed

+613
-20
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package io.substrait.isthmus;
2+
3+
import io.substrait.relation.Aggregate;
4+
import io.substrait.relation.ConsistentPartitionWindow;
5+
import io.substrait.relation.Cross;
6+
import io.substrait.relation.EmptyScan;
7+
import io.substrait.relation.Expand;
8+
import io.substrait.relation.ExtensionDdl;
9+
import io.substrait.relation.ExtensionLeaf;
10+
import io.substrait.relation.ExtensionMulti;
11+
import io.substrait.relation.ExtensionSingle;
12+
import io.substrait.relation.ExtensionTable;
13+
import io.substrait.relation.ExtensionWrite;
14+
import io.substrait.relation.Fetch;
15+
import io.substrait.relation.Filter;
16+
import io.substrait.relation.Join;
17+
import io.substrait.relation.LocalFiles;
18+
import io.substrait.relation.NamedDdl;
19+
import io.substrait.relation.NamedScan;
20+
import io.substrait.relation.NamedUpdate;
21+
import io.substrait.relation.NamedWrite;
22+
import io.substrait.relation.Project;
23+
import io.substrait.relation.RelVisitor;
24+
import io.substrait.relation.Set;
25+
import io.substrait.relation.Sort;
26+
import io.substrait.relation.VirtualTableScan;
27+
import io.substrait.relation.physical.HashJoin;
28+
import io.substrait.relation.physical.MergeJoin;
29+
import io.substrait.relation.physical.NestedLoopJoin;
30+
import io.substrait.util.EmptyVisitationContext;
31+
import org.apache.calcite.sql.SqlKind;
32+
33+
/**
34+
* A visitor to infer the general SqlKind from the root of a Substrait Rel tree. Note: This infers
35+
* the general operation type, as the original SQL syntax is not preserved in the Substrait plan.
36+
*/
37+
public class SqlKindFromRel
38+
implements RelVisitor<SqlKind, EmptyVisitationContext, RuntimeException> {
39+
40+
// Most common query operations map to SELECT.
41+
private static final SqlKind QUERY_KIND = SqlKind.SELECT;
42+
43+
@Override
44+
public SqlKind visit(Aggregate aggregate, EmptyVisitationContext context)
45+
throws RuntimeException {
46+
47+
return QUERY_KIND;
48+
}
49+
50+
@Override
51+
public SqlKind visit(EmptyScan emptyScan, EmptyVisitationContext context)
52+
throws RuntimeException {
53+
// An empty scan is typically the result of a query that returns no rows.
54+
return QUERY_KIND;
55+
}
56+
57+
@Override
58+
public SqlKind visit(Fetch fetch, EmptyVisitationContext context) throws RuntimeException {
59+
return QUERY_KIND;
60+
}
61+
62+
@Override
63+
public SqlKind visit(Filter filter, EmptyVisitationContext context) throws RuntimeException {
64+
return QUERY_KIND;
65+
}
66+
67+
@Override
68+
public SqlKind visit(Join join, EmptyVisitationContext context) throws RuntimeException {
69+
return SqlKind.JOIN;
70+
}
71+
72+
@Override
73+
public SqlKind visit(Set set, EmptyVisitationContext context) throws RuntimeException {
74+
switch (set.getSetOp()) {
75+
case UNION_ALL:
76+
case UNION_DISTINCT:
77+
return SqlKind.UNION;
78+
case INTERSECTION_PRIMARY:
79+
case INTERSECTION_MULTISET:
80+
case INTERSECTION_MULTISET_ALL:
81+
return SqlKind.INTERSECT;
82+
case MINUS_PRIMARY:
83+
case MINUS_PRIMARY_ALL:
84+
case MINUS_MULTISET:
85+
return SqlKind.EXCEPT;
86+
case UNKNOWN:
87+
default:
88+
return SqlKind.OTHER;
89+
}
90+
}
91+
92+
@Override
93+
public SqlKind visit(NamedScan namedScan, EmptyVisitationContext context)
94+
throws RuntimeException {
95+
return QUERY_KIND;
96+
}
97+
98+
@Override
99+
public SqlKind visit(LocalFiles localFiles, EmptyVisitationContext context)
100+
throws RuntimeException {
101+
return QUERY_KIND;
102+
}
103+
104+
@Override
105+
public SqlKind visit(Project project, EmptyVisitationContext context) throws RuntimeException {
106+
return QUERY_KIND;
107+
}
108+
109+
@Override
110+
public SqlKind visit(Expand expand, EmptyVisitationContext context) throws RuntimeException {
111+
return QUERY_KIND;
112+
}
113+
114+
@Override
115+
public SqlKind visit(Sort sort, EmptyVisitationContext context) throws RuntimeException {
116+
return SqlKind.ORDER_BY;
117+
}
118+
119+
@Override
120+
public SqlKind visit(Cross cross, EmptyVisitationContext context) throws RuntimeException {
121+
return SqlKind.JOIN;
122+
}
123+
124+
@Override
125+
public SqlKind visit(VirtualTableScan virtualTableScan, EmptyVisitationContext context)
126+
throws RuntimeException {
127+
// A virtual table scan corresponds to a VALUES clause.
128+
return SqlKind.VALUES;
129+
}
130+
131+
@Override
132+
public SqlKind visit(ExtensionLeaf extensionLeaf, EmptyVisitationContext context)
133+
throws RuntimeException {
134+
return SqlKind.OTHER;
135+
}
136+
137+
@Override
138+
public SqlKind visit(ExtensionSingle extensionSingle, EmptyVisitationContext context)
139+
throws RuntimeException {
140+
return SqlKind.OTHER;
141+
}
142+
143+
@Override
144+
public SqlKind visit(ExtensionMulti extensionMulti, EmptyVisitationContext context)
145+
throws RuntimeException {
146+
return SqlKind.OTHER;
147+
}
148+
149+
@Override
150+
public SqlKind visit(ExtensionTable extensionTable, EmptyVisitationContext context)
151+
throws RuntimeException {
152+
return SqlKind.OTHER;
153+
}
154+
155+
@Override
156+
public SqlKind visit(HashJoin hashJoin, EmptyVisitationContext context) throws RuntimeException {
157+
return SqlKind.JOIN;
158+
}
159+
160+
@Override
161+
public SqlKind visit(MergeJoin mergeJoin, EmptyVisitationContext context)
162+
throws RuntimeException {
163+
return SqlKind.JOIN;
164+
}
165+
166+
@Override
167+
public SqlKind visit(NestedLoopJoin nestedLoopJoin, EmptyVisitationContext context)
168+
throws RuntimeException {
169+
return SqlKind.JOIN;
170+
}
171+
172+
@Override
173+
public SqlKind visit(
174+
ConsistentPartitionWindow consistentPartitionWindow, EmptyVisitationContext context)
175+
throws RuntimeException {
176+
return SqlKind.OVER;
177+
}
178+
179+
@Override
180+
public SqlKind visit(NamedWrite write, EmptyVisitationContext context) throws RuntimeException {
181+
switch (write.getOperation()) {
182+
case INSERT:
183+
return SqlKind.INSERT;
184+
case DELETE:
185+
return SqlKind.DELETE;
186+
case UPDATE:
187+
return SqlKind.UPDATE;
188+
case CTAS:
189+
return SqlKind.CREATE_TABLE;
190+
default:
191+
return SqlKind.OTHER;
192+
}
193+
}
194+
195+
@Override
196+
public SqlKind visit(ExtensionWrite write, EmptyVisitationContext context)
197+
throws RuntimeException {
198+
return SqlKind.OTHER_DDL;
199+
}
200+
201+
@Override
202+
public SqlKind visit(NamedDdl ddl, EmptyVisitationContext context) throws RuntimeException {
203+
switch (ddl.getOperation()) {
204+
case CREATE:
205+
case CREATE_OR_REPLACE:
206+
if (ddl.getObject() == NamedDdl.DdlObject.TABLE) {
207+
return SqlKind.CREATE_TABLE;
208+
} else if (ddl.getObject() == NamedDdl.DdlObject.VIEW) {
209+
return SqlKind.CREATE_VIEW;
210+
}
211+
break;
212+
case DROP:
213+
case DROP_IF_EXIST:
214+
if (ddl.getObject() == NamedDdl.DdlObject.TABLE) {
215+
return SqlKind.DROP_TABLE;
216+
} else if (ddl.getObject() == NamedDdl.DdlObject.VIEW) {
217+
return SqlKind.DROP_VIEW;
218+
}
219+
break;
220+
case ALTER:
221+
if (ddl.getObject() == NamedDdl.DdlObject.TABLE) {
222+
return SqlKind.ALTER_TABLE;
223+
} else if (ddl.getObject() == NamedDdl.DdlObject.VIEW) {
224+
return SqlKind.ALTER_VIEW;
225+
}
226+
break;
227+
}
228+
return SqlKind.OTHER_DDL;
229+
}
230+
231+
@Override
232+
public SqlKind visit(ExtensionDdl ddl, EmptyVisitationContext context) throws RuntimeException {
233+
return SqlKind.OTHER_DDL;
234+
}
235+
236+
@Override
237+
public SqlKind visit(NamedUpdate update, EmptyVisitationContext context) throws RuntimeException {
238+
return SqlKind.UPDATE;
239+
}
240+
}

isthmus/src/main/java/io/substrait/isthmus/SubstraitRelNodeConverter.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@
1010
import io.substrait.expression.Expression.SortDirection;
1111
import io.substrait.expression.FunctionArg;
1212
import io.substrait.extension.SimpleExtension;
13+
import io.substrait.isthmus.calcite.rel.CreateTable;
14+
import io.substrait.isthmus.calcite.rel.CreateView;
1315
import io.substrait.isthmus.expression.AggregateFunctionConverter;
1416
import io.substrait.isthmus.expression.ExpressionRexConverter;
1517
import io.substrait.isthmus.expression.ScalarFunctionConverter;
1618
import io.substrait.isthmus.expression.WindowFunctionConverter;
19+
import io.substrait.relation.AbstractDdlRel;
1720
import io.substrait.relation.AbstractRelVisitor;
1821
import io.substrait.relation.AbstractUpdate;
22+
import io.substrait.relation.AbstractWriteRel;
1923
import io.substrait.relation.Aggregate;
2024
import io.substrait.relation.Cross;
2125
import io.substrait.relation.EmptyScan;
@@ -24,6 +28,7 @@
2428
import io.substrait.relation.Join;
2529
import io.substrait.relation.Join.JoinType;
2630
import io.substrait.relation.LocalFiles;
31+
import io.substrait.relation.NamedDdl;
2732
import io.substrait.relation.NamedScan;
2833
import io.substrait.relation.NamedUpdate;
2934
import io.substrait.relation.NamedWrite;
@@ -548,8 +553,29 @@ public RelNode visit(NamedUpdate update, Context context) {
548553
}
549554

550555
@Override
551-
public RelNode visit(VirtualTableScan virtualTableScan, Context context) {
556+
public RelNode visit(NamedDdl namedDdl, Context context) {
557+
if (namedDdl.getOperation() != AbstractDdlRel.DdlOp.CREATE
558+
|| namedDdl.getObject() != AbstractDdlRel.DdlObject.VIEW) {
559+
throw new UnsupportedOperationException(
560+
String.format(
561+
"Can only handle NamedDdl with (%s, %s), given (%s, %s)",
562+
AbstractDdlRel.DdlOp.CREATE,
563+
AbstractDdlRel.DdlObject.VIEW,
564+
namedDdl.getOperation(),
565+
namedDdl.getObject()));
566+
}
567+
568+
if (namedDdl.getViewDefinition().isEmpty()) {
569+
throw new IllegalArgumentException("NamedDdl view definition must be set");
570+
}
552571

572+
Rel viewDefinition = namedDdl.getViewDefinition().get();
573+
RelNode relNode = viewDefinition.accept(this, context);
574+
return new CreateView(namedDdl.getNames(), relNode);
575+
}
576+
577+
@Override
578+
public RelNode visit(VirtualTableScan virtualTableScan, Context context) {
553579
final RelDataType typeInfoOnly =
554580
typeConverter.toCalcite(typeFactory, virtualTableScan.getInitialSchema().struct());
555581

@@ -584,15 +610,29 @@ public RelNode visit(VirtualTableScan virtualTableScan, Context context) {
584610
relBuilder.getCluster(), rowTypeWithNames, ImmutableList.copyOf(tuples));
585611
}
586612

613+
private RelNode handleCreateTableAs(NamedWrite namedWrite, Context context) {
614+
if (namedWrite.getCreateMode() != AbstractWriteRel.CreateMode.REPLACE_IF_EXISTS
615+
|| namedWrite.getOutputMode() != AbstractWriteRel.OutputMode.NO_OUTPUT) {
616+
throw new UnsupportedOperationException(
617+
String.format(
618+
"Can only handle CTAS NamedWrite with (%s, %s), given (%s, %s)",
619+
AbstractWriteRel.CreateMode.REPLACE_IF_EXISTS,
620+
AbstractWriteRel.OutputMode.NO_OUTPUT,
621+
namedWrite.getCreateMode(),
622+
namedWrite.getOutputMode()));
623+
}
624+
625+
Rel input = namedWrite.getInput();
626+
RelNode relNode = input.accept(this, context);
627+
return new CreateTable(namedWrite.getNames(), relNode);
628+
}
629+
587630
@Override
588631
public RelNode visit(NamedWrite write, Context context) {
589632
RelNode input = write.getInput().accept(this, context);
590633
assert relBuilder.getRelOptSchema() != null;
591-
final RelOptTable table = relBuilder.getRelOptSchema().getTableForMember(write.getNames());
592-
593-
if (table == null) {
594-
throw new IllegalStateException("Table not found in Calcite catalog: " + write.getNames());
595-
}
634+
final RelOptTable targetTable =
635+
relBuilder.getRelOptSchema().getTableForMember(write.getNames());
596636

597637
TableModify.Operation operation;
598638
switch (write.getOperation()) {
@@ -602,16 +642,20 @@ public RelNode visit(NamedWrite write, Context context) {
602642
case DELETE:
603643
operation = TableModify.Operation.DELETE;
604644
break;
645+
case CTAS:
646+
return handleCreateTableAs(write, context);
605647
default:
606648
throw new UnsupportedOperationException(
607-
"Write operation '"
608-
+ write.getOperation()
609-
+ "' is not supported by the NamedWrite visitor. "
610-
+ "Check if a more specific relation type (e.g., NamedUpdate) should be used.");
649+
String.format(
650+
"NamedWrite with WriteOp %s cannot be converted to a Calcite RelNode. Consider using a more specific Rel (e.g NamedUpdate)",
651+
write.getOperation()));
611652
}
612653

654+
// checked by validation
655+
assert targetTable != null;
656+
613657
return LogicalTableModify.create(
614-
table,
658+
targetTable,
615659
(Prepare.CatalogReader) relBuilder.getRelOptSchema(),
616660
input,
617661
operation,

0 commit comments

Comments
 (0)