Singer tap that extracts data from a MySQL database and produces JSON-formatted data following the Singer spec.
This is a PipelineWise compatible tap connector.
The recommended method of running this tap is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files, and most things are automated. Please check the related documentation at Tap MySQL.
If you want to run this Singer Tap independently please read further.
This section dives into basic usage of tap-mysql by walking through extracting
data from a table. It assumes that you can connect to and read from a MySQL
database.
First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
python3 -m venv venv
. venv/bin/activate
pip install pipelinewise-tap-mysql
or
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
There's some important business data siloed in this MySQL database -- we need to extract it. Here's the table we'd like to sync:
mysql> select * from example_db.animals;
+----+----------+----------------------+
| id | name     | likes_getting_petted |
+----+----------+----------------------+
|  1 | aardvark |                    0 |
|  2 | bear     |                    0 |
|  3 | cow      |                    1 |
+----+----------+----------------------+
3 rows in set (0.00 sec)
Create a config file containing the database connection credentials, e.g.:
{
"host": "localhost",
"port": "3306",
"user": "root",
"password": "password"
}
These are the same basic configuration properties used by the MySQL command-line
client (mysql).
filter_db: Comma-separated list of schemas. When set, tables are extracted only from these schemas, which also improves data extraction performance.
session_sqls: List of SQL commands to run when a connection is made. This lets you set session variables dynamically, such as timeouts. If not set, the following commands are executed:
SET @@session.time_zone="+0:00"
SET @@session.wait_timeout=28800
SET @@session.net_read_timeout=3600
SET @@session.innodb_lock_wait_timeout=3600
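For setups where the config file is generated rather than written by hand, a minimal Python sketch along these lines may help. It uses only the keys named above (host, port, user, password, filter_db, session_sqls). The helper name build_config is hypothetical and not part of tap-mysql, and the sketch assumes that supplying session_sqls replaces the default commands rather than adding to them, so the defaults you still want are repeated.

```python
import json

# Session commands listed above as the defaults when session_sqls is not set.
DEFAULT_SESSION_SQLS = [
    'SET @@session.time_zone="+0:00"',
    "SET @@session.wait_timeout=28800",
    "SET @@session.net_read_timeout=3600",
    "SET @@session.innodb_lock_wait_timeout=3600",
]


def build_config(host, port, user, password, filter_db=None, session_sqls=None):
    """Return a config dict for the tap (hypothetical helper, not part of tap-mysql)."""
    # The port is kept as a string to mirror the example config file.
    config = {"host": host, "port": str(port), "user": user, "password": password}
    if filter_db:
        # filter_db is described as a comma-separated list of schemas.
        config["filter_db"] = ",".join(filter_db)
    if session_sqls is not None:
        config["session_sqls"] = list(session_sqls)
    return config


# Start from the defaults and shorten only the wait timeout (illustrative value).
custom_sqls = list(DEFAULT_SESSION_SQLS)
custom_sqls[1] = "SET @@session.wait_timeout=600"

config = build_config(
    "localhost", 3306, "root", "password",
    filter_db=["example_db"],
    session_sqls=custom_sqls,
)
print(json.dumps(config, indent=2))
```

Writing the resulting dict to config.json with json.dump gives a file that can be passed to the tap with --config.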
The tap can be invoked in discovery mode to find the available tables and columns in the database:
$ tap-mysql --config config.json --discover
A discovered catalog is output, with a JSON-schema description of each table. A source table directly corresponds to a Singer stream.
{
"streams": [
{
"tap_stream_id": "example_db-animals",
"table_name": "animals",
"schema": {
"type": "object",
"properties": {
"name": {
"inclusion": "available",
"type": [
"null",
"string"
],
"maxLength": 255
},
"id": {
"inclusion": "automatic",
"minimum": -2147483648,
"maximum": 2147483647,
"type": [
"null",
"integer"
]
},
"likes_getting_petted": {
"inclusion": "available",
"type": [
"null",
"boolean"
]
}
}
},
"metadata": [
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false
}
},
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"sql-datatype": "int(11)",
"selected-by-default": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"sql-datatype": "varchar(255)",
"selected-by-default": true
}
},
{
"breadcrumb": [
"properties",
"likes_getting_petted"
],
"metadata": {
"sql-datatype": "tinyint(1)",
"selected-by-default": true
}
}
],
"stream": "animals"
}
]
}
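To get a quick overview of a discovered catalog, its metadata entries can be walked with a few lines of Python. The sketch below is only an illustration of the catalog layout shown above: the function name describe_streams is hypothetical, and the inline catalog is a trimmed copy of the example with the schema left out.

```python
# Trimmed copy of the discovered catalog above (schema omitted for brevity).
catalog = {
    "streams": [
        {
            "tap_stream_id": "example_db-animals",
            "table_name": "animals",
            "metadata": [
                {"breadcrumb": [],
                 "metadata": {"row-count": 3, "table-key-properties": ["id"],
                              "database-name": "example_db"}},
                {"breadcrumb": ["properties", "id"],
                 "metadata": {"sql-datatype": "int(11)"}},
                {"breadcrumb": ["properties", "name"],
                 "metadata": {"sql-datatype": "varchar(255)"}},
                {"breadcrumb": ["properties", "likes_getting_petted"],
                 "metadata": {"sql-datatype": "tinyint(1)"}},
            ],
            "stream": "animals",
        }
    ]
}


def describe_streams(catalog):
    """Return (tap_stream_id, key_properties, {column: sql_datatype}) per stream."""
    summary = []
    for stream in catalog["streams"]:
        keys, columns = [], {}
        for entry in stream["metadata"]:
            breadcrumb = entry["breadcrumb"]
            if not breadcrumb:
                # An empty breadcrumb marks the table-level metadata entry.
                keys = entry["metadata"].get("table-key-properties", [])
            elif breadcrumb[0] == "properties":
                # ["properties", <column>] marks a column-level entry.
                columns[breadcrumb[1]] = entry["metadata"].get("sql-datatype")
        summary.append((stream["tap_stream_id"], keys, columns))
    return summary


summary = describe_streams(catalog)
for stream_id, keys, columns in summary:
    print(stream_id, keys, columns)
```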
In sync mode, tap-mysql consumes the catalog and looks for tables and fields
that have been marked as selected in their associated metadata entries.
Redirect output from the tap's discovery mode to a file so that it can be modified:
$ tap-mysql -c config.json --discover > properties.json
Then edit properties.json to make selections. In this example we want the
animals table. The stream's metadata entry (associated with "breadcrumb": [])
gets a top-level selected flag, as do its columns' metadata entries. Additionally,
we will mark the animals table to replicate using a FULL_TABLE strategy. For more
information, see Replication methods and state file.
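If editing the file by hand is impractical, the same selections can be scripted. The following is a minimal sketch, assuming the catalog has been loaded into a Python dict (for example with json.load); the function name select_stream is hypothetical and not part of tap-mysql, and the inline catalog is a shortened stand-in for the discovered one.

```python
import copy


def select_stream(catalog, tap_stream_id, replication_method="FULL_TABLE"):
    """Return a copy of the catalog with one stream and all its columns selected."""
    updated = copy.deepcopy(catalog)
    for stream in updated["streams"]:
        if stream["tap_stream_id"] != tap_stream_id:
            continue
        for entry in stream["metadata"]:
            # Both the table-level entry and each column entry get the flag.
            entry["metadata"]["selected"] = True
            if not entry["breadcrumb"]:
                # The replication method lives on the table-level entry only.
                entry["metadata"]["replication-method"] = replication_method
    return updated


# Shortened stand-in for the discovered catalog.
catalog = {
    "streams": [
        {
            "tap_stream_id": "example_db-animals",
            "metadata": [
                {"breadcrumb": [], "metadata": {"selected-by-default": False}},
                {"breadcrumb": ["properties", "id"],
                 "metadata": {"selected-by-default": True}},
            ],
        }
    ]
}

selected = select_stream(catalog, "example_db-animals")
print(selected["streams"][0]["metadata"][0]["metadata"])
```

Run against the full discovered catalog, this should yield the same metadata array for the animals stream as the hand-edited version that follows.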
[
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false,
"selected": true,
"replication-method": "FULL_TABLE"
}
},
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"sql-datatype": "int(11)",
"selected-by-default": true,
"selected": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"sql-datatype": "varchar(255)",
"selected-by-default": true,
"selected": true
}
},
{
"breadcrumb": [
"properties",
"likes_getting_petted"
],
"metadata": {
"sql-datatype": "tinyint(1)",
"selected-by-default": true,
"selected": true
}
}
]
With a properties catalog that describes field and table selections, the tap can be invoked in sync mode:
$ tap-mysql -c config.json --properties properties.json
Messages are written to standard output following the Singer specification. The resultant stream of JSON data can be consumed by a Singer target.
{"value": {"currently_syncing": "example_db-animals"}, "type": "STATE"}
{"key_properties": ["id"], "stream": "animals", "schema": {"properties": {"name": {"inclusion": "available", "maxLength": 255, "type": ["null", "string"]}, "likes_getting_petted": {"inclusion": "available", "type": ["null", "boolean"]}, "id": {"inclusion": "automatic", "minimum": -2147483648, "type": ["null", "integer"], "maximum": 2147483647}}, "type": "object"}, "type": "SCHEMA"}
{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}
{"record": {"name": "aardvark", "likes_getting_petted": false, "id": 1}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"record": {"name": "bear", "likes_getting_petted": false, "id": 2}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"record": {"name": "cow", "likes_getting_petted": true, "id": 3}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}
{"value": {"currently_syncing": "example_db-animals", "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}
{"value": {"currently_syncing": null, "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}
In the above example, we invoked tap-mysql without providing a state file,
using the FULL_TABLE replication method set in the table's metadata. The two
ways to replicate a given table are FULL_TABLE and INCREMENTAL.
Full-table replication extracts all data from the source table each time the tap is invoked.
Incremental replication works in conjunction with a state file to only extract new records each time the tap is invoked. This requires a replication key to be specified in the table's metadata as well.
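The difference between the two methods can be illustrated with a small in-memory simulation. This is only a sketch of the behaviour described in this document, not the tap's implementation: the function names are hypothetical, rows are plain dicts, and the bookmark comparison is inclusive because the tap extracts data since (and including) the stored replication key value.

```python
def full_table(rows):
    """Full-table replication: every row is extracted on every run."""
    return list(rows)


def incremental(rows, replication_key, bookmark=None):
    """Incremental replication: extract rows at or after the bookmark.

    Returns the extracted rows and the new bookmark (the highest key seen).
    """
    extracted = sorted(
        (row for row in rows
         if bookmark is None or row[replication_key] >= bookmark),
        key=lambda row: row[replication_key],
    )
    new_bookmark = extracted[-1][replication_key] if extracted else bookmark
    return extracted, new_bookmark


rows = [
    {"id": 1, "name": "aardvark"},
    {"id": 2, "name": "bear"},
    {"id": 3, "name": "cow"},
]

# First run: no state yet, so all three rows are extracted.
first_run, bookmark = incremental(rows, "id")
print([row["id"] for row in first_run], bookmark)    # [1, 2, 3] 3

# New rows arrive in the source table.
rows += [
    {"id": 4, "name": "dog"},
    {"id": 5, "name": "elephant"},
    {"id": 6, "name": "frog"},
]

# Second run: starts at the bookmark, so the row with id 3 is sent again.
second_run, bookmark = incremental(rows, "id", bookmark)
print([row["id"] for row in second_run], bookmark)   # [3, 4, 5, 6] 6

# Full-table replication would extract all six rows again.
print(len(full_table(rows)))                         # 6
```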
Let's sync the animals table again, but this time using incremental
replication. The replication method and replication key are set in the
table's metadata entry in the properties file:
{
"streams": [
{
"tap_stream_id": "example_db-animals",
"table_name": "animals",
"schema": { ... },
"metadata": [
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false,
"replication-method": "INCREMENTAL",
"replication-key": "id"
}
},
...
],
"stream": "animals"
}
]
}
We have no meaningful state so far, so just invoke the tap in sync mode again without a state file:
$ tap-mysql -c config.json --properties properties.json
The output messages look very similar to when the table was replicated using the
default FULL_TABLE replication method. One important difference is that the
STATE messages now contain a replication_key_value -- a bookmark or
high-water mark -- for data that was extracted:
{"type": "STATE", "value": {"currently_syncing": "example_db-animals"}}
{"stream": "animals", "type": "SCHEMA", "schema": {"type": "object", "properties": {"id": {"type": ["null", "integer"], "minimum": -2147483648, "maximum": 2147483647, "inclusion": "automatic"}, "name": {"type": ["null", "string"], "inclusion": "available", "maxLength": 255}, "likes_getting_petted": {"type": ["null", "boolean"], "inclusion": "available"}}}, "key_properties": ["id"]}
{"stream": "animals", "type": "ACTIVATE_VERSION", "version": 1509135204169}
{"stream": "animals", "type": "RECORD", "version": 1509135204169, "record": {"id": 1, "name": "aardvark", "likes_getting_petted": false}}
{"stream": "animals", "type": "RECORD", "version": 1509135204169, "record": {"id": 2, "name": "bear", "likes_getting_petted": false}}
{"stream": "animals", "type": "RECORD", "version": 1509135204169, "record": {"id": 3, "name": "cow", "likes_getting_petted": true}}
{"type": "STATE", "value": {"bookmarks": {"example_db-animals": {"version": 1509135204169, "replication_key_value": 3, "replication_key": "id"}}, "currently_syncing": "example_db-animals"}}
{"type": "STATE", "value": {"bookmarks": {"example_db-animals": {"version": 1509135204169, "replication_key_value": 3, "replication_key": "id"}}, "currently_syncing": null}}
Note that the final STATE message has a replication_key_value of 3,
reflecting that the extraction ended on a record that had an id of 3.
Subsequent invocations of the tap will pick up from this bookmark.
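To make the bookmark concrete, the last STATE message can be picked out of captured tap output with a short script. This is a minimal sketch, assuming the output has been saved as one JSON message per line; the function name last_state is hypothetical, and the sample lines are shortened copies of the messages above.

```python
import json


def last_state(lines):
    """Return the value of the last STATE message, or None if there is none."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        if message.get("type") == "STATE":
            # Later STATE messages supersede earlier ones.
            state = message["value"]
    return state


# Shortened sample of the tap output shown above.
tap_output = [
    '{"type": "STATE", "value": {"currently_syncing": "example_db-animals"}}',
    '{"stream": "animals", "type": "RECORD", "version": 1509135204169, '
    '"record": {"id": 3, "name": "cow", "likes_getting_petted": true}}',
    '{"type": "STATE", "value": {"bookmarks": {"example_db-animals": '
    '{"version": 1509135204169, "replication_key_value": 3, '
    '"replication_key": "id"}}, "currently_syncing": null}}',
]

state = last_state(tap_output)
print(json.dumps(state, indent=2))
```

The returned dict has the shape a state file needs, so it could be written out with json.dump.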
Normally, the target will echo the last STATE after it's finished processing
data. For this example, let's manually write a state.json file using the
STATE message:
{
"bookmarks": {
"example_db-animals": {
"version": 1509135204169,
"replication_key_value": 3,
"replication_key": "id"
}
},
"currently_syncing": null
}
Let's add some more animals to our farm:
mysql> insert into animals (name, likes_getting_petted) values ('dog', true), ('elephant', true), ('frog', false);
$ tap-mysql -c config.json --properties properties.json --state state.json
This invocation extracts any data since (and including) the
replication_key_value:
{"type": "STATE", "value": {"bookmarks": {"example_db-animals": {"replication_key": "id", "version": 1509135204169, "replication_key_value": 3}}, "currently_syncing": "example_db-animals"}}
{"key_properties": ["id"], "schema": {"properties": {"name": {"maxLength": 255, "inclusion": "available", "type": ["null", "string"]}, "id": {"maximum": 2147483647, "minimum": -2147483648, "inclusion": "automatic", "type": ["null", "integer"]}, "likes_getting_petted": {"inclusion": "available", "type": ["null", "boolean"]}}, "type": "object"}, "type": "SCHEMA", "stream": "animals"}
{"type": "ACTIVATE_VERSION", "version": 1509135204169, "stream": "animals"}
{"record": {"name": "cow", "id": 3, "likes_getting_petted": true}, "type": "RECORD", "version": 1509135204169, "stream": "animals"}
{"record": {"name": "dog", "id": 4, "likes_getting_petted": true}, "type": "RECORD", "version": 1509135204169, "stream": "animals"}
{"record": {"name": "elephant", "id": 5, "likes_getting_petted": true}, "type": "RECORD", "version": 1509135204169, "stream": "animals"}
{"record": {"name": "frog", "id": 6, "likes_getting_petted": false}, "type": "RECORD", "version": 1509135204169, "stream": "animals"}
{"type": "STATE", "value": {"bookmarks": {"example_db-animals": {"replication_key": "id", "version": 1509135204169, "replication_key_value": 6}}, "currently_syncing": "example_db-animals"}}
{"type": "STATE", "value": {"bookmarks": {"example_db-animals": {"replication_key": "id", "version": 1509135204169, "replication_key_value": 6}}, "currently_syncing": null}}
- You'll need to have a running MySQL or MariaDB server to run the tests. Run the following SQL commands as a privileged user to create the required objects:
CREATE USER <mysql-user> IDENTIFIED BY '<mysql-password>';
CREATE DATABASE tap_mysql_test;
GRANT ALL PRIVILEGES ON tap_mysql_test.* TO <mysql-user>;
Note: The user and password can be anything but the database name needs to be tap_mysql_test.
- Define the environment variables that are required to run the tests:
export TAP_MYSQL_HOST=<mysql-host>
export TAP_MYSQL_PORT=<mysql-port>
export TAP_MYSQL_USER=<mysql-user>
export TAP_MYSQL_PASSWORD=<mysql-password>
- Install Python test dependencies in a virtual env:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
- To run tests:
nosetests -c .noserc tests
- Install Python dependencies and run the Python linter:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
pylint --rcfile .pylintrc tap_mysql
Based on Stitch documentation