Skip to content

Commit

Permalink
feat(python): TableWriter and Hive connector (facebookincubator#12279)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#12279

Adds support for table writer and other required hook ups for Hive
connector. Also adding static registries to keep track of registered connectors
and tasks executed.

Reviewed By: kevinwilfong

Differential Revision: D69163158

fbshipit-source-id: c0a14ca05b694487e3183f70182269eea58083da
  • Loading branch information
pedroerp authored and facebook-github-bot committed Feb 6, 2025
1 parent 3375085 commit 1e45534
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 17 deletions.
24 changes: 23 additions & 1 deletion velox/py/plan_builder/PyPlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ void registerAllResourcesOnce() {
velox::dwrf::registerDwrfWriterFactory();
velox::dwrf::registerDwrfReaderFactory();

velox::dwio::common::LocalFileSink::registerFactory();

velox::parse::registerTypeResolver();

velox::core::PlanNode::registerSerDe();
Expand Down Expand Up @@ -94,8 +96,28 @@ std::optional<PyPlanNode> PyPlanBuilder::planNode() const {
return std::nullopt;
}

PyPlanBuilder& PyPlanBuilder::tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId) {
exec::test::PlanBuilder::TableWriterBuilder builder(planBuilder_);

// Try to convert the output type.
auto outputRowSchema = asRowType(outputSchema.type());
if (outputRowSchema == nullptr) {
throw std::runtime_error("Output schema must be a ROW().");
}

builder.outputType(outputRowSchema)
.outputFileName(outputFile.filePath())
.fileFormat(outputFile.fileFormat())
.connectorId(connectorId)
.endTableWriter();
return *this;
}

PyPlanBuilder& PyPlanBuilder::tableScan(
const velox::py::PyType& outputSchema,
const PyType& outputSchema,
const py::dict& aliases,
const py::dict& subfields,
const std::string& rowIndexColumnName,
Expand Down
14 changes: 13 additions & 1 deletion velox/py/plan_builder/PyPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,25 @@ class PyPlanBuilder {
/// input_files=[PARQUET("my_file.parquet")],
/// )
PyPlanBuilder& tableScan(
const velox::py::PyType& outputSchema,
const PyType& outputSchema,
const pybind11::dict& aliases,
const pybind11::dict& subfields,
const std::string& rowIndexColumnName,
const std::string& connectorId,
const std::optional<std::vector<PyFile>>& inputFiles);

/// Adds a table writer node to write to an output file(s).
///
/// @param outputSchema The schema to be used when writing the file (columns
/// and types).
/// @param outputFile The output file to be written.
/// @param connectorId The id of the connector to use during the write
/// process.
PyPlanBuilder& tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId);

// Add the provided vectors straight into the operator tree.
PyPlanBuilder& values(const std::vector<PyVector>& values);

Expand Down
17 changes: 16 additions & 1 deletion velox/py/plan_builder/plan_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ PYBIND11_MODULE(plan_builder, m) {
py::arg("aliases") = py::dict{},
py::arg("subfields") = py::dict{},
py::arg("row_index") = "",
py::arg("connector_id") = "prism",
py::arg("connector_id") = "hive",
py::arg("input_files") = std::nullopt,
py::doc(R"(
Adds a table scan node to the plan.
Expand All @@ -105,6 +105,21 @@ PYBIND11_MODULE(plan_builder, m) {
input_files: If defined, uses as the input files so that no splits
will need to be added later.
)"))
.def(
"table_write",
&velox::py::PyPlanBuilder::tableWrite,
py::arg("output_schema"),
py::arg("output_file"),
py::arg("connector_id") = "hive",
py::doc(R"(
Adds a table write node to the plan.
Args:
output_schema: A RowType containing the schema to be written to
the file.
output_file: Name of the file to be written.
connector_id: ID of the connector to use for this scan.
)"))
.def(
"values",
&velox::py::PyPlanBuilder::values,
Expand Down
49 changes: 45 additions & 4 deletions velox/py/runner/PyLocalRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/py/runner/PyLocalRunner.h"

#include <pybind11/stl.h>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/common/Options.h"
Expand All @@ -36,12 +37,50 @@ std::mutex& taskRegistryLock() {
return lock;
}

std::unordered_set<std::string>& connectorRegistry() {
static std::unordered_set<std::string> registry;
return registry;
}

} // namespace

namespace py = pybind11;

void registerHive(const std::string& connectorId) {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());

// TODO: Allow Python users to specify connector configs.
std::unordered_map<std::string, std::string> configValues = {};
const auto configs =
std::make_shared<velox::config::ConfigBase>(std::move(configValues));

auto hiveConnector =
connector::getConnectorFactory(connectorId)
->newConnector(
connectorId, configs, folly::getGlobalCPUExecutor().get());
connector::registerConnector(hiveConnector);
connectorRegistry().insert(connectorId);
}

// Is it ok to unregister connectors that were not registered.
void unregisterHive(const std::string& connectorId) {
if (!facebook::velox::connector::unregisterConnector(connectorId) ||
!facebook::velox::connector::unregisterConnectorFactory(connectorId)) {
throw std::runtime_error(
fmt::format("Unable to unregister connector '{}'", connectorId));
}
connectorRegistry().erase(connectorId);
}

void unregisterAll() {
while (!connectorRegistry().empty()) {
unregisterHive(*connectorRegistry().begin());
}
}

PyVector PyTaskIterator::Iterator::operator*() const {
return PyVector{vector_};
return PyVector{vector_, outputPool_};
}

void PyTaskIterator::Iterator::advance() {
Expand All @@ -56,7 +95,8 @@ PyLocalRunner::PyLocalRunner(
const PyPlanNode& pyPlanNode,
const std::shared_ptr<memory::MemoryPool>& pool,
const std::shared_ptr<folly::CPUThreadPoolExecutor>& executor)
: pool_(pool),
: rootPool_(pool),
outputPool_(memory::memoryManager()->addLeafPool()),
executor_(executor),
planNode_(pyPlanNode.planNode()),
scanFiles_(pyPlanNode.scanFiles()) {
Expand All @@ -69,11 +109,12 @@ PyLocalRunner::PyLocalRunner(
core::QueryConfig(configs),
{},
cache::AsyncDataCache::getInstance(),
pool_);
rootPool_);

cursor_ = exec::TaskCursor::create({
.planNode = planNode_,
.queryCtx = queryCtx,
.outputPool = outputPool_,
});
}

Expand Down Expand Up @@ -105,7 +146,7 @@ py::iterator PyLocalRunner::execute() {
taskRegistry().push_back(cursor_->task());
}

pyIterator_ = std::make_shared<PyTaskIterator>(cursor_);
pyIterator_ = std::make_shared<PyTaskIterator>(cursor_, outputPool_);
return py::make_iterator(pyIterator_->begin(), pyIterator_->end());
}

Expand Down
30 changes: 23 additions & 7 deletions velox/py/runner/PyLocalRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@

namespace facebook::velox::py {

/// Register hive connector using a given `connectorId`
void registerHive(const std::string& connectorId);

/// Unregister the connector registered above.
void unregisterHive(const std::string& connectorId);

/// Unregister all connectors that have been registered by this module.
void unregisterAll();

class PyTaskIterator;

/// A C++ wrapper to allow Python clients to execute plans using TaskCursor.
Expand Down Expand Up @@ -57,8 +66,9 @@ class PyLocalRunner {
private:
friend class PyTaskIterator;

// Memory pool and thread pool to be used by queryCtx.
std::shared_ptr<memory::MemoryPool> pool_;
// Memory pools and thread pool to be used by queryCtx.
std::shared_ptr<memory::MemoryPool> rootPool_;
std::shared_ptr<memory::MemoryPool> outputPool_;
std::shared_ptr<folly::CPUThreadPoolExecutor> executor_;

// The plan node to be executed (created using velox.py.plan_builder).
Expand All @@ -78,15 +88,19 @@ class PyLocalRunner {
// returned by them needs to be comparable and incrementable.
class PyTaskIterator {
public:
explicit PyTaskIterator(const std::shared_ptr<exec::TaskCursor>& cursor)
: cursor_(cursor) {}
explicit PyTaskIterator(
const std::shared_ptr<exec::TaskCursor>& cursor,
const std::shared_ptr<memory::MemoryPool>& pool)
: outputPool_(pool), cursor_(cursor) {}

class Iterator {
public:
Iterator() {}

explicit Iterator(const std::shared_ptr<exec::TaskCursor>& cursor)
: cursor_(cursor) {
explicit Iterator(
const std::shared_ptr<exec::TaskCursor>& cursor,
const std::shared_ptr<memory::MemoryPool>& pool)
: outputPool_(pool), cursor_(cursor) {
// Advance to the first batch.
advance();
}
Expand All @@ -109,19 +123,21 @@ class PyTaskIterator {
}

private:
std::shared_ptr<memory::MemoryPool> outputPool_;
std::shared_ptr<exec::TaskCursor> cursor_;
RowVectorPtr vector_{nullptr};
};

Iterator begin() const {
return Iterator(cursor_);
return Iterator(cursor_, outputPool_);
}

Iterator end() const {
return Iterator();
}

private:
std::shared_ptr<memory::MemoryPool> outputPool_;
std::shared_ptr<exec::TaskCursor> cursor_;
};

Expand Down
32 changes: 30 additions & 2 deletions velox/py/runner/runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ PYBIND11_MODULE(runner, m) {
connector_id: The id of the connector used by the scan.
)"));

// Ensure all tasks created by this module have finished.
m.add_object("_cleanup", py::capsule(&velox::py::drainAllTasks));
m.def(
"register_hive",
&velox::py::registerHive,
pybind11::arg("connector_name") = "hive",
py::doc(R"(
"Initialize and register Hive connector.
Args:
connector_name: Name to use for the registered connector.
)"))
.def(
"unregister_hive",
&velox::py::unregisterHive,
pybind11::arg("connector_name") = "hive",
py::doc(R"(
"Unregister Hive connector.
Args:
connector_name: Name of the connector to unregister.",
)"));

// When the module gets unloaded, first ensure all tasks created by this
// module have finished, then unregister all connectors that have been
// registered by this module. We need to explicity unregister them to prevent
// the connectors and their nested structures from being destructed after
// other global and static resources are destructed.
m.add_object("_cleanup", py::capsule([]() {
velox::py::drainAllTasks();
velox::py::unregisterAll();
}));
}
3 changes: 3 additions & 0 deletions velox/py/runner/runner.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ class LocalRunner:
# pyre-fixme[11]: Annotation `Vector` is not defined as a type.
def execute(self) -> Iterator[Vector]: ...
def add_file_split(self, plan_id: str, file_path: str) -> None: ...

def register_hive(str) -> None: ...
def unregister_hive(str) -> None: ...
61 changes: 60 additions & 1 deletion velox/py/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import unittest
import pyarrow
import tempfile

from velox.py.arrow import to_velox
from velox.py.runner import LocalRunner
from velox.py.plan_builder import PlanBuilder
from velox.py.file import DWRF
from velox.py.type import BIGINT, ROW
from velox.py.runner import LocalRunner, register_hive


class TestPyVeloxRunner(unittest.TestCase):
Expand Down Expand Up @@ -80,3 +84,58 @@ def test_runner_with_join(self):
for vector in runner.execute():
total_size += vector.size()
self.assertEqual(total_size, batch_size * batch_size)

def test_write_read_file(self):
# Test writing a batch of data to a dwrf file on disk, then
# reading it back.
register_hive()

# Generate input data.
batch_size = 10
array = pyarrow.array([42] * batch_size)
input_batch = to_velox(pyarrow.record_batch([array], names=["c0"]))

with tempfile.TemporaryDirectory() as temp_dir:
output_file = f"{temp_dir}/output_file"

plan_builder = PlanBuilder()
plan_builder.values([input_batch]).table_write(
output_schema=ROW(["c0"], [BIGINT()]),
output_file=DWRF(output_file),
connector_id="hive",
)

# Execute and write to output file.
runner = LocalRunner(plan_builder.get_plan_node())
iterator = runner.execute()
output = next(iterator)
self.assertRaises(StopIteration, next, iterator)

output_file_from_table_writer = self.extract_file(output)
self.assertEqual(output_file, output_file_from_table_writer)

# Now scan it back.
scan_plan_builder = PlanBuilder()
scan_plan_builder.table_scan(
output_schema=ROW(["c0"], [BIGINT()]),
connector_id="hive",
input_files=[DWRF(output_file)],
)

runner = LocalRunner(scan_plan_builder.get_plan_node())
iterator = runner.execute()
result = next(iterator)
self.assertRaises(StopIteration, next, iterator)

# Ensure the read batch is the same as the one written.
self.assertEqual(input_batch, result)

def extract_file(self, output_vector):
# Parse and return the output file name from the writer's output.
output_json = json.loads(output_vector.child_at(1)[1])
self.assertIsNotNone(output_json)

write_infos = output_json["fileWriteInfos"]
self.assertTrue(isinstance(write_infos, list))
self.assertGreater(len(write_infos), 0)
return write_infos[0].get("targetFileName", "")

0 comments on commit 1e45534

Please sign in to comment.