From b9539f3a5b21c36cafa9652360af4af37e2207ed Mon Sep 17 00:00:00 2001 From: John Huang Date: Tue, 19 Jul 2016 10:21:53 -0700 Subject: [PATCH 1/4] print Redshift loading error --- README.md | 8 +++++++ lib/postgres_to_redshift.rb | 46 +++++++++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index badd72c..6ee65af 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,14 @@ export S3_DATABASE_EXPORT_BUCKET='some-bucket-to-use' postgres_to_redshift ``` +Optional flags: + +```bash +# Optional debug flag if you'd like the copy job to ignore Redshift loading errors +# and keep going. +export IGNORE_LOADING_ERRORS_AND_CONTINUE='true' +``` + ## Contributing 1. Fork it ( https://github.com/kitchensurfing/postgres_to_redshift/fork ) diff --git a/lib/postgres_to_redshift.rb b/lib/postgres_to_redshift.rb index c41e689..fcd8a93 100644 --- a/lib/postgres_to_redshift.rb +++ b/lib/postgres_to_redshift.rb @@ -1,11 +1,11 @@ -require "postgres_to_redshift/version" +require 'postgres_to_redshift/version' require 'pg' require 'uri' require 'aws-sdk-v1' require 'zlib' require 'tempfile' -require "postgres_to_redshift/table" -require "postgres_to_redshift/column" +require 'postgres_to_redshift/table' +require 'postgres_to_redshift/column' class PostgresToRedshift class << self @@ -41,7 +41,7 @@ def self.target_uri def self.source_connection unless instance_variable_defined?(:"@source_connection") @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1]) - @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") + @source_connection.exec('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;') end @source_connection @@ -85,7 +85,7 @@ def bucket end def copy_table(table) - tmpfile = Tempfile.new("psql2rs") + tmpfile = Tempfile.new('psql2rs') zip = Zlib::GzipWriter.new(tmpfile) chunksize = 5 * GIGABYTE # uncompressed chunk = 1 @@ -97,14 +97,14 @@ def copy_table(table) source_connection.copy_data(copy_command) do while row = source_connection.get_copy_data zip.write(row) - if (zip.pos > chunksize) + if zip.pos > chunksize zip.finish tmpfile.rewind upload_table(table, tmpfile, chunk) chunk += 1 zip.close unless zip.closed? tmpfile.unlink - tmpfile = Tempfile.new("psql2rs") + tmpfile = Tempfile.new('psql2rs') zip = Zlib::GzipWriter.new(tmpfile) end end @@ -128,14 +128,36 @@ def import_table(table) puts "Importing #{table.target_table_name}" target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating") - target_connection.exec("BEGIN;") + begin + target_connection.exec('BEGIN;') + + target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating") + + target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})") + + target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';") + + target_connection.exec('COMMIT;') - target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating") + rescue PG::InternalError => e + target_connection.exec('ROLLBACK;') - target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})") + print_last_redshift_loading_error if e.message.include?('stl_load_errors') - target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';") + continue_after_error = + !ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].nil? && + ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].downcase == 'true' - target_connection.exec("COMMIT;") + raise unless continue_after_error + end + end + + def print_last_redshift_loading_error + puts 'ERROR: Last Redshift loading error:' + error_row = target_connection.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first + error_row.each do |k, v| + puts "\t#{k}: #{v}" + end + puts end end From e12c948c12352cfd19480d7b6473d733d8560dc1 Mon Sep 17 00:00:00 2001 From: John Huang Date: Tue, 19 Jul 2016 13:08:42 -0700 Subject: [PATCH 2/4] rename ENV, and refactor exception handling function --- README.md | 6 +++--- lib/postgres_to_redshift.rb | 31 ++++++++++++++++++++++--------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 6ee65af..a72ab6a 100644 --- a/README.md +++ b/README.md @@ -37,9 +37,9 @@ postgres_to_redshift Optional flags: ```bash -# Optional debug flag if you'd like the copy job to ignore Redshift loading errors -# and keep going. -export IGNORE_LOADING_ERRORS_AND_CONTINUE='true' +# Optional debug flag if you'd like the copy job to only warn on Redshift loading errors +# (instead of exiting) and keep going. +export WARN_ON_LOADING_ERROR='true' ``` ## Contributing diff --git a/lib/postgres_to_redshift.rb b/lib/postgres_to_redshift.rb index fcd8a93..2e5355a 100644 --- a/lib/postgres_to_redshift.rb +++ b/lib/postgres_to_redshift.rb @@ -139,25 +139,38 @@ def import_table(table) target_connection.exec('COMMIT;') - rescue PG::InternalError => e - target_connection.exec('ROLLBACK;') + rescue PG::InternalError => exception + handle_pg_exception(table, exception) + end + end - print_last_redshift_loading_error if e.message.include?('stl_load_errors') + def handle_pg_exception(table, exception) + target_connection.exec('ROLLBACK;') + + if exception.message.include?('stl_load_errors') + puts exception.message + puts "ERROR: Last entry in Redshift's 'stl_load_errors' table:" + print_last_redshift_loading_error + else + puts 'ERROR: Unhandled PG error:' + puts exception.message + puts exception.backtrace.inspect + end - continue_after_error = - !ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].nil? && - ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].downcase == 'true' + continue_after_error = + !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0 - raise unless continue_after_error + if continue_after_error + puts "\nINFO: Skipping '#{table.name}' and continuing on." + else + exit end end def print_last_redshift_loading_error - puts 'ERROR: Last Redshift loading error:' error_row = target_connection.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first error_row.each do |k, v| puts "\t#{k}: #{v}" end - puts end end From 6aec3826f66b206fe8dbc2ad6ddef7af48d82b03 Mon Sep 17 00:00:00 2001 From: John Huang Date: Wed, 20 Jul 2016 10:03:45 -0700 Subject: [PATCH 3/4] always re-raise for unhandled errors --- lib/postgres_to_redshift.rb | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/postgres_to_redshift.rb b/lib/postgres_to_redshift.rb index 2e5355a..1e0f4ee 100644 --- a/lib/postgres_to_redshift.rb +++ b/lib/postgres_to_redshift.rb @@ -22,6 +22,7 @@ def self.update_tables update_tables = PostgresToRedshift.new update_tables.tables.each do |table| + next unless table.name == 'mass_emails' # FIXME target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})") update_tables.copy_table(table) @@ -151,19 +152,15 @@ def handle_pg_exception(table, exception) puts exception.message puts "ERROR: Last entry in Redshift's 'stl_load_errors' table:" print_last_redshift_loading_error - else - puts 'ERROR: Unhandled PG error:' - puts exception.message - puts exception.backtrace.inspect - end - continue_after_error = - !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0 - - if continue_after_error - puts "\nINFO: Skipping '#{table.name}' and continuing on." + if !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0 + puts "\nINFO: Skipping '#{table.name}' and continuing on." + else + exit + end else - exit + puts 'ERROR: Unhandled PG error:' + raise end end From 47095115639b7f1fb14cf1124711b2b442e27938 Mon Sep 17 00:00:00 2001 From: John Huang Date: Thu, 21 Jul 2016 09:36:25 -0700 Subject: [PATCH 4/4] typo --- lib/postgres_to_redshift.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/postgres_to_redshift.rb b/lib/postgres_to_redshift.rb index 1e0f4ee..fa5ca4b 100644 --- a/lib/postgres_to_redshift.rb +++ b/lib/postgres_to_redshift.rb @@ -22,7 +22,6 @@ def self.update_tables update_tables = PostgresToRedshift.new update_tables.tables.each do |table| - next unless table.name == 'mass_emails' # FIXME target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})") update_tables.copy_table(table)