[SEDONA-723] Add write format for (Geo)Arrow #1863

Status: Draft · wants to merge 3 commits into base: master
spark/common/pom.xml (7 additions, 0 deletions)
@@ -37,6 +37,13 @@
</properties>

<dependencies>

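    <!-- Provides ArrowSerializer (used by ArrowWriter below) from Spark Connect. -->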
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-common_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-common</artifactId>
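For orientation, a hypothetical sketch of how the finished data source might be invoked. The example data, the output path, and the use of the fully qualified provider class name as the format string are all assumptions; the provider is currently package-private, so it would need to be made public (or registered under a short name, see the sketch after ArrowTableProvider below) before this runs:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ArrowWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> df = spark.range(10).toDF();
    // DataSource V2 accepts a TableProvider class name as the format string.
    df.write()
        .format("org.apache.sedona.sql.datasources.arrow.ArrowTableProvider")
        .save("/tmp/example.arrows");
    spark.stop();
  }
}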
New file: ArrowBatchWrite.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

class ArrowBatchWrite implements BatchWrite {
private final LogicalWriteInfo logicalWriteInfo;

public ArrowBatchWrite(LogicalWriteInfo info) {
this.logicalWriteInfo = info;
}

@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
return new ArrowDataWriterFactory(logicalWriteInfo.schema());
}

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // No driver-side commit step yet: each task writer finalizes its own
    // output. TODO: reconcile per-task commit messages here.
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // TODO: clean up any partially written output from failed tasks.
  }
}
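For reference, a minimal sketch of the missing piece of the DataSource V2 commit protocol: each ArrowWriter.commit() returns a WriterCommitMessage that Spark collects on the driver and passes to BatchWrite.commit(). The ArrowCommitMessage class below is hypothetical and not part of this PR:

package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical: carries the output path a task wrote so that the driver can
// finalize or clean it up. WriterCommitMessage is a Serializable marker
// interface with no methods of its own.
class ArrowCommitMessage implements WriterCommitMessage {
  private final int partitionId;
  private final String path;

  ArrowCommitMessage(int partitionId, String path) {
    this.partitionId = partitionId;
    this.path = path;
  }
}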
New file: ArrowDataWriterFactory.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.types.StructType;

class ArrowDataWriterFactory implements DataWriterFactory {
private final StructType schema;

public ArrowDataWriterFactory(StructType schema) {
this.schema = schema;
}

@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
return new ArrowWriter(partitionId, taskId, schema);
}
}
New file: ArrowTable.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.util.HashSet;
import java.util.Set;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.SupportsOverwrite;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;

class ArrowTable implements SupportsWrite, SupportsOverwrite {
  private Set<TableCapability> capabilities;
  private final StructType schema;

ArrowTable(StructType schema) {
this.schema = schema;
}

  @Override
  public String name() {
    // No catalog identifier exists for this path-based table yet.
    return "ArrowTable";
  }

  @Override
  public StructType schema() {
    return schema;
  }

@Override
public Set<TableCapability> capabilities() {
if (capabilities == null) {
this.capabilities = new HashSet<>();
capabilities.add(TableCapability.BATCH_WRITE);
}
return capabilities;
}

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new ArrowWriteBuilder(info);
}

  @Override
  public WriteBuilder overwrite(Filter[] filters) {
    // TODO: overwrite (truncate-and-rewrite) is not supported yet.
    throw new UnsupportedOperationException("Overwrite is not implemented yet");
  }
}
New file: ArrowTableProvider.java
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class ArrowTableProvider implements TableProvider {

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // TODO: Spark's DataFrameWriter may call this during save() when no user
    // schema is supplied, so even a write-only source needs an answer here.
    throw new UnsupportedOperationException("Schema inference is not implemented yet");
  }

@Override
public Table getTable(
StructType schema, Transform[] partitioning, Map<String, String> properties) {
return new ArrowTable(schema);
}
}
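A possible follow-up, not part of this PR: Spark resolves short format names through DataSourceRegister plus a META-INF/services entry. A hypothetical registration could look like the following; the short name "geoarrow" is an assumption:

package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.sources.DataSourceRegister;

// Hypothetical: exposes the provider under a short name so that callers can
// use df.write().format("geoarrow"). The class must also be listed in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
public class GeoArrowDataSource extends ArrowTableProvider implements DataSourceRegister {
  @Override
  public String shortName() {
    return "geoarrow";
  }
}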
New file: ArrowWriteBuilder.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;

class ArrowWriteBuilder implements WriteBuilder {

private final LogicalWriteInfo writeInfo;

public ArrowWriteBuilder(LogicalWriteInfo writeInfo) {
this.writeInfo = writeInfo;
}

@Override
public BatchWrite buildForBatch() {
return new ArrowBatchWrite(writeInfo);
}
}
New file: ArrowWriter.java
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.io.IOException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

class ArrowWriter implements DataWriter<InternalRow> {
  private final int partitionId;
  private final long taskId;
  private int rowCount;
  private final AgnosticEncoder<Row> encoder;
  // https://github.com/apache/spark/blob/9353e94e50f3f73565f5f0023effd7e265c177b9/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala#L50
  private final ArrowSerializer<Row> serializer;

public ArrowWriter(int partitionId, long taskId, StructType schema) {
this.partitionId = partitionId;
this.taskId = taskId;
this.rowCount = 0;
this.encoder = RowEncoder.encoderFor(schema);
this.serializer = new ArrowSerializer<Row>(encoder, new RootAllocator(), "UTC");

    // TODO: open the output file and write the schema here.
    // Problem: ArrowSerializer does not expose a way to write just the
    // schema bytes.
}

@Override
public void close() throws IOException {
    // TODO: flush buffered rows and close the output file.
throw new UnsupportedOperationException("Unimplemented method 'close'");
}

@Override
public void write(InternalRow record) throws IOException {
    // Problem: the serializer needs a Row, but Spark hands the writer an InternalRow.
// serializer.append(encoder.fromRow(record));
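    // One possible bridge (untested sketch, and the constructor would need to
    // keep the schema as a field): Catalyst can turn an InternalRow back into
    // an external Row with a converter derived from the schema, e.g.
    //   scala.Function1<Object, Object> toRow =
    //       org.apache.spark.sql.catalyst.CatalystTypeConverters
    //           .createToScalaConverter(schema);
    //   serializer.append((Row) toRow.apply(record));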

rowCount++;
    if (rowCount >= 1024) {
      // Problem: writeIpcStream() writes both the schema and the batch, but
      // only the batch is wanted here.
// serializer.writeIpcStream(null);
rowCount = 0;
}
}

  @Override
  public WriterCommitMessage commit() throws IOException {
    // TODO: finish the IPC stream and return a commit message for the driver.
    throw new UnsupportedOperationException("Unimplemented method 'commit'");
  }

  @Override
  public void abort() throws IOException {
    // TODO: delete any partially written output.
    throw new UnsupportedOperationException("Unimplemented method 'abort'");
  }
}
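Both "Problem" comments above stem from ArrowSerializer bundling schema and batch emission into writeIpcStream(). For comparison, a minimal sketch using Arrow's own IPC writer, where the two steps are separate: start() emits only the schema, and each writeBatch() emits only a record batch. The single BigInt column and the output path are placeholders, and bridging Spark rows into the VectorSchemaRoot is left out:

import java.io.FileOutputStream;
import java.util.Arrays;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowIpcSketch {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(Arrays.asList(Field.nullable("id", new ArrowType.Int(64, true))));
    try (BufferAllocator allocator = new RootAllocator();
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        FileOutputStream out = new FileOutputStream("/tmp/sketch.arrows");
        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
      writer.start(); // writes only the schema
      BigIntVector ids = (BigIntVector) root.getVector("id");
      for (int batch = 0; batch < 2; batch++) {
        ids.allocateNew(3);
        for (int i = 0; i < 3; i++) {
          ids.setSafe(i, batch * 3L + i);
        }
        root.setRowCount(3);
        writer.writeBatch(); // writes only the current batch
      }
      writer.end(); // end-of-stream marker
    }
  }
}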