Skip to content

Commit

Permalink
DatabaseClient: Fix _dataframe_to_table to work with the new `Parse…
Browse files Browse the repository at this point in the history
…dNamespaces` datastructure.
  • Loading branch information
matthewwardrop committed Aug 24, 2018
1 parent bc7e587 commit d2aa8f3
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 86 deletions.
130 changes: 90 additions & 40 deletions omniduct/databases/_namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,102 @@ class ParsedNamespaces:
"""

@classmethod
def from_name(cls, name, namespaces, quote_char='"', separator='.'):
def from_name(cls, name, namespaces, quote_char='"', separator='.', defaults=None):
"""
This classmethod returns an instance of `ParsedNamespaces` which represents
the parsed namespace corresponding to `name`.
"""
if isinstance(name, ParsedNamespaces):
return name
return cls(name, namespaces, quote_char=quote_char, separator=separator)

def __init__(self, name, namespaces, quote_char='"', separator='.'):
"""
Parse `name` into provided `namespaces`.
the parsed namespace corresponding to `name`. If `name` is an instance
of `ParsedNamespaces`, it is checked whether the `namespaces` are a
subset of the namespaces provided to this constructor. If not, a `ValueError`
is raised. Note that the quote charactors, separators and defaults will of
the passed `ParsedNamespaces` will be ignored.
Parameters:
name (str): The name to be parsed.
name (str, ParsedNamespaces): The name to be parsed.
namespaces (list<str>): The namespaces into which the name should be
parsed.
defaults (None, dict): Default values for namespaces. Note that if a
default is provided for a namespace, it will only be used if all
sub-namespaces also resolve to a value (either via defaults or
by being explicitly passed).
quote_char (str): The character to used for optional encapsulation
of namespace names. (default='"')
separator (str): The character used to separate namespaces.
(default='.')
"""
self.__name = name
self.__namespaces = namespaces
self.__quote_char = quote_char
self.__separator = separator
defaults = defaults or {}

namespace_matcher = re.compile(
r"([^{sep}{qc}]+)|{qc}([^`]*?){qc}".format(
qc=re.escape(quote_char),
sep=re.escape(separator)
if isinstance(name, ParsedNamespaces):
extra_namespaces = set(name.namespaces).difference(namespaces)
if extra_namespaces:
raise ValueError(
"ParsedNamespace is not encapsulated by the namespaces "
"provided to this constructor. It has extra namespaces: {}."
.format(extra_namespaces)
)
parsed = name.as_dict()

elif isinstance(name, str):
namespace_matcher = re.compile(
r"([^{sep}{qc}]+)|{qc}([^`]*?){qc}".format(
qc=re.escape(quote_char),
sep=re.escape(separator)
)
)
)

names = [''.join(t) for t in namespace_matcher.findall(name)] if name else []
if len(names) > len(namespaces):
raise ValueError(
"Name '{}' has too many namespaces. Should be of form: <{}>."
.format(name, ">{sep}<".format(sep=self.__separator).join(namespaces))
names = [''.join(t) for t in namespace_matcher.findall(name)] if name else []
if len(names) > len(namespaces):
raise ValueError(
"Name '{}' has too many namespaces. Should be of form: <{}>."
.format(name, ">{sep}<".format(sep=separator).join(namespaces))
)

parsed = OrderedDict(reversed([
(namespace, names.pop() if names else None)
for namespace in namespaces[::-1]
]))

else:
raise ValueError("Cannot construct `ParsedNamespaces` instance from "
"name of type: `{}`.".format(type(name)))

for namespace in namespaces[::-1]:
if not parsed.get(namespace) and namespace in defaults:
parsed[namespace] = defaults[namespace]
elif not parsed.get(namespace):
break

return cls(parsed, quote_char=quote_char, separator=separator)

def __init__(self, names, namespaces=None, quote_char='"', separator='.'):
"""
A container for parsed namespaces. Typically this class should be
constructed via the `.from_name` classmethod.
Parameters:
names (dict<str, str>): An ordered dictionary of
namespaces to names (from most to least generic).
namespaces (None, list<str>): For convenience, in the event that
you want to avoid using an ordered dictionary, you can instead
provide this list of ordered namespaces (from most to least generic).
quote_char (str): The character to used for optional encapsulation
of namespace names. (default='"')
separator (str): The character used to separate namespaces.
(default='.')
"""

if namespaces:
names = OrderedDict(
(namespace, names.get(namespace, None))
for namespace in namespaces
)

self.__dict = {
level: names.pop() if names else None
for level in namespaces[::-1]
}
self.__names = names
self.__quote_char = quote_char
self.__separator = separator

def __getattr__(self, name):
if name in self.__dict:
return self.__dict[name]
if name in self.__names:
return self.__names[name]
raise AttributeError(name)

def __bool__(self):
Expand All @@ -78,13 +125,18 @@ def __bool__(self):
def __nonzero__(self): # Python 2 support for bool
return bool(self.name)

@property
def namespaces(self):
"""The namespaces parsed in order of most to least specific."""
return list(self.__names)

@property
def name(self):
"""The full name provided (with quotes)."""
names = [
self.__dict[namespace]
for namespace in self.__namespaces
if self.__dict.get(namespace)
self.__names[namespace]
for namespace, name in self.__names.items()
if name
]
if len(names) == 0:
return ""
Expand All @@ -97,19 +149,17 @@ def name(self):
@property
def parent(self):
"""An instance of `ParsedNamespaces` with the most specific namespace truncated."""
names = self.__names.copy()
names.popitem()
return ParsedNamespaces(
name=self.__separator.join(self.__name.split(self.__separator)[:-1]),
namespaces=self.__namespaces[:-1],
names=names,
quote_char=self.__quote_char,
separator=self.__separator
)

def as_dict(self):
"""Returns the parsed namespaces as an OrderedDict from most to least general."""
d = OrderedDict()
for namespace in self.__namespaces:
d[namespace] = self.__dict[namespace]
return d
return self.__names

def __str__(self):
return self.name
Expand Down
9 changes: 5 additions & 4 deletions omniduct/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ def dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
Parameters:
df (pandas.DataFrame): The dataframe to upload into the database.
table (str): The name of the table into which the dataframe should
be uploaded.
table (str, ParsedNamespaces): The name of the table into which the
dataframe should be uploaded.
if_exists (str): if nominated table already exists: 'fail' to do
nothing, 'replace' to drop, recreate and insert data into new
table, and 'append' to add data from this table into the
Expand All @@ -615,12 +615,13 @@ def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
def _cursor_empty(self, cursor):
return False

def _parse_namespaces(self, name, level=0):
def _parse_namespaces(self, name, level=0, defaults=None):
return ParsedNamespaces.from_name(
name,
self.NAMESPACE_NAMES[:-level] if level > 0 else self.NAMESPACE_NAMES,
quote_char=self.NAMESPACE_QUOTECHAR,
separator=self.NAMESPACE_SEPARATOR
separator=self.NAMESPACE_SEPARATOR,
defaults=defaults
)

@quirk_docs('_table_list')
Expand Down
40 changes: 18 additions & 22 deletions omniduct/databases/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _query_to_table(self, statement, table, if_exists, **kwargs):
return self.execute(statement, **kwargs)

def _dataframe_to_table(
self, df, table, if_exists='fail', schema=None, use_hive_cli=None,
self, df, table, if_exists='fail', use_hive_cli=None,
partition=None, sep=chr(1), table_props=None, dtype_overrides=None, **kwargs
):
"""
Expand All @@ -236,9 +236,10 @@ def _dataframe_to_table(
`pyhive` and `impyla`. This may be slower, does not support older
versions of Hive, and does not support table properties or partitioning.
If if the schema namespace is not specified, `table.schema` will be
defaulted to your username.
Additional Parameters:
schema (str): The schema into which the table should be pushed. If
not specified, the schema will be set to your username.
use_hive_cli (bool, None): A local override for the global
`.push_using_hive_cli` attribute. If not specified, the global
default is used. If True, then pushes are performed using the
Expand All @@ -257,7 +258,7 @@ def _dataframe_to_table(
dtype_overrides (dict): Mapping of column names to Hive datatypes to
use instead of default mapping.
"""
schema = schema or self.username
table = self._parse_namespaces(table, defaults={'schema': self.username})
use_hive_cli = use_hive_cli or self.push_using_hive_cli
partition = partition or {}
table_props = table_props or {}
Expand All @@ -275,9 +276,8 @@ def _dataframe_to_table(
"and try again."
)
try:
return df.to_sql(name=table, con=self._sqlalchemy_engine,
index=False, if_exists=if_exists,
schema=schema, **kwargs)
return df.to_sql(name=table.table, schema=table.schema, con=self._sqlalchemy_engine,
index=False, if_exists=if_exists, **kwargs)
except Exception as e:
raise RuntimeError(
"Push unsuccessful. Your version of Hive may be too old to "
Expand Down Expand Up @@ -320,8 +320,8 @@ def _dataframe_to_table(
tblprops = self.default_table_props.copy()
tblprops.update(table_props or {})
cts = self._create_table_statement_from_df(
df=df, table=table,
schema=schema,
df=df,
table=table,
drop=(if_exists == 'replace') and not partition,
text=True,
sep=sep,
Expand All @@ -332,22 +332,20 @@ def _dataframe_to_table(

# Generate load data statement.
partition_clause = '' if not partition else 'PARTITION ({})'.format(','.join("{key} = '{value}'".format(key=key, value=value) for key, value in partition.items()))
lds = '\nLOAD DATA LOCAL INPATH "{path}" {overwrite} INTO TABLE {schema}.{table} {partition_clause};'.format(
lds = '\nLOAD DATA LOCAL INPATH "{path}" {overwrite} INTO TABLE {table} {partition_clause};'.format(
path=os.path.basename(tmp_fname) if self.remote else tmp_fname,
overwrite="OVERWRITE" if if_exists == "replace" else "",
schema=schema,
table=table,
partition_clause=partition_clause
)

# Run create table statement and load data statments
logger.info(
"Creating hive table `{schema}.{table}` if it does not "
"Creating hive table `{table}` if it does not "
"already exist, and inserting the provided data{partition}."
.format(
schema=schema,
table=table,
partition="into {}".format(partition_clause) if partition_clause else ""
partition=" into {}".format(partition_clause) if partition_clause else ""
)
)
try:
Expand All @@ -362,14 +360,13 @@ def _dataframe_to_table(
self.remote.execute('rm -rf {}'.format(tmp_fname))
shutil.rmtree(temp_dir, ignore_errors=True)

logger.info("Successfully uploaded dataframe {partition}`{schema}.{table}`.".format(
schema=schema,
logger.info("Successfully uploaded dataframe {partition}`{table}`.".format(
table=table,
partition="into {} of ".format(partition_clause) if partition_clause else ""
))

def _table_list(self, namespace, like='*', **kwargs):
schema = namespace.name or self.schema or 'default'
schema = namespace.name or self.schema
return self.query("SHOW TABLES IN {0} '{1}'".format(schema, like),
**kwargs)

Expand Down Expand Up @@ -422,7 +419,7 @@ def _run_in_hivecli(self, cmd):
return proc

@classmethod
def _create_table_statement_from_df(cls, df, table, schema='default', drop=False,
def _create_table_statement_from_df(cls, df, table, drop=False,
text=True, sep=chr(1), loc=None,
table_props=None, partition_cols=None,
dtype_overrides=None):
Expand All @@ -432,8 +429,7 @@ def _create_table_statement_from_df(cls, df, table, schema='default', drop=False
Parameters:
df (pandas.DataFrame, pandas.Series): Used to determine column names
and types for create table statement.
table (str): The name of the target table.
schema (str): The name of the target schema.
table (ParsedNamespaces): The parsed name of the target table.
drop (bool): Whether to include a drop table statement before the
create table statement.
text (bool): Whether data will be stored as a textfile.
Expand Down Expand Up @@ -489,9 +485,9 @@ def _create_table_statement_from_df(cls, df, table, schema='default', drop=False

cmd = Template("""
{% if drop %}
DROP TABLE IF EXISTS {{ schema }}.{{ table }};
DROP TABLE IF EXISTS {{ table }};
{% endif -%}
CREATE TABLE IF NOT EXISTS {{ schema }}.{{ table }} (
CREATE TABLE IF NOT EXISTS {{ table }} (
{%- for col in columns %}
{{ col }} {% if not loop.last %}, {% endif %}
{%- endfor %}
Expand Down
13 changes: 7 additions & 6 deletions omniduct/databases/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,15 @@ def _query_to_table(self, statement, table, if_exists, **kwargs):
)
return self.execute(statement, **kwargs)

def _dataframe_to_table(self, df, table, if_exists='fail', schema=None, **kwargs):
def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
"""
Additional parameters:
schema (str): The schema into which the table should be pushed. If
not specified, the schema will be set to your username.
If if the schema namespace is not specified, `table.schema` will be
defaulted to your username. Catalog overrides will be ignored, and will
default to `self.catalog`.
"""
return df.to_sql(name=table, con=self._sqlalchemy_engine, index=False,
if_exists=if_exists, schema=schema or self.username, **kwargs)
table = self._parse_namespaces(table, defaults={'schema': self.username})
return df.to_sql(name=table.table, schema=table.schema, con=self._sqlalchemy_engine,
index=False, if_exists=if_exists, **kwargs)

def _cursor_empty(self, cursor):
return False
Expand Down
4 changes: 4 additions & 0 deletions omniduct/databases/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def _query_to_table(self, statement, table, if_exists, **kwargs):
)
return self.execute(statement, **kwargs)

def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
return df.to_sql(name=table.table, schema=table.database, con=self.engine,
index=False, if_exists=if_exists, **kwargs)

def _cursor_empty(self, cursor):
return False

Expand Down
Loading

0 comments on commit d2aa8f3

Please sign in to comment.