diff --git a/docs/integrations/data-ingestion/apache-spark/databricks.md b/docs/integrations/data-ingestion/apache-spark/databricks.md deleted file mode 100644 index 302d8a0e584..00000000000 --- a/docs/integrations/data-ingestion/apache-spark/databricks.md +++ /dev/null @@ -1,311 +0,0 @@ ---- -sidebar_label: 'Databricks' -sidebar_position: 3 -slug: /integrations/data-ingestion/apache-spark/databricks -description: 'Integrate ClickHouse with Databricks' -keywords: ['clickhouse', 'databricks', 'spark', 'unity catalog', 'data'] -title: 'Integrating ClickHouse with Databricks' -doc_type: 'guide' ---- - -import Image from '@theme/IdealImage'; -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; -import ClickHouseSupportedBadge from '@theme/badges/ClickHouseSupported'; - -# Integrating ClickHouse with Databricks - - - -The ClickHouse Spark connector works seamlessly with Databricks. This guide covers platform-specific setup, installation, and usage patterns for Databricks. - -## API Selection for Databricks {#api-selection} - -By default, Databricks uses Unity Catalog, which blocks Spark catalog registration. In this case, you **must** use the **TableProvider API** (format-based access). - -However, if you disable Unity Catalog by creating a cluster with **No isolation shared** access mode, you can use the **Catalog API** instead. The Catalog API provides centralized configuration and native Spark SQL integration. - -| Unity Catalog Status | Recommended API | Notes | -|---------------------|------------------|-------| -| **Enabled** (default) | TableProvider API (format-based) | Unity Catalog blocks Spark catalog registration | -| **Disabled** (No isolation shared) | Catalog API | Requires cluster with "No isolation shared" access mode | - -## Installation on Databricks {#installation} - -### Option 1: Upload JAR via Databricks UI {#installation-ui} - -1. Build or [download](https://repo1.maven.org/maven2/com/clickhouse/spark/) the runtime JAR: - ```bash - clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar - ``` - -2. Upload the JAR to your Databricks workspace: - - Go to **Workspace** → Navigate to your desired folder - - Click **Upload** → Select the JAR file - - The JAR will be stored in your workspace - -3. Install the library on your cluster: - - Go to **Compute** → Select your cluster - - Click the **Libraries** tab - - Click **Install New** - - Select **DBFS** or **Workspace** → Navigate to the uploaded JAR file - - Click **Install** - -Databricks Libraries tab - -Installing library from workspace volume - -4. Restart the cluster to load the library - -### Option 2: Install via Databricks CLI {#installation-cli} - -```bash -# Upload JAR to DBFS -databricks fs cp clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar \ - dbfs:/FileStore/jars/ - -# Install on cluster -databricks libraries install \ - --cluster-id \ - --jar dbfs:/FileStore/jars/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar -``` - -### Option 3: Maven Coordinates (Recommended) {#installation-maven} - -1. Navigate to your Databricks workspace: - - Go to **Compute** → Select your cluster - - Click the **Libraries** tab - - Click **Install New** - - Select **Maven** tab - -2. 
Add the Maven coordinates: - -```text -com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }} -``` - -Databricks Maven libraries configuration - -3. Click **Install** and restart the cluster to load the library - -## Using TableProvider API {#tableprovider-api} - -When Unity Catalog is enabled (default), you **must** use the TableProvider API (format-based access) because Unity Catalog blocks Spark catalog registration. If you've disabled Unity Catalog by using a cluster with "No isolation shared" access mode, you can use the [Catalog API](/docs/integrations/data-ingestion/apache-spark/spark-native-connector#register-the-catalog-required) instead. - -### Reading data {#reading-data-table-provider} - - - - -```python -# Read from ClickHouse using TableProvider API -df = spark.read \ - .format("clickhouse") \ - .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "events") \ - .option("user", "default") \ - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \ - .option("ssl", "true") \ - .load() - -# Schema is automatically inferred -df.display() -``` - - - - -```scala -val df = spark.read - .format("clickhouse") - .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "events") - .option("user", "default") - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) - .option("ssl", "true") - .load() - -df.show() -``` - - - - -### Writing data {#writing-data-unity} - - - - -```python -# Write to ClickHouse - table will be created automatically if it doesn't exist -df.write \ - .format("clickhouse") \ - .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "events_copy") \ - .option("user", "default") \ - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \ - .option("ssl", "true") \ - .option("order_by", "id") \ # Required: specify ORDER BY when creating a new table - .option("settings.allow_nullable_key", "1") \ # Required for ClickHouse Cloud if ORDER BY has nullable columns - .mode("append") \ - .save() -``` - - - - -```scala -df.write - .format("clickhouse") - .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "events_copy") - .option("user", "default") - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) - .option("ssl", "true") - .option("order_by", "id") // Required: specify ORDER BY when creating a new table - .option("settings.allow_nullable_key", "1") // Required for ClickHouse Cloud if ORDER BY has nullable columns - .mode("append") - .save() -``` - - - - -:::note -This example assumes preconfigured secret scopes in Databricks. For setup instructions, see the Databricks [Secret management documentation](https://docs.databricks.com/aws/en/security/secrets/). 
-::: - -## Databricks-specific considerations {#considerations} - -### Secret management {#secret-management} - -Use Databricks secret scopes to securely store ClickHouse credentials: - -```python -# Access secrets -password = dbutils.secrets.get(scope="clickhouse", key="password") -``` - -For setup instructions, see the Databricks [Secret management documentation](https://docs.databricks.com/aws/en/security/secrets/). - - - -### ClickHouse Cloud connection {#clickhouse-cloud} - -When connecting to ClickHouse Cloud from Databricks: - -1. Use **HTTPS protocol** (`protocol: https`, `http_port: 8443`) -2. Enable **SSL** (`ssl: true`) - -## Examples {#examples} - -### Complete workflow example {#workflow-example} - - - - -```python -from pyspark.sql import SparkSession -from pyspark.sql.functions import col - -# Initialize Spark with ClickHouse connector -spark = SparkSession.builder \ - .config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.9.0") \ - .getOrCreate() - -# Read from ClickHouse -df = spark.read \ - .format("clickhouse") \ - .option("host", "your-host.clickhouse.cloud") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "source_table") \ - .option("user", "default") \ - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \ - .option("ssl", "true") \ - .load() - -# Transform data -transformed_df = df.filter(col("status") == "active") - -# Write to ClickHouse -transformed_df.write \ - .format("clickhouse") \ - .option("host", "your-host.clickhouse.cloud") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "target_table") \ - .option("user", "default") \ - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \ - .option("ssl", "true") \ - .option("order_by", "id") \ - .mode("append") \ - .save() -``` - - - - -```scala -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col - -// Initialize Spark with ClickHouse connector -val spark = SparkSession.builder - .config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.9.0") - .getOrCreate() - -// Read from ClickHouse -val df = spark.read - .format("clickhouse") - .option("host", "your-host.clickhouse.cloud") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "source_table") - .option("user", "default") - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) - .option("ssl", "true") - .load() - -// Transform data -val transformedDF = df.filter(col("status") === "active") - -// Write to ClickHouse -transformedDF.write - .format("clickhouse") - .option("host", "your-host.clickhouse.cloud") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "target_table") - .option("user", "default") - .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) - .option("ssl", "true") - .option("order_by", "id") - .mode("append") - .save() -``` - - - - -## Related documentation {#related} - -- [Spark Native Connector Guide](/docs/integrations/data-ingestion/apache-spark/spark-native-connector) - Complete connector documentation -- [TableProvider API Documentation](/docs/integrations/data-ingestion/apache-spark/spark-native-connector#using-the-tableprovider-api-format-based-access) - Format-based access details -- 
[Catalog API Documentation](/docs/integrations/data-ingestion/apache-spark/spark-native-connector#register-the-catalog-required) - Catalog-based access details diff --git a/docs/integrations/data-ingestion/apache-spark/spark-native-connector.md b/docs/integrations/data-ingestion/apache-spark/spark-native-connector.md index 3dde9eabbdc..0e14bd7e744 100644 --- a/docs/integrations/data-ingestion/apache-spark/spark-native-connector.md +++ b/docs/integrations/data-ingestion/apache-spark/spark-native-connector.md @@ -14,12 +14,9 @@ integration: import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import TOCInline from '@theme/TOCInline'; -import ClickHouseSupportedBadge from '@theme/badges/ClickHouseSupported'; # Spark connector - - This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. The connector is based on [ClickHouse's official JDBC connector](https://github.com/ClickHouse/clickhouse-java), and @@ -34,20 +31,6 @@ catalog plugins. Spark's default catalog is `spark_catalog`, and tables are identified by `{catalog name}.{database}.{table}`. With the new catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application. -## Choosing Between Catalog API and TableProvider API {#choosing-between-apis} - -The ClickHouse Spark connector supports two access patterns: the **Catalog API** and the **TableProvider API** (format-based access). Understanding the differences helps you choose the right approach for your use case. - -### Catalog API vs TableProvider API {#catalog-vs-tableprovider-comparison} - -| Feature | Catalog API | TableProvider API | -|---------|-------------|-------------------| -| **Configuration** | Centralized via Spark configuration | Per-operation via options | -| **Table Discovery** | Automatic via catalog | Manual table specification | -| **DDL Operations** | Full support (CREATE, DROP, ALTER) | Limited (automatic table creation only) | -| **Spark SQL Integration** | Native (`clickhouse.database.table`) | Requires format specification | -| **Use Case** | Long-term, stable connections with centralized config | Ad-hoc, dynamic, or temporary access | - ## Requirements {#requirements} @@ -243,419 +226,6 @@ That way, you would be able to access clickhouse1 table `.` fro ::: -## Using the TableProvider API (Format-based Access) {#using-the-tableprovider-api} - -In addition to the catalog-based approach, the ClickHouse Spark connector supports a **format-based access pattern** via the TableProvider API. 
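For comparison with the format-based examples that follow, the catalog-based approach configures the connection once on the Spark session and then addresses tables directly in Spark SQL. The sketch below is a minimal illustration assuming the catalog registration properties described earlier in this guide; the catalog name, host, credentials, and table name are all placeholders.

```python
from pyspark.sql import SparkSession

# Catalog-based access for comparison: the connection is configured once,
# and tables are then addressed as <catalog>.<database>.<table>.
# The catalog name "clickhouse", host, and credentials are placeholders.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", "your-clickhouse-host")
    .config("spark.sql.catalog.clickhouse.protocol", "https")
    .config("spark.sql.catalog.clickhouse.http_port", "8443")
    .config("spark.sql.catalog.clickhouse.user", "default")
    .config("spark.sql.catalog.clickhouse.password", "your_password")
    .config("spark.sql.catalog.clickhouse.database", "default")
    .config("spark.sql.catalog.clickhouse.option.ssl", "true")
    .getOrCreate()
)

# With the catalog registered, plain Spark SQL works without per-read options.
spark.sql("SELECT * FROM clickhouse.default.your_table LIMIT 10").show()
```

The format-based pattern shown next trades this centralized configuration for per-operation options, which is better suited to ad-hoc or dynamic access.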
- -### Format-based Read Example {#format-based-read} - - - - -```python -from pyspark.sql import SparkSession - -spark = SparkSession.builder.getOrCreate() - -# Read from ClickHouse using format API -df = spark.read \ - .format("clickhouse") \ - .option("host", "your-clickhouse-host") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "your_table") \ - .option("user", "default") \ - .option("password", "your_password") \ - .option("ssl", "true") \ - .load() - -df.show() -``` - - - - -```scala -val df = spark.read - .format("clickhouse") - .option("host", "your-clickhouse-host") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "your_table") - .option("user", "default") - .option("password", "your_password") - .option("ssl", "true") - .load() - -df.show() -``` - - - - -```java -Dataset df = spark.read() - .format("clickhouse") - .option("host", "your-clickhouse-host") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "your_table") - .option("user", "default") - .option("password", "your_password") - .option("ssl", "true") - .load(); - -df.show(); -``` - - - - -### Format-based Write Example {#format-based-write} - - - - -```python -# Write to ClickHouse using format API -df.write \ - .format("clickhouse") \ - .option("host", "your-clickhouse-host") \ - .option("protocol", "https") \ - .option("http_port", "8443") \ - .option("database", "default") \ - .option("table", "your_table") \ - .option("user", "default") \ - .option("password", "your_password") \ - .option("ssl", "true") \ - .mode("append") \ - .save() -``` - - - - -```scala -df.write - .format("clickhouse") - .option("host", "your-clickhouse-host") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "your_table") - .option("user", "default") - .option("password", "your_password") - .option("ssl", "true") - .mode("append") - .save() -``` - - - - -```java -df.write() - .format("clickhouse") - .option("host", "your-clickhouse-host") - .option("protocol", "https") - .option("http_port", "8443") - .option("database", "default") - .option("table", "your_table") - .option("user", "default") - .option("password", "your_password") - .option("ssl", "true") - .mode("append") - .save(); -``` - - - - -### TableProvider Features {#tableprovider-features} - -The TableProvider API provides several powerful features: - -#### Automatic Table Creation {#automatic-table-creation} - -When writing to a non-existent table, the connector automatically creates the table with an appropriate schema. The connector provides intelligent defaults: - -- **Engine**: Defaults to `MergeTree()` if not specified. You can specify a different engine using the `engine` option (e.g., `ReplacingMergeTree()`, `SummingMergeTree()`, etc.) -- **ORDER BY**: **Required** - You must explicitly specify the `order_by` option when creating a new table. The connector validates that all specified columns exist in the schema. 
-- **Nullable Key Support**: Automatically adds `settings.allow_nullable_key=1` if ORDER BY contains nullable columns - - - - -```python -# Table will be created automatically with explicit ORDER BY (required) -df.write \ - .format("clickhouse") \ - .option("host", "your-host") \ - .option("database", "default") \ - .option("table", "new_table") \ - .option("order_by", "id") \ - .mode("append") \ - .save() - -# Specify table creation options with custom engine -df.write \ - .format("clickhouse") \ - .option("host", "your-host") \ - .option("database", "default") \ - .option("table", "new_table") \ - .option("order_by", "id, timestamp") \ - .option("engine", "ReplacingMergeTree()") \ - .option("settings.allow_nullable_key", "1") \ - .mode("append") \ - .save() -``` - - - - -```scala -// Table will be created automatically with explicit ORDER BY (required) -df.write - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "new_table") - .option("order_by", "id") - .mode("append") - .save() - -// With explicit table creation options and custom engine -df.write - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "new_table") - .option("order_by", "id, timestamp") - .option("engine", "ReplacingMergeTree()") - .option("settings.allow_nullable_key", "1") - .mode("append") - .save() -``` - - - - -```java -// Table will be created automatically with explicit ORDER BY (required) -df.write() - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "new_table") - .option("order_by", "id") - .mode("append") - .save(); - -// With explicit table creation options and custom engine -df.write() - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "new_table") - .option("order_by", "id, timestamp") - .option("engine", "ReplacingMergeTree()") - .option("settings.allow_nullable_key", "1") - .mode("append") - .save(); -``` - - - - -:::important -**ORDER BY Required**: The `order_by` option is **required** when creating a new table via the TableProvider API. You must explicitly specify which column(s) to use for the ORDER BY clause. The connector validates that all specified columns exist in the schema and will throw an error if any columns are missing. - -**Engine Selection**: The default engine is `MergeTree()`, but you can specify any ClickHouse table engine using the `engine` option (e.g., `ReplacingMergeTree()`, `SummingMergeTree()`, `AggregatingMergeTree()`, etc.). -::: - -### TableProvider Connection Options {#tableprovider-connection-options} - -When using the format-based API, the following connection options are available: - -#### Connection Options {#connection-options} - -| Option | Description | Default Value | Required | -|--------------|--------------------------------------------------|----------------|----------| -| `host` | ClickHouse server hostname | `localhost` | Yes | -| `protocol` | Connection protocol (`http` or `https`) | `http` | No | -| `http_port` | HTTP/HTTPS port | `8123` | No | -| `database` | Database name | `default` | Yes | -| `table` | Table name | N/A | Yes | -| `user` | Username for authentication | `default` | No | -| `password` | Password for authentication | (empty string) | No | -| `ssl` | Enable SSL connection | `false` | No | -| `ssl_mode` | SSL mode (`NONE`, `STRICT`, etc.) 
| `STRICT` | No | -| `timezone` | Timezone for date/time operations | `server` | No | - -#### Table Creation Options {#table-creation-options} - -These options are used when the table doesn't exist and needs to be created: - -| Option | Description | Default Value | Required | -|-----------------------------|-----------------------------------------------------------------------------|-------------------|----------| -| `order_by` | Column(s) to use for ORDER BY clause. Comma-separated for multiple columns | N/A | **Yes** | -| `engine` | ClickHouse table engine (e.g., `MergeTree()`, `ReplacingMergeTree()`, `SummingMergeTree()`, etc.) | `MergeTree()` | No | -| `settings.allow_nullable_key` | Enable nullable keys in ORDER BY (for ClickHouse Cloud) | Auto-detected** | No | -| `settings.` | Any ClickHouse table setting | N/A | No | -| `cluster` | Cluster name for Distributed tables | N/A | No | -| `clickhouse.column..variant_types` | Comma-separated list of ClickHouse types for Variant columns (e.g., `String, Int64, Bool, JSON`). Type names are case-sensitive. Spaces after commas are optional. | N/A | No | - -\* The `order_by` option is required when creating a new table. All specified columns must exist in the schema. -\** Automatically set to `1` if ORDER BY contains nullable columns and not explicitly provided. - -:::tip -**Best Practice**: For ClickHouse Cloud, explicitly set `settings.allow_nullable_key=1` if your ORDER BY columns might be nullable, as ClickHouse Cloud requires this setting. -::: - -#### Writing Modes {#writing-modes} - -The Spark connector (both TableProvider API and Catalog API) supports the following Spark write modes: - -- **`append`**: Add data to existing table -- **`overwrite`**: Replace all data in the table (truncates table) - -:::important -**Partition Overwrite Not Supported**: The connector does not currently support partition-level overwrite operations (e.g., `overwrite` mode with `partitionBy`). This feature is in progress. See [GitHub issue #34](https://github.com/ClickHouse/spark-clickhouse-connector/issues/34) for tracking this feature. -::: - - - - -```python -# Overwrite mode (truncates table first) -df.write \ - .format("clickhouse") \ - .option("host", "your-host") \ - .option("database", "default") \ - .option("table", "my_table") \ - .mode("overwrite") \ - .save() -``` - - - - -```scala -// Overwrite mode (truncates table first) -df.write - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "my_table") - .mode("overwrite") - .save() -``` - - - - -```java -// Overwrite mode (truncates table first) -df.write() - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "my_table") - .mode("overwrite") - .save(); -``` - - - - -## Configuring ClickHouse Options {#configuring-clickhouse-options} - -Both the Catalog API and TableProvider API support configuring ClickHouse-specific options (not connector options). These are passed through to ClickHouse when creating tables or executing queries. - -ClickHouse options allow you to configure ClickHouse-specific settings like `allow_nullable_key`, `index_granularity`, and other table-level or query-level settings. These are different from connector options (like `host`, `database`, `table`) which control how the connector connects to ClickHouse. 
- -### Using TableProvider API {#using-tableprovider-api-options} - -With the TableProvider API, use the `settings.` option format: - - - - -```python -df.write \ - .format("clickhouse") \ - .option("host", "your-host") \ - .option("database", "default") \ - .option("table", "my_table") \ - .option("order_by", "id") \ - .option("settings.allow_nullable_key", "1") \ - .option("settings.index_granularity", "8192") \ - .mode("append") \ - .save() -``` - - - - -```scala -df.write - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "my_table") - .option("order_by", "id") - .option("settings.allow_nullable_key", "1") - .option("settings.index_granularity", "8192") - .mode("append") - .save() -``` - - - - -```java -df.write() - .format("clickhouse") - .option("host", "your-host") - .option("database", "default") - .option("table", "my_table") - .option("order_by", "id") - .option("settings.allow_nullable_key", "1") - .option("settings.index_granularity", "8192") - .mode("append") - .save(); -``` - - - - -### Using Catalog API {#using-catalog-api-options} - -With the Catalog API, use the `spark.sql.catalog..option.` format in your Spark configuration: - -```text -spark.sql.catalog.clickhouse.option.allow_nullable_key 1 -spark.sql.catalog.clickhouse.option.index_granularity 8192 -``` - -Or set them when creating tables via Spark SQL: - -```sql -CREATE TABLE clickhouse.default.my_table ( - id INT, - name STRING -) USING ClickHouse -TBLPROPERTIES ( - engine = 'MergeTree()', - order_by = 'id', - 'settings.allow_nullable_key' = '1', - 'settings.index_granularity' = '8192' -) -``` - ## ClickHouse Cloud settings {#clickhouse-cloud-settings} When connecting to [ClickHouse Cloud](https://clickhouse.com), make sure to enable SSL and set the appropriate SSL mode. For example: @@ -774,10 +344,6 @@ df.show() ## Write data {#write-data} -:::important -**Partition Overwrite Not Supported**: The Catalog API does not currently support partition-level overwrite operations (e.g., `overwrite` mode with `partitionBy`). This feature is in progress. See [GitHub issue #34](https://github.com/ClickHouse/spark-clickhouse-connector/issues/34) for tracking this feature. -::: - @@ -941,546 +507,9 @@ TBLPROPERTIES ( The above examples demonstrate Spark SQL queries, which you can run within your application using any API—Java, Scala, PySpark, or shell. -## Working with VariantType {#working-with-varianttype} - -:::note -VariantType support is available in Spark 4.0+ and requires ClickHouse 25.3+ with experimental JSON/Variant types enabled. -::: - -The connector supports Spark's `VariantType` for working with semi-structured data. VariantType maps to ClickHouse's `JSON` and `Variant` types, allowing you to store and query flexible schema data efficiently. - -:::note -This section focuses specifically on VariantType mapping and usage. For a complete overview of all supported data types, see the [Supported data types](#supported-data-types) section. 
-::: - -### ClickHouse Type Mapping {#clickhouse-type-mapping} - -| ClickHouse Type | Spark Type | Description | -|----------------|------------|-------------| -| `JSON` | `VariantType` | Stores JSON objects only (must start with `{`) | -| `Variant(T1, T2, ...)` | `VariantType` | Stores multiple types including primitives, arrays, and JSON | - -### Reading VariantType Data {#reading-varianttype-data} - -When reading from ClickHouse, `JSON` and `Variant` columns are automatically mapped to Spark's `VariantType`: - - - - -```scala -// Read JSON column as VariantType -val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table") - -// Access variant data -df.show() - -// Convert variant to JSON string for inspection -import org.apache.spark.sql.functions._ -df.select( - col("id"), - to_json(col("data")).as("data_json") -).show() -``` - - - - -```python -# Read JSON column as VariantType -df = spark.sql("SELECT id, data FROM clickhouse.default.json_table") - -# Access variant data -df.show() - -# Convert variant to JSON string for inspection -from pyspark.sql.functions import to_json -df.select( - "id", - to_json("data").alias("data_json") -).show() -``` - - - - -```java -// Read JSON column as VariantType -Dataset df = spark.sql("SELECT id, data FROM clickhouse.default.json_table"); - -// Access variant data -df.show(); - -// Convert variant to JSON string for inspection -import static org.apache.spark.sql.functions.*; -df.select( - col("id"), - to_json(col("data")).as("data_json") -).show(); -``` - - - - -### Writing VariantType Data {#writing-varianttype-data} - -You can write VariantType data to ClickHouse using either JSON or Variant column types: - - - - -```scala -import org.apache.spark.sql.functions._ - -// Create DataFrame with JSON data -val jsonData = Seq( - (1, """{"name": "Alice", "age": 30}"""), - (2, """{"name": "Bob", "age": 25}"""), - (3, """{"name": "Charlie", "city": "NYC"}""") -).toDF("id", "json_string") - -// Parse JSON strings to VariantType -val variantDF = jsonData.select( - col("id"), - parse_json(col("json_string")).as("data") -) - -// Write to ClickHouse with JSON type (JSON objects only) -variantDF.writeTo("clickhouse.default.user_data").create() - -// Or specify Variant with multiple types -spark.sql(""" - CREATE TABLE clickhouse.default.mixed_data ( - id INT, - data VARIANT - ) USING clickhouse - TBLPROPERTIES ( - 'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'id' - ) -""") -``` - - - - -```python -from pyspark.sql.functions import parse_json - -# Create DataFrame with JSON data -json_data = [ - (1, '{"name": "Alice", "age": 30}'), - (2, '{"name": "Bob", "age": 25}'), - (3, '{"name": "Charlie", "city": "NYC"}') -] -df = spark.createDataFrame(json_data, ["id", "json_string"]) - -# Parse JSON strings to VariantType -variant_df = df.select( - "id", - parse_json("json_string").alias("data") -) - -# Write to ClickHouse with JSON type -variant_df.writeTo("clickhouse.default.user_data").create() - -# Or specify Variant with multiple types -spark.sql(""" - CREATE TABLE clickhouse.default.mixed_data ( - id INT, - data VARIANT - ) USING clickhouse - TBLPROPERTIES ( - 'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'id' - ) -""") -``` - - - - -```java -import static org.apache.spark.sql.functions.*; - -// Create DataFrame with JSON data -List jsonData = Arrays.asList( - RowFactory.create(1, "{\"name\": \"Alice\", \"age\": 30}"), - 
RowFactory.create(2, "{\"name\": \"Bob\", \"age\": 25}"), - RowFactory.create(3, "{\"name\": \"Charlie\", \"city\": \"NYC\"}") -); -StructType schema = new StructType(new StructField[]{ - DataTypes.createStructField("id", DataTypes.IntegerType, false), - DataTypes.createStructField("json_string", DataTypes.StringType, false) -}); -Dataset jsonDF = spark.createDataFrame(jsonData, schema); - -// Parse JSON strings to VariantType -Dataset variantDF = jsonDF.select( - col("id"), - parse_json(col("json_string")).as("data") -); - -// Write to ClickHouse with JSON type (JSON objects only) -variantDF.writeTo("clickhouse.default.user_data").create(); - -// Or specify Variant with multiple types -spark.sql("CREATE TABLE clickhouse.default.mixed_data (" + - "id INT, " + - "data VARIANT" + - ") USING clickhouse " + - "TBLPROPERTIES (" + - "'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON', " + - "'engine' = 'MergeTree()', " + - "'order_by' = 'id'" + - ")"); -``` - - - - -### Creating VariantType Tables with Spark SQL {#creating-varianttype-tables-spark-sql} - -You can create VariantType tables using Spark SQL DDL: - -```sql --- Create table with JSON type (default) -CREATE TABLE clickhouse.default.json_table ( - id INT, - data VARIANT -) USING clickhouse -TBLPROPERTIES ( - 'engine' = 'MergeTree()', - 'order_by' = 'id' -) -``` - -```sql --- Create table with Variant type supporting multiple types -CREATE TABLE clickhouse.default.flexible_data ( - id INT, - data VARIANT -) USING clickhouse -TBLPROPERTIES ( - 'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'id' -) -``` - -### Configuring Variant Types {#configuring-variant-types} - -When creating tables with VariantType columns, you can specify which ClickHouse types to use: - -#### JSON Type (Default) {#json-type-default} - -If no `variant_types` property is specified, the column defaults to ClickHouse's `JSON` type, which only accepts JSON objects: - -```sql -CREATE TABLE clickhouse.default.json_table ( - id INT, - data VARIANT -) USING clickhouse -TBLPROPERTIES ( - 'engine' = 'MergeTree()', - 'order_by' = 'id' -) -``` - -This creates the following ClickHouse query: - -```sql -CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id -``` - -#### Variant Type with Multiple Types {#variant-type-multiple-types} - -To support primitives, arrays, and JSON objects, specify the types in the `variant_types` property: - -```sql -CREATE TABLE clickhouse.default.flexible_data ( - id INT, - data VARIANT -) USING clickhouse -TBLPROPERTIES ( - 'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'id' -) -``` - -This creates the following ClickHouse query: - -```sql -CREATE TABLE flexible_data ( - id Int32, - data Variant(String, Int64, Float64, Bool, Array(String), JSON) -) ENGINE = MergeTree() ORDER BY id -``` - -### Supported Variant Types {#supported-variant-types} - -The following ClickHouse types can be used in `Variant()`: - -- **Primitives**: `String`, `Int8`, `Int16`, `Int32`, `Int64`, `UInt8`, `UInt16`, `UInt32`, `UInt64`, `Float32`, `Float64`, `Bool` -- **Arrays**: `Array(T)` where T is any supported type, including nested arrays -- **JSON**: `JSON` for storing JSON objects - -### Read Format Configuration {#read-format-configuration} - -By default, JSON and Variant columns are read as `VariantType`. 
You can override this behavior to read them as strings: - - - - -```scala -// Read JSON/Variant as strings instead of VariantType -spark.conf.set("spark.clickhouse.read.jsonAs", "string") - -val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table") -// data column will be StringType containing JSON strings -``` - - - - -```python -# Read JSON/Variant as strings instead of VariantType -spark.conf.set("spark.clickhouse.read.jsonAs", "string") - -df = spark.sql("SELECT id, data FROM clickhouse.default.json_table") -# data column will be StringType containing JSON strings -``` - - - - -```java -// Read JSON/Variant as strings instead of VariantType -spark.conf().set("spark.clickhouse.read.jsonAs", "string"); - -Dataset df = spark.sql("SELECT id, data FROM clickhouse.default.json_table"); -// data column will be StringType containing JSON strings -``` - - - - -### Write Format Support {#write-format-support} - -VariantType write support varies by format: - -| Format | Support | Notes | -|--------|---------|-------| -| JSON | ✅ Full | Supports both `JSON` and `Variant` types. Recommended for VariantType data | -| Arrow | ⚠️ Partial | Supports writing to ClickHouse `JSON` type. Does not support ClickHouse `Variant` type. Full support is pending resolution of https://github.com/ClickHouse/ClickHouse/issues/92752 | - -Configure the write format: - -```scala -spark.conf.set("spark.clickhouse.write.format", "json") // Recommended for Variant types -``` - -:::tip -If you need to write to a ClickHouse `Variant` type, use JSON format. Arrow format only supports writing to `JSON` type. -::: - -### Best Practices {#varianttype-best-practices} - -1. **Use JSON type for JSON-only data**: If you only store JSON objects, use the default JSON type (no `variant_types` property) -2. **Specify types explicitly**: When using `Variant()`, explicitly list all types you plan to store -3. **Enable experimental features**: Ensure ClickHouse has `allow_experimental_json_type = 1` enabled -4. **Use JSON format for writes**: JSON format is recommended for VariantType data for better compatibility -5. **Consider query patterns**: JSON/Variant types support ClickHouse's JSON path queries for efficient filtering -6. **Column hints for performance**: When using JSON fields in ClickHouse, adding column hints improves query performance. Currently, adding column hints via Spark is not supported. See [GitHub issue #497](https://github.com/ClickHouse/spark-clickhouse-connector/issues/497) for tracking this feature. 
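Building on the query-pattern note above: once a `JSON` or `Variant` column has been read as `VariantType`, individual fields can be extracted and typed on the Spark side. The following is a minimal sketch assuming Spark 4.0's built-in variant functions and the `json_table` example from earlier in this section; the field names are illustrative.

```python
from pyspark.sql.functions import col, variant_get, try_variant_get

# Read the JSON column as VariantType (the default behavior)
df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")

# Extract typed fields from the variant column using JSON path syntax.
# try_variant_get returns NULL instead of failing when a field is missing
# or cannot be cast to the requested type.
extracted = df.select(
    col("id"),
    variant_get(col("data"), "$.name", "string").alias("name"),
    try_variant_get(col("data"), "$.age", "int").alias("age"),
)

extracted.filter(col("age") > 21).show()
```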
- -### Example: Complete Workflow {#varianttype-example-workflow} - - - - -```scala -import org.apache.spark.sql.functions._ - -// Enable experimental JSON type in ClickHouse -spark.sql("SET allow_experimental_json_type = 1") - -// Create table with Variant column -spark.sql(""" - CREATE TABLE clickhouse.default.events ( - event_id BIGINT, - event_time TIMESTAMP, - event_data VARIANT - ) USING clickhouse - TBLPROPERTIES ( - 'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'event_time' - ) -""") - -// Prepare data with mixed types -val events = Seq( - (1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""), - (2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""), - (3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""") -).toDF("event_id", "event_time", "json_data") - -// Convert to VariantType and write -val variantEvents = events.select( - col("event_id"), - to_timestamp(col("event_time")).as("event_time"), - parse_json(col("json_data")).as("event_data") -) - -variantEvents.writeTo("clickhouse.default.events").append() - -// Read and query -val result = spark.sql(""" - SELECT event_id, event_time, event_data - FROM clickhouse.default.events - WHERE event_time >= '2024-01-01' - ORDER BY event_time -""") - -result.show(false) -``` - - - - -```python -from pyspark.sql.functions import parse_json, to_timestamp - -# Enable experimental JSON type in ClickHouse -spark.sql("SET allow_experimental_json_type = 1") - -# Create table with Variant column -spark.sql(""" - CREATE TABLE clickhouse.default.events ( - event_id BIGINT, - event_time TIMESTAMP, - event_data VARIANT - ) USING clickhouse - TBLPROPERTIES ( - 'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON', - 'engine' = 'MergeTree()', - 'order_by' = 'event_time' - ) -""") - -# Prepare data with mixed types -events = [ - (1, "2024-01-01 10:00:00", '{"action": "login", "user_id": 123}'), - (2, "2024-01-01 10:05:00", '{"action": "purchase", "amount": 99.99}'), - (3, "2024-01-01 10:10:00", '{"action": "logout", "duration": 600}') -] -df = spark.createDataFrame(events, ["event_id", "event_time", "json_data"]) - -# Convert to VariantType and write -variant_events = df.select( - "event_id", - to_timestamp("event_time").alias("event_time"), - parse_json("json_data").alias("event_data") -) - -variant_events.writeTo("clickhouse.default.events").append() - -# Read and query -result = spark.sql(""" - SELECT event_id, event_time, event_data - FROM clickhouse.default.events - WHERE event_time >= '2024-01-01' - ORDER BY event_time -""") - -result.show(truncate=False) -``` - - - - -```java -import static org.apache.spark.sql.functions.*; - -// Enable experimental JSON type in ClickHouse -spark.sql("SET allow_experimental_json_type = 1"); - -// Create table with Variant column -spark.sql("CREATE TABLE clickhouse.default.events (" + - "event_id BIGINT, " + - "event_time TIMESTAMP, " + - "event_data VARIANT" + - ") USING clickhouse " + - "TBLPROPERTIES (" + - "'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON', " + - "'engine' = 'MergeTree()', " + - "'order_by' = 'event_time'" + - ")"); - -// Prepare data with mixed types -List events = Arrays.asList( - RowFactory.create(1L, "2024-01-01 10:00:00", "{\"action\": \"login\", \"user_id\": 123}"), - RowFactory.create(2L, "2024-01-01 10:05:00", "{\"action\": \"purchase\", \"amount\": 99.99}"), - RowFactory.create(3L, "2024-01-01 10:10:00", 
"{\"action\": \"logout\", \"duration\": 600}") -); -StructType eventSchema = new StructType(new StructField[]{ - DataTypes.createStructField("event_id", DataTypes.LongType, false), - DataTypes.createStructField("event_time", DataTypes.StringType, false), - DataTypes.createStructField("json_data", DataTypes.StringType, false) -}); -Dataset eventsDF = spark.createDataFrame(events, eventSchema); - -// Convert to VariantType and write -Dataset variantEvents = eventsDF.select( - col("event_id"), - to_timestamp(col("event_time")).as("event_time"), - parse_json(col("json_data")).as("event_data") -); - -variantEvents.writeTo("clickhouse.default.events").append(); - -// Read and query -Dataset result = spark.sql("SELECT event_id, event_time, event_data " + - "FROM clickhouse.default.events " + - "WHERE event_time >= '2024-01-01' " + - "ORDER BY event_time"); - -result.show(false); -``` - - - - ## Configurations {#configurations} -The following are the adjustable configurations available in the connector. - -:::note -**Using Configurations**: These are Spark-level configuration options that apply to both Catalog API and TableProvider API. They can be set in two ways: - -1. **Global Spark configuration** (applies to all operations): - ```python - spark.conf.set("spark.clickhouse.write.batchSize", "20000") - spark.conf.set("spark.clickhouse.write.compression.codec", "lz4") - ``` - -2. **Per-operation override** (TableProvider API only - can override global settings): - ```python - df.write \ - .format("clickhouse") \ - .option("host", "your-host") \ - .option("database", "default") \ - .option("table", "my_table") \ - .option("spark.clickhouse.write.batchSize", "20000") \ - .option("spark.clickhouse.write.compression.codec", "lz4") \ - .mode("append") \ - .save() - ``` - -Alternatively, set them in `spark-defaults.conf` or when creating the Spark session. -::: +The following are the adjustable configurations available in the connector:
@@ -1526,7 +555,7 @@ for converting data types when reading from ClickHouse into Spark and when inser | `Int128`,`UInt128`, `Int256`, `UInt256` | `DecimalType(38, 0)` | ✅ | Yes | | | `Float32` | `FloatType` | ✅ | Yes | | | `Float64` | `DoubleType` | ✅ | Yes | | -| `String`, `UUID`, `Enum8`, `Enum16`, `IPv4`, `IPv6` | `StringType` | ✅ | Yes | | +| `String`, `JSON`, `UUID`, `Enum8`, `Enum16`, `IPv4`, `IPv6` | `StringType` | ✅ | Yes | | | `FixedString` | `BinaryType`, `StringType` | ✅ | Yes | Controlled by configuration `READ_FIXED_STRING_AS` | | `Decimal` | `DecimalType` | ✅ | Yes | Precision and scale up to `Decimal128` | | `Decimal32` | `DecimalType(9, scale)` | ✅ | Yes | | @@ -1539,7 +568,6 @@ for converting data types when reading from ClickHouse into Spark and when inser | `IntervalYear` | `YearMonthIntervalType(Year)` | ✅ | Yes | | | `IntervalMonth` | `YearMonthIntervalType(Month)` | ✅ | Yes | | | `IntervalDay`, `IntervalHour`, `IntervalMinute`, `IntervalSecond` | `DayTimeIntervalType` | ✅ | No | Specific interval type is used | -| `JSON`, `Variant` | `VariantType` | ✅ | No | Requires Spark 4.0+ and ClickHouse 25.3+. Can be read as `StringType` with `spark.clickhouse.read.jsonAs=string` | | `Object` | | ❌ | | | | `Nested` | | ❌ | | | | `Tuple` | `StructType` | ✅ | No | Supports both named and unnamed tuples. Named tuples map to struct fields by name, unnamed tuples use `_1`, `_2`, etc. Supports nested structs and nullable fields | @@ -1573,7 +601,7 @@ for converting data types when reading from ClickHouse into Spark and when inser | `ArrayType` (list, tuple, or array) | `Array` | ✅ | No | Array element type is also converted | | `MapType` | `Map` | ✅ | No | Keys are limited to `StringType` | | `StructType` | `Tuple` | ✅ | No | Converted to named Tuple with field names. | -| `VariantType` | `JSON` or `Variant` | ✅ | No | Requires Spark 4.0+ and ClickHouse 25.3+. Defaults to `JSON` type. Use `clickhouse.column..variant_types` property to specify `Variant` with multiple types. | +| `VariantType` | `VariantType` | ❌ | No | | | `Object` | | ❌ | | | | `Nested` | | ❌ | | | diff --git a/docs/integrations/data-ingestion/data-ingestion-index.md b/docs/integrations/data-ingestion/data-ingestion-index.md index 5864571be02..5a6d9241064 100644 --- a/docs/integrations/data-ingestion/data-ingestion-index.md +++ b/docs/integrations/data-ingestion/data-ingestion-index.md @@ -15,7 +15,6 @@ For more information check out the pages below: |------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [Airbyte](/integrations/airbyte) | An open-source data integration platform. It allows the creation of ELT data pipelines and is shipped with more than 140 out-of-the-box connectors. | | [Apache Spark](/integrations/apache-spark) | A multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters | -| [Databricks](/integrations/data-ingestion/apache-spark/databricks) | A unified analytics platform built on Apache Spark, providing a collaborative environment for data engineering, data science, and machine learning workloads with ClickHouse integration. 
| | [Apache Flink](https://github.com/ClickHouse/flink-connector-clickhouse) | Real-time data ingestion and processing into ClickHouse through Flink's DataStream API with support for batch writes | | [Amazon Glue](/integrations/glue) | A fully managed, serverless data integration service provided by Amazon Web Services (AWS) simplifying the process of discovering, preparing, and transforming data for analytics, machine learning, and application development. | | [Azure Synapse](/integrations/azure-synapse) | A fully managed, cloud-based analytics service provided by Microsoft Azure, combining big data and data warehousing to simplify data integration, transformation, and analytics at scale using SQL, Apache Spark, and data pipelines. | diff --git a/sidebars.js b/sidebars.js index 4590c3a40ca..9977c807666 100644 --- a/sidebars.js +++ b/sidebars.js @@ -967,7 +967,6 @@ const sidebars = { items: [ 'integrations/data-ingestion/apache-spark/index', 'integrations/data-ingestion/apache-spark/spark-native-connector', - 'integrations/data-ingestion/apache-spark/databricks', 'integrations/data-ingestion/apache-spark/spark-jdbc', ], }, diff --git a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-install-from-volume.png b/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-install-from-volume.png deleted file mode 100644 index 2318cac3472..00000000000 Binary files a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-install-from-volume.png and /dev/null differ diff --git a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-libraries-tab.png b/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-libraries-tab.png deleted file mode 100644 index 3cc1426f19f..00000000000 Binary files a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-libraries-tab.png and /dev/null differ diff --git a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-maven-tab.png b/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-maven-tab.png deleted file mode 100644 index d60132ac005..00000000000 Binary files a/static/images/integrations/data-ingestion/apache-spark/databricks/databricks-maven-tab.png and /dev/null differ diff --git a/static/images/integrations/logos/databricks-logo.svg b/static/images/integrations/logos/databricks-logo.svg deleted file mode 100644 index c449e5e77bc..00000000000 --- a/static/images/integrations/logos/databricks-logo.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file