diff --git a/.gitignore b/.gitignore index b6e4761..baff6b4 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +.idea +senone_classifier/exp/ \ No newline at end of file diff --git a/run.bbn.sh b/run.bbn.sh index 3d42827..59ca406 100755 --- a/run.bbn.sh +++ b/run.bbn.sh @@ -87,7 +87,7 @@ if [ ${stage} -le 2 ]; then # utils/combine_data.sh --extra-files utt2num_frames $data_dir/${valid_set} $data_dir/dev_clean $data_dir/dev_other utils/combine_data.sh --extra-files utt2num_frames $data_dir/${valid_set} $data_dir/dev_clean - # compute global CMVN + # compute global CMVN - "potential issue here - mean variance normalization" compute-cmvn-stats scp:$data_dir/${train_set}/feats.scp $data_dir/${train_set}/cmvn.ark # dump features for training @@ -118,8 +118,8 @@ lmdatadir=$data_dir/lm_text if [ ${stage} -le 3 ]; then echo "Stage 3: Dictionary Preparation and Text Tokenization" mkdir -p $data_dir/lang - cut -f 2- -d" " $data_dir/${train_set}/text > $data_dir/lang/input - echo "$0: training sentencepiece model..." + cut -f 2- -d" " $data_dir/${train_set}/text > $data_dir/lang/input # Option 1: character level, Option 2: share data, create mixed model, Option 3: create 3 sentencepiece models + echo "$0: training sentencepiece model..." # works on text side, takes transcripts and breaks into segments. segments are the outermost layer of model python3 ../../scripts/spm_train.py --bos_id=-1 --pad_id=0 --eos_id=1 --unk_id=2 --input=$data_dir/lang/input \ --vocab_size=$((sentencepiece_vocabsize+3)) --character_coverage=1.0 \ --model_type=$sentencepiece_type --model_prefix=$sentencepiece_model \ @@ -145,7 +145,7 @@ if [ ${stage} -le 3 ]; then token_text=$data_dir/$dataset/token_text cut -f 2- -d" " $token_text > $lmdatadir/$dataset.tokens done - if [ ! -e $lmdatadir/librispeech-lm-norm.txt.gz ]; then + if [ ! -e $lmdatadir/librispeech-lm-norm.txt.gz ]; then # considered public, AMI is private data wget http://www.openslr.org/resources/11/librispeech-lm-norm.txt.gz -P $lmdatadir fi echo "$0: preparing extra corpus for subword LM training..." @@ -159,7 +159,7 @@ lmdict=$dict if $lm_shallow_fusion; then if [ ${stage} -le 4 ]; then echo "Stage 4: Text Binarization for subword LM Training" - mkdir -p $lmdatadir/log + mkdir -p $lmdatadir/log # unknown for dataset in $test_set; do test_paths="$test_paths $lmdatadir/$dataset.tokens"; done test_paths=$(echo $test_paths | awk '{$1=$1;print}' | tr ' ' ',') ${decode_cmd} $lmdatadir/log/preprocess.log \ diff --git a/senone_classifier/model.py b/senone_classifier/model.py index dc713b5..d20a3b6 100644 --- a/senone_classifier/model.py +++ b/senone_classifier/model.py @@ -1,6 +1,7 @@ import torch import torch.nn.functional as F + class FcNet(torch.nn.Module): def __init__(self, D_in, H, D_out, layers): """ @@ -24,12 +25,16 @@ def forward(self, x, y): well as arbitrary operators on Tensors. """ lstm_out, _ = self.lstm(x.view(x.shape[1], 1, -1)) - y_pred = F.log_softmax(self.linear1(lstm_out.view(x.shape[1], -1)), dim=1) + lstm_out = lstm_out.view(lstm_out.shape[1], lstm_out.shape[0], -1) + + # swap batch axis to front + y_pred = F.log_softmax(self.linear1(lstm_out), dim=2) y = torch.squeeze(y, 0) - + loss = torch.nn.NLLLoss(reduction='sum') y = y.long() - output = loss(y_pred, y) + # always have a batch size of one + output = loss(y_pred[0], y) return output def recognize(self, x): diff --git a/senone_classifier/run_training.sh b/senone_classifier/run_training.sh index 19ba549..b3732d9 100755 --- a/senone_classifier/run_training.sh +++ b/senone_classifier/run_training.sh @@ -1,12 +1,25 @@ #!/bin/bash -train_scp_dir="/nfs/mercury-13/u123/dbagchi/ASR_utt" -train_scp_file="train_full_nodup_tr90/feats.scp" -train_label_scp_file="train_full_nodup_tr90/phone.ctm2.scp" - -cv_scp_dir="/nfs/mercury-13/u123/dbagchi/ASR_utt" -cv_scp_file="train_full_nodup_cv10/feats.scp" -cv_label_scp_file="train_full_nodup_cv10/phone.ctm2.scp" +azure=1 + +if [ $azure -eq 1 ] +then + train_scp_dir="/data/sotto-voce/senone_classifier/" + train_scp_file="train_full_nodup_tr90/feats.scp" + train_label_scp_file="train_full_nodup_tr90/phone.ctm2.scp" + + cv_scp_dir="/data/sotto-voce/senone_classifier/" + cv_scp_file="train_full_nodup_cv10/feats.scp" + cv_label_scp_file="train_full_nodup_cv10/phone.ctm2.scp" +else + train_scp_dir="/Users/michael/whitenoise/sotto-voce-corpus/senone_labels" + train_scp_file="/Users/michael/whitenoise/sotto-voce-corpus/libri_inp_data/train_full_nodup_tr90/feats.scp" + train_label_scp_file="/Users/michael/whitenoise/sotto-voce-corpus/libri_inp_data/train_full_nodup_tr90/phone.ctm2.scp" + + cv_scp_dir="/Users/michael/whitenoise/sotto-voce-corpus/senone_labels" + cv_scp_file="/Users/michael/whitenoise/sotto-voce-corpus/libri_inp_data/train_full_nodup_cv10/feats.scp" + cv_label_scp_file="/Users/michael/whitenoise/sotto-voce-corpus/libri_inp_data/train_full_nodup_cv10/phone.ctm2.scp" +fi input_dim=13 output_dim=9096 @@ -25,9 +38,9 @@ ckpt=0 cont_model=0 mdl_path="final.pth.tar" -pr_fr=1000 +pr_fr=10 -/nfs/mercury-13/u123/dbagchi/anaconda3/envs/pytorch_gpu_deblin/bin/python train.py --train_scp_dir $train_scp_dir \ +python train.py --train_scp_dir $train_scp_dir \ --train_scp_file $train_scp_file \ --train_label_scp_file $train_label_scp_file \ --cv_scp_dir $cv_scp_dir \ @@ -41,4 +54,6 @@ pr_fr=1000 --momentum $mom \ --epochs $ep \ --save_folder $save_fld \ - --checkpoint $ckpt --model_path $mdl_path --print_freq $pr_fr + --checkpoint $ckpt --model_path $mdl_path --print_freq $pr_fr \ + --step_epsilon 2.38547 \ + --num_workers 4 | tee output.log diff --git a/senone_classifier/train.py b/senone_classifier/train.py index 33cee2a..5e07a77 100644 --- a/senone_classifier/train.py +++ b/senone_classifier/train.py @@ -1,14 +1,103 @@ import argparse +import resource +import traceback +from multiprocessing import Event +from multiprocessing.context import Process +from queue import Queue +from torch.nn.parallel import DistributedDataParallel as DDP +from torch import distributed as dist + from data_io_utt import * -import torch.utils.data as data from trainer import * -import resource rlimit = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (2048, rlimit[1])) -if __name__ == "__main__": +def run_senone_worker(args, rank=None, size=None, step_limit=None, federation_scheme='shuffle', queue=None, + end_event=None): + tr_file_details = {'scp_dir': args.train_scp_dir, 'scp_file': args.train_scp_file_name, + 'label_scp_file': args.train_label_scp_file} + tr_dataset = SenoneClassification(tr_file_details) + tr_dataloader = data.DataLoader(tr_dataset, batch_size=1, shuffle=True, num_workers=2) + + cv_file_details = {'scp_dir': args.cv_scp_dir, 'scp_file': args.cv_scp_file_name, + 'label_scp_file': args.cv_label_scp_file} + cv_dataset = SenoneClassification(cv_file_details) + cv_dataloader = data.DataLoader(cv_dataset, batch_size=1, shuffle=False, num_workers=2) + + dataloader = {"tr_loader": tr_dataloader, "cv_loader": cv_dataloader} + + model = FcNet( + args.input_dim, # input + args.fc_nodes, # hidden + args.output_dim, # output + args.hidden_layers) + model.apply(weights_init) + + print(model) + + if rank is not None: + print('federating on rank', rank) + model = DDP(model) + args.federation = { + 'rank': rank, + 'size': size, + 'federation_scheme': federation_scheme, + 'step_limit': step_limit, + 'end_event': end_event + } + model.cuda() + optimizer = torch.optim.Adam(model.parameters(), + lr=args.learn_rate) + + trainer = Trainer(dataloader, model, optimizer, args) + trainer.train() + + +def init_process(rank, size, fn, kwargs, backend='gloo'): + """ + Initialize the distributed environment. + """ + # use this command to kill processes: + # lsof -t -i tcp:29500 | xargs kill + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(backend, rank=rank, world_size=size) + + kwargs['rank'] = rank + kwargs['size'] = size + try: + fn(**kwargs) + except Exception: + if not kwargs['end_event'].is_set(): + traceback.print_exc() + kwargs['end_event'].set() + + +def train_multiprocess(worker, size, **kwargs): + processes = [] + queue = Queue() + + end_event = Event() + + for rank in range(size): + p = Process(target=init_process, args=(rank, size, worker, { + 'queue': queue, 'end_event': end_event, + **kwargs + })) + p.start() + processes.append(p) + + end_event.wait() + # wait for history to be queued + time.sleep(1) + + for p in processes: + p.terminate() + + +if __name__ == "__main__": parser = argparse.ArgumentParser() # Path params parser.add_argument("--base_dir", default=os.getcwd()) @@ -37,6 +126,10 @@ parser.add_argument('--early_stop', dest='early_stop', default=0, type=int, help='Early stop training when halving lr but still get' 'small improvement') + parser.add_argument('--step_epsilon', default=None, type=float, + help='Set step_epsilon to enable differentially private learning') + parser.add_argument('--num_workers', default=1, type=int, + help='Federate learning if > 1') # save and load model parser.add_argument('--save_folder', default='exp/temp', @@ -53,23 +146,11 @@ args = parser.parse_args() - tr_file_details = {'scp_dir':args.train_scp_dir,'scp_file':args.train_scp_file_name,'label_scp_file':args.train_label_scp_file} - tr_dataset = SenoneClassification(tr_file_details) - tr_dataloader = data.DataLoader(tr_dataset, batch_size=1, shuffle=True, num_workers=50) - - cv_file_details = {'scp_dir':args.cv_scp_dir,'scp_file':args.cv_scp_file_name,'label_scp_file':args.cv_label_scp_file} - cv_dataset = SenoneClassification(cv_file_details) - cv_dataloader = data.DataLoader(cv_dataset, batch_size=1, shuffle=False, num_workers=50) - - dataloader = {"tr_loader":tr_dataloader, "cv_loader":cv_dataloader} - - model = FcNet(args.input_dim, args.fc_nodes, args.output_dim, args.hidden_layers) # input, hidden, output - model.apply(weights_init) - - print(model) - model.cuda() - optimizer = torch.optim.Adam(model.parameters(), - lr=args.learn_rate) - - trainer = Trainer(dataloader, model, optimizer, args) - trainer.train() + if args.num_workers > 1: + train_multiprocess( + worker=run_senone_worker, + size=args.num_workers, + args=args, + federation_scheme='shuffle') + else: + run_senone_worker(args) diff --git a/senone_classifier/trainer.py b/senone_classifier/trainer.py index 485a340..e7e7188 100644 --- a/senone_classifier/trainer.py +++ b/senone_classifier/trainer.py @@ -1,6 +1,13 @@ import os import time import torch +try: + from opendp.network.odometer_reconstruction import ReconstructionPrivacyOdometer + from opendp.network.odometer_stochastic import StochasticPrivacyOdometer + from opendp.network.odometer_manual import ManualPrivacyOdometer +except ImportError as e: + print("Install opendp from the external-sgd branch here: https://github.com/opendp/opendp/tree/external-sgd") + raise e class Trainer(object): @@ -10,6 +17,21 @@ def __init__(self, dataloader, model, optimizer, args): self.tr_loader = dataloader['tr_loader'] self.cv_loader = dataloader['cv_loader'] + self.accountant = None + if args.step_epsilon: + # full grad reconstruction odometer + # self.odometer = ReconstructionPrivacyOdometer(step_epsilon=args.step_epsilon) + # self.model = self.odometer.make_tracked_view(self.model) + + # # stochastic odometer + self.odometer = StochasticPrivacyOdometer(step_epsilon=args.step_epsilon) + self.odometer.track_(self.model) + + # # manual odometer + # self.odometer = ManualPrivacyOdometer(model=self.model, step_epsilon=args.step_epsilon) + + self.federation = args.federation if hasattr(args, 'federation') else {} + # Training config self.epochs = args.epochs self.half_lr = args.half_lr @@ -68,7 +90,11 @@ def train(self): print('Saving checkpoint model to %s' % file_path) print("Cross validation...") + val_loss = self._run_one_epoch(epoch, cross_valid=True) + if self.odometer: + self.odometer.increment_epoch() + print('-' * 85) print('Valid Summary | End of Epoch {0} | Time {1:.2f}s | ' 'Valid Loss {2:.3f}'.format( @@ -104,11 +130,6 @@ def train(self): file_path) print("Find better validated model, saving to %s" % file_path) - - - - - def _run_one_epoch(self, epoch, cross_valid=False): start = time.time() total_loss = 0 @@ -123,11 +144,14 @@ def _run_one_epoch(self, epoch, cross_valid=False): if not cross_valid: self.optimizer.zero_grad() loss.backward() + + # enable only for ManualPrivacyOdometer + # self.odometer.privatize_grad() self.optimizer.step() total_loss += loss.item() - if i % self.print_freq == 0: - print ('Epoch {0} | Iter {1} | Average Loss {2:.3f} |' - 'Current Loss {3:.6f} | {4:.1f} ms/batch'.format( + if (self.federation.get('rank') == 0 or not self.federation) and i % self.print_freq == 0: + print('Epoch {0} | Iter {1} | Average Loss {2:.3f} |' + ' Current Loss {3:.6f} | {4:.1f} ms/batch'.format( epoch + 1, i + 1, total_loss / (i + 1), loss.item(), 1000 * (time.time() - start) / (i + 1)), flush=True)