
Commit 795225d

[SDP] Create a new dataflow graph
1 parent db1d708 commit 795225d

5 files changed: 93 additions & 17 deletions

docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 15 additions & 5 deletions
@@ -24,11 +24,11 @@ handlePipelinesCommand(
 
 | PipelineCommand | Description | Initiator |
 |-----------------|-------------|-----------|
-| `CREATE_DATAFLOW_GRAPH` | [Creates a new dataflow graph](#CREATE_DATAFLOW_GRAPH) | [pyspark.pipelines.spark_connect_pipeline](#create_dataflow_graph) |
+| `CREATE_DATAFLOW_GRAPH` | [Creates a new dataflow graph](#CREATE_DATAFLOW_GRAPH) | [pyspark.pipelines.spark_connect_pipeline](spark_connect_pipeline.md#create_dataflow_graph) |
 | `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) ||
 | `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset) |
 | `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_flow) |
-| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline.start_run](#start_run) |
+| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline.start_run](spark_connect_pipeline.md#start_run) |
 | `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_sql) |
 
 `handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
@@ -43,7 +43,7 @@ handlePipelinesCommand(
 
 * `SparkConnectPlanner` ([Spark Connect]({{ book.spark_connect }}/server/SparkConnectPlanner)) is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
 
-### CREATE_DATAFLOW_GRAPH { #CREATE_DATAFLOW_GRAPH }
+### <span id="CreateDataflowGraph"> CREATE_DATAFLOW_GRAPH { #CREATE_DATAFLOW_GRAPH }
 
 [handlePipelinesCommand](#handlePipelinesCommand) creates a [dataflow graph](#createDataflowGraph) and sends the graph ID back.
 
@@ -113,9 +113,19 @@ createDataflowGraph(
   spark: SparkSession): String
 ```
 
-`createDataflowGraph` finds the catalog and the database in the given `cmd` command and [creates a dataflow graph](DataflowGraphRegistry.md#createDataflowGraph).
+`createDataflowGraph` gets the catalog (from the given `CreateDataflowGraph` if defined in the [pipeline specification file](index.md#pipeline-specification-file)) or prints out the following INFO message to the logs and uses the current catalog instead.
 
-`createDataflowGraph` returns the ID of the created dataflow graph.
+```text
+No default catalog was supplied. Falling back to the current catalog: [currentCatalog].
+```
+
+`createDataflowGraph` gets the database (from the given `CreateDataflowGraph` if defined in the [pipeline specification file](index.md#pipeline-specification-file)) or prints out the following INFO message to the logs and uses the current database instead.
+
+```text
+No default database was supplied. Falling back to the current database: [currentDatabase].
+```
+
+In the end, `createDataflowGraph` [creates a dataflow graph](DataflowGraphRegistry.md#createDataflowGraph) (in the session's [DataflowGraphRegistry](DataflowGraphRegistry.md)).
 
 ## defineSqlGraphElements { #defineSqlGraphElements }
 
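The catalog/database fallback described above can be exercised from the Python client. Below is a hypothetical sketch (not part of this commit): it assumes a Spark Connect session (the `sc://localhost:15002` endpoint is illustrative) and calls `create_dataflow_graph` from the `pyspark.pipelines.spark_connect_pipeline` module with no defaults, which should make the server log the two INFO messages quoted above.

```py
# Hypothetical usage sketch (not from this commit): create a dataflow graph
# without default catalog/database so PipelinesHandler falls back to the
# session's current catalog and database (logging the INFO messages above).
from pyspark.sql import SparkSession
from pyspark.pipelines.spark_connect_pipeline import create_dataflow_graph

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

graph_id = create_dataflow_graph(
    spark,
    default_catalog=None,   # "No default catalog was supplied. ..."
    default_database=None,  # "No default database was supplied. ..."
    sql_conf=None,
)
print(graph_id)  # ID of the dataflow graph registered in the DataflowGraphRegistry
```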
docs/declarative-pipelines/SparkPipelines.md

Lines changed: 4 additions & 2 deletions
@@ -9,9 +9,11 @@ subtitle: Spark Pipelines CLI
 
 `SparkPipelines` is a Scala "launchpad" to execute [pyspark/pipelines/cli.py](#pyspark-pipelines-cli) Python script (through [SparkSubmit]({{ book.spark_core }}/tools/spark-submit/SparkSubmit/)).
 
-## PySpark Pipelines CLI
+## cli.py { #pyspark-pipelines-cli }
 
-`pyspark/pipelines/cli.py` is the Pipelines CLI that is launched using [spark-pipelines](./index.md#spark-pipelines) shell script.
+`pyspark/pipelines/cli.py` is the heart of the Spark Pipelines CLI (launched using the [spark-pipelines](./index.md#spark-pipelines) shell script).
+
+As a Python script, `cli.py` can simply import Python libraries (to trigger their execution), whereas SQL libraries are left untouched and sent over the wire to a Spark Connect server ([PipelinesHandler](PipelinesHandler.md)) for execution.
 
 The Pipelines CLI supports the following commands:
 
docs/declarative-pipelines/configuration-properties.md

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 # Configuration Properties
 
-**Configuration properties** (aka **settings**) for [Spark Declarative Pipelines](index.md).
+**Configuration properties** (aka **configs**) for [Spark Declarative Pipelines](index.md).
 
 ## <span id="PIPELINES_EVENT_QUEUE_CAPACITY"> event.queue.capacity { #spark.sql.pipelines.event.queue.capacity }
 
docs/declarative-pipelines/index.md

Lines changed: 11 additions & 9 deletions
@@ -46,33 +46,35 @@ Once described, a pipeline can be [started](PipelineExecution.md#runPipeline) (o
 
 The heart of a Declarative Pipelines project is a **pipeline specification file** (in YAML format).
 
-In the pipeline specification file, Declarative Pipelines developers specify files (`libraries`) with tables, views and flows (transformations) definitions in Python and SQL. A SDP project can use both languages simultaneously.
+In the pipeline specification file, Declarative Pipelines developers specify files (`libraries`) with tables, views and flows (transformations) definitions in [Python](#python) and [SQL](#sql). A SDP project can use both languages simultaneously.
 
 The following fields are supported:
 
 Field Name | Description
 -|-
-`name` (required) | |
-`catalog` | |
-`database` | |
-`schema` | Alias of `database`. Used unless `database` is defined |
-`configuration` | |
-`libraries` | `glob`s of `include`s with SQL and Python transformations |
+`name` (required) | &nbsp;
+`catalog` | The default catalog to register datasets into.<br>Unless specified, [PipelinesHandler](PipelinesHandler.md#createDataflowGraph) falls back to the current catalog.
+`database` | The default database to register datasets into.<br>Unless specified, [PipelinesHandler](PipelinesHandler.md#createDataflowGraph) falls back to the current database.
+`schema` | Alias of `database`. Used unless `database` is defined
+`storage` | ⚠️ does not seem to be used
+`configuration` | SparkSession configs<br>Spark Pipelines runtime uses the configs to build a new `SparkSession` when `run`.<br>[spark.sql.connect.serverStacktrace.enabled]({{ book.spark_connect }}/configuration-properties/#spark.sql.connect.serverStacktrace.enabled) is hardcoded to be always `false`.
+`libraries` | `glob`s of `include`s with transformations in [SQL](#sql) and [Python](#python-decorators)
 
 ```yaml
 name: hello-spark-pipelines
 catalog: default_catalog
 schema: default
+storage: storage-root
 configuration:
   spark.key1: value1
 libraries:
   - glob:
       include: transformations/**
 ```
 
-## spark-pipelines Shell Script { #spark-pipelines }
+## Spark Pipelines CLI { #spark-pipelines }
 
-`spark-pipelines` shell script is used to launch [org.apache.spark.deploy.SparkPipelines](SparkPipelines.md).
+`spark-pipelines` shell script is the **Spark Pipelines CLI** (that launches [org.apache.spark.deploy.SparkPipelines](SparkPipelines.md) behind the scenes).
 
 ## Dataset Types
docs/declarative-pipelines/spark_connect_pipeline.md

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+---
+title: spark_connect_pipeline
+---
+
+# spark_connect_pipeline PySpark Module
+
+## create_dataflow_graph { #create_dataflow_graph }
+
+```py
+create_dataflow_graph(
+  spark: SparkSession,
+  default_catalog: Optional[str],
+  default_database: Optional[str],
+  sql_conf: Optional[Mapping[str, str]],
+) -> str
+```
+
+`create_dataflow_graph`...FIXME
+
+---
+
+`create_dataflow_graph` is used when:
+
+* FIXME
+
+## start_run { #start_run }
+
+```py
+start_run(
+  spark: SparkSession,
+  dataflow_graph_id: str,
+  full_refresh: Optional[Sequence[str]],
+  full_refresh_all: bool,
+  refresh: Optional[Sequence[str]],
+  dry: bool,
+  storage: str,
+) -> Iterator[Dict[str, Any]]
+```
+
+`start_run`...FIXME
+
+---
+
+`start_run` is used when:
+
+* FIXME
+
+## handle_pipeline_events { #handle_pipeline_events }
+
+```py
+handle_pipeline_events(
+  iter: Iterator[Dict[str, Any]]
+) -> None
+```
+
+`handle_pipeline_events`...FIXME
+
+---
+
+`handle_pipeline_events` is used when:
+
+* FIXME
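Read together, the three signatures above outline how a pipeline run is driven over Spark Connect. The following is a hypothetical end-to-end sketch (not from this commit); argument values mirror the spec-file example earlier, and the `sc://localhost:15002` endpoint is illustrative.

```py
# Hypothetical wiring of the helpers documented above (values are illustrative).
from pyspark.sql import SparkSession
from pyspark.pipelines.spark_connect_pipeline import (
    create_dataflow_graph,
    handle_pipeline_events,
    start_run,
)

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# 1. Register a new dataflow graph and keep its ID.
graph_id = create_dataflow_graph(
    spark,
    default_catalog="default_catalog",
    default_database="default",
    sql_conf={"spark.key1": "value1"},
)

# 2. (Datasets and flows would be registered against graph_id here,
#    i.e. the DEFINE_DATASET / DEFINE_FLOW commands from PipelinesHandler.)

# 3. Start the run and stream pipeline events back as they arrive.
events = start_run(
    spark,
    dataflow_graph_id=graph_id,
    full_refresh=None,
    full_refresh_all=False,
    refresh=None,
    dry=False,
    storage="storage-root",
)
handle_pipeline_events(events)
```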
