Skip to content

Commit 5a22a9b

Browse files
committed
Add multithreading, with 1 writer thread per output table
1 parent 2096273 commit 5a22a9b

File tree

1 file changed

+67
-40
lines changed

1 file changed

+67
-40
lines changed

Diff for: src/main.rs

+67 -40
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,13 @@
11
use std::io::{Read, Write, BufReader, BufRead, stdin, stdout};
22
use std::fs::{File, OpenOptions};
33
use std::mem;
4+
use std::fmt::Write as _;
45
use std::path::Path;
56
use std::env;
67
use std::cell::{ Cell, RefCell };
78
use std::time::Instant;
9+
use std::sync::mpsc;
10+
use std::thread;
811
use std::default::Default;
912
use std::collections::HashMap;
1013
use quick_xml::Reader;
@@ -52,7 +55,9 @@ enum Cardinality {
5255
struct Table<'a> {
5356
name: String,
5457
path: String,
55-
file: RefCell<Box<dyn Write>>,
58+
buf: RefCell<String>,
59+
writer_channel: mpsc::SyncSender<String>,
60+
writer_thread: Option<thread::JoinHandle<()>>,
5661
columns: Vec<Column<'a>>,
5762
lastid: RefCell<String>,
5863
domain: Box<Option<RefCell<Domain<'a>>>>,
@@ -63,22 +68,27 @@ struct Table<'a> {
6368
impl<'a> Table<'a> {
6469
fn new(name: &str, path: &str, file: Option<&str>, settings: &Settings, cardinality: Cardinality) -> Table<'a> {
6570
//println!("Table {} path {} file {:?} cardinality {:?}", name, path, file, cardinality);
71+
let out: RefCell<Box<dyn Write + Send>> = match file {
72+
None => RefCell::new(Box::new(stdout())),
73+
Some(ref file) => RefCell::new(Box::new(
74+
match settings.filemode.as_ref() {
75+
"truncate" => File::create(Path::new(file)).unwrap_or_else(|err| fatalerr!("Error: failed to create output file '{}': {}", file, err)),
76+
"append" => OpenOptions::new().append(true).create(true).open(Path::new(file)).unwrap_or_else(|err| fatalerr!("Error: failed to open output file '{}': {}", file, err)),
77+
mode => fatalerr!("Error: invalid 'mode' setting in configuration file: {}", mode)
78+
}
79+
))
80+
};
81+
let (writer_channel, rx) = mpsc::sync_channel(100);
82+
let writer_thread = thread::spawn(move || write_output(out, rx));
6683
let mut ownpath = String::from(path);
6784
if !ownpath.is_empty() && !ownpath.starts_with('/') { ownpath.insert(0, '/'); }
6885
if ownpath.ends_with('/') { ownpath.pop(); }
6986
Table {
7087
name: name.to_owned(),
7188
path: ownpath,
72-
file: match file {
73-
None => RefCell::new(Box::new(stdout())),
74-
Some(ref file) => RefCell::new(Box::new(
75-
match settings.filemode.as_ref() {
76-
"truncate" => File::create(Path::new(file)).unwrap_or_else(|err| fatalerr!("Error: failed to create output file '{}': {}", file, err)),
77-
"append" => OpenOptions::new().append(true).create(true).open(Path::new(file)).unwrap_or_else(|err| fatalerr!("Error: failed to open output file '{}': {}", file, err)),
78-
mode => fatalerr!("Error: invalid 'mode' setting in configuration file: {}", mode)
79-
}
80-
))
81-
},
89+
buf: RefCell::new(String::new()),
90+
writer_channel,
91+
writer_thread: Some(writer_thread),
8292
columns: Vec::new(),
8393
lastid: RefCell::new(String::new()),
8494
domain: Box::new(None),
@@ -87,8 +97,8 @@ impl<'a> Table<'a> {
8797
emit_starttransaction: if cardinality != Cardinality::None { settings.emit_starttransaction } else { false }
8898
}
8999
}
90-
fn write(&self, text: &str) {
91-
self.file.borrow_mut().write_all(text.as_bytes()).unwrap_or_else(|err| fatalerr!("Error: IO error encountered while writing table: {}", err));
100+
fn flush(&self) {
101+
if self.buf.borrow().len() > 0 { self.writer_channel.send(std::mem::take(&mut self.buf.borrow_mut())).unwrap(); }
92102
}
93103
fn clear_columns(&self) {
94104
for col in &self.columns {
@@ -98,8 +108,12 @@ impl<'a> Table<'a> {
98108
}
99109
impl<'a> Drop for Table<'a> {
100110
fn drop(&mut self) {
101-
if self.emit_copyfrom { self.write("\\.\n"); }
102-
if self.emit_starttransaction { self.write("COMMIT;\n"); }
111+
if self.emit_copyfrom { write!(self.buf.borrow_mut(), "\\.\n").unwrap(); }
112+
if self.emit_starttransaction { write!(self.buf.borrow_mut(), "COMMIT;\n").unwrap(); }
113+
self.flush();
114+
self.writer_channel.send(String::new()).unwrap(); // Terminates the writer thread
115+
let thread = std::mem::take(&mut self.writer_thread);
116+
thread.unwrap().join().unwrap_or_else(|_| eprintln!("Table writer thread for [{}] crashed", self.name));
103117
}
104118
}
105119

@@ -424,15 +438,15 @@ fn add_table<'a>(name: &str, rowpath: &str, outfile: Option<&str>, settings: &Se
424438
}
425439
fn emit_preamble(table: &Table, settings: &Settings, fkey: Option<String>) {
426440
if settings.emit_starttransaction {
427-
table.write("START TRANSACTION;\n");
441+
write!(table.buf.borrow_mut(), "START TRANSACTION;\n").unwrap();
428442
}
429443
if settings.emit_droptable {
430-
table.write(&format!("DROP TABLE IF EXISTS {};\n", table.name));
444+
write!(table.buf.borrow_mut(), "DROP TABLE IF EXISTS {};\n", table.name).unwrap();
431445
}
432446
if settings.emit_createtable {
433447
if table.cardinality == Cardinality::ManyToMany {
434448
let fkey = fkey.as_ref().unwrap();
435-
table.write(&format!("CREATE TABLE IF NOT EXISTS {}_{} ({}, {} {});\n", fkey.split_once(' ').unwrap().0, table.name, fkey, table.name, if table.columns.is_empty() { "integer" } else { &table.columns[0].datatype }));
449+
write!(table.buf.borrow_mut(), "CREATE TABLE IF NOT EXISTS {}_{} ({}, {} {});\n", fkey.split_once(' ').unwrap().0, table.name, fkey, table.name, if table.columns.is_empty() { "integer" } else { &table.columns[0].datatype }).unwrap();
436450
}
437451
else {
438452
let mut cols = table.columns.iter().filter_map(|c| {
@@ -443,28 +457,29 @@ fn emit_preamble(table: &Table, settings: &Settings, fkey: Option<String>) {
443457
Some(spec)
444458
}).collect::<Vec<String>>().join(", ");
445459
if fkey.is_some() { cols.insert_str(0, &format!("{}, ", fkey.as_ref().unwrap())); }
446-
table.write(&format!("CREATE TABLE IF NOT EXISTS {} ({});\n", table.name, cols));
460+
write!(table.buf.borrow_mut(), "CREATE TABLE IF NOT EXISTS {} ({});\n", table.name, cols).unwrap();
447461
}
448462
}
449463
if settings.emit_truncate {
450-
table.write(&format!("TRUNCATE {};\n", table.name));
464+
write!(table.buf.borrow_mut(), "TRUNCATE {};\n", table.name).unwrap();
451465
}
452466
if settings.emit_copyfrom {
453467
if table.cardinality == Cardinality::ManyToMany {
454468
let parent = fkey.as_ref().unwrap().split_once(' ').unwrap().0;
455-
table.write(&format!("COPY {}_{} ({}, {}) FROM stdin;\n", parent, table.name, parent, table.name));
469+
write!(table.buf.borrow_mut(), "COPY {}_{} ({}, {}) FROM stdin;\n", parent, table.name, parent, table.name).unwrap();
456470
}
457471
else {
458472
let cols = table.columns.iter().filter_map(|c| {
459473
if c.hide || (c.subtable.is_some() && c.subtable.as_ref().unwrap().cardinality != Cardinality::ManyToOne) { return None; }
460474
Some(String::from(&c.name))
461475
}).collect::<Vec<String>>().join(", ");
462476
if fkey.is_some() {
463-
table.write(&format!("COPY {} ({}, {}) FROM stdin;\n", table.name, fkey.unwrap().split(' ').next().unwrap(), cols));
477+
write!(table.buf.borrow_mut(), "COPY {} ({}, {}) FROM stdin;\n", table.name, fkey.unwrap().split(' ').next().unwrap(), cols).unwrap();
464478
}
465-
else { table.write(&format!("COPY {} ({}) FROM stdin;\n", table.name, cols)); }
479+
else { write!(table.buf.borrow_mut(), "COPY {} ({}) FROM stdin;\n", table.name, cols).unwrap(); }
466480
}
467481
}
482+
table.flush();
468483
}
469484

470485
fn main() {
@@ -862,7 +877,7 @@ fn process_event(event: &Event, mut state: &mut State) -> Step {
862877
if table.cardinality != Cardinality::ManyToOne { // Write the first column value of the parent table as the first column of the subtable (for use as a foreign key)
863878
let key = state.tables.last().unwrap().lastid.borrow();
864879
if key.is_empty() && !state.settings.hush_warning { println!("Warning: subtable {} has no foreign key for parent (you may need to add a 'seri' column)", table.name); }
865-
table.write(&format!("{}\t", key));
880+
write!(table.buf.borrow_mut(), "{}\t", key).unwrap();
866881
let rowid;
867882
if let Some(domain) = table.domain.as_ref() {
868883
let mut domain = domain.borrow_mut();
@@ -875,13 +890,13 @@ fn process_event(event: &Event, mut state: &mut State) -> Step {
875890
rowid = domain.lastid;
876891
domain.map.insert(key, rowid);
877892
if table.columns.len() == 1 {
878-
domain.table.write(&format!("{}\t", rowid));
893+
write!(domain.table.buf.borrow_mut(), "{}\t", rowid).unwrap();
879894
}
880895
for i in 0..table.columns.len() {
881896
if table.columns[i].subtable.is_some() { continue; }
882897
if table.columns[i].hide { continue; }
883-
if i > 0 { domain.table.write("\t"); }
884-
if table.columns[i].value.borrow().is_empty() { domain.table.write("\\N"); }
898+
if i > 0 { write!(domain.table.buf.borrow_mut(), "\t").unwrap(); }
899+
if table.columns[i].value.borrow().is_empty() { write!(domain.table.buf.borrow_mut(), "\\N").unwrap(); }
885900
else if let Some(domain) = table.columns[i].domain.as_ref() {
886901
let mut domain = domain.borrow_mut();
887902
let id = match domain.map.get(&table.columns[i].value.borrow().to_string()) {
@@ -890,27 +905,30 @@ fn process_event(event: &Event, mut state: &mut State) -> Step {
890905
domain.lastid += 1;
891906
let id = domain.lastid;
892907
domain.map.insert(table.columns[i].value.borrow().to_string(), id);
893-
domain.table.write(&format!("{}\t{}\n", id, *table.columns[i].value.borrow()));
908+
write!(domain.table.buf.borrow_mut(), "{}\t{}\n", id, *table.columns[i].value.borrow()).unwrap();
909+
domain.table.flush();
894910
id
895911
}
896912
};
897-
domain.table.write(&format!("{}", id));
913+
write!(domain.table.buf.borrow_mut(), "{}", id).unwrap();
898914
}
899915
else {
900-
domain.table.write(&table.columns[i].value.borrow());
916+
write!(domain.table.buf.borrow_mut(), "{}", &table.columns[i].value.borrow()).unwrap();
901917
}
902918
}
903-
domain.table.write("\n");
919+
write!(domain.table.buf.borrow_mut(), "\n").unwrap();
920+
domain.table.flush();
904921
}
905922
else { rowid = *domain.map.get(&key).unwrap(); }
906923
if table.columns.len() == 1 { // Single column many-to-many subtable; needs the id from the domain map
907-
table.write(&format!("{}" , rowid));
924+
write!(table.buf.borrow_mut(), "{}" , rowid).unwrap();
908925
}
909926
else {
910927
if table.lastid.borrow().is_empty() { println!("Warning: subtable {} has no primary key to normalize on", table.name); }
911-
table.write(&format!("{}" , table.lastid.borrow())); // This is a many-to-many relation; write the two keys into the link table
928+
write!(table.buf.borrow_mut(), "{}" , table.lastid.borrow()).unwrap(); // This is a many-to-many relation; write the two keys into the link table
912929
}
913-
table.write("\n");
930+
write!(table.buf.borrow_mut(), "\n").unwrap();
931+
table.flush();
914932
table.clear_columns();
915933
state.table = state.tables.pop().unwrap();
916934
return Step::Repeat;
@@ -952,8 +970,8 @@ fn process_event(event: &Event, mut state: &mut State) -> Step {
952970
table.columns[i].value.borrow_mut().clear();
953971
continue;
954972
}
955-
if i > 0 { table.write("\t"); }
956-
if table.columns[i].value.borrow().is_empty() { table.write("\\N"); }
973+
if i > 0 { write!(table.buf.borrow_mut(), "\t").unwrap(); }
974+
if table.columns[i].value.borrow().is_empty() { write!(table.buf.borrow_mut(), "\\N").unwrap(); }
957975
else if let Some(domain) = table.columns[i].domain.as_ref() {
958976
let mut domain = domain.borrow_mut();
959977
let id = match domain.map.get(&table.columns[i].value.borrow().to_string()) {
@@ -962,19 +980,21 @@ fn process_event(event: &Event, mut state: &mut State) -> Step {
962980
domain.lastid += 1;
963981
let id = domain.lastid;
964982
domain.map.insert(table.columns[i].value.borrow().to_string(), id);
965-
domain.table.write(&format!("{}\t{}\n", id, *table.columns[i].value.borrow()));
983+
write!(domain.table.buf.borrow_mut(), "{}\t{}\n", id, *table.columns[i].value.borrow()).unwrap();
984+
domain.table.flush();
966985
id
967986
}
968987
};
969-
table.write(&format!("{}", id));
988+
write!(table.buf.borrow_mut(), "{}", id).unwrap();
970989
table.columns[i].value.borrow_mut().clear();
971990
}
972991
else {
973-
table.write(&table.columns[i].value.borrow());
992+
write!(table.buf.borrow_mut(), "{}", &table.columns[i].value.borrow()).unwrap();
974993
table.columns[i].value.borrow_mut().clear();
975994
}
976995
}
977-
table.write("\n");
996+
write!(table.buf.borrow_mut(), "\n").unwrap();
997+
table.flush();
978998
}
979999
if !state.tables.is_empty() {
9801000
state.table = state.tables.pop().unwrap();
@@ -1046,3 +1066,10 @@ fn allow_iteration(column: &Column, settings: &Settings) -> bool {
10461066
_ => true
10471067
}
10481068
}
1069+
1070+
fn write_output(file: RefCell<Box<dyn Write>>, rx: mpsc::Receiver<String>) {
1071+
while let Ok(buf) = rx.recv() {
1072+
if buf.len() == 0 { break; }
1073+
file.borrow_mut().write_all(buf.as_bytes()).unwrap_or_else(|err| fatalerr!("Error: IO error encountered while writing table: {}", err))
1074+
}
1075+
}

0 commit comments

Comments
 (0)