33import itertools
44import multiprocessing
55import os
6- import sys
76import shutil
87import subprocess
98from threading import Timer
10- import sys
119from argparse import ArgumentParser
12- from subprocess import Popen , PIPE , STDOUT , call
13-
1410
def get_immediate_subdirectories(a_dir):
    """Return the paths of the directories located directly inside ``a_dir``.

    Args:
        a_dir: Path of the directory whose entries are inspected.

    Returns:
        A list with ``os.path.join(a_dir, name)`` for every entry ``name`` of
        ``a_dir`` that ``os.path.isdir`` reports as a directory. The order is
        whatever ``os.listdir`` yields; the list is empty when ``a_dir`` has
        no subdirectories.
    """
    # Build each candidate path once instead of joining it twice per entry.
    candidates = (os.path.join(a_dir, name) for name in os.listdir(a_dir))
    return [path for path in candidates if os.path.isdir(path)]
1814
19-
def ParallelExtractDir(args, tmpdir, dir_):
    """Extract features for a single directory with an empty name prefix.

    Thin wrapper around ``ExtractFeaturesForDir`` that fixes ``prefix`` to
    ``""``, so it can be used with a three-element argument tuple (as the
    multiprocessing pool in ``ExtractFeaturesForDirsList`` does).

    Args:
        args: Parsed command-line options, forwarded unchanged.
        tmpdir: Directory path prefix under which the output file is written.
        dir_: Directory to extract features from.
    """
    ExtractFeaturesForDir(args, tmpdir, dir_, "")
2317
2418def ExtractFeaturesForDir (args , tmpdir , dir_ , prefix ):
2519 command = ['java' , '-cp' , args .jar , 'JavaExtractor.App' ,
2620 '--max_path_length' , str (args .max_path_length ), '--max_path_width' , str (args .max_path_width ),
2721 '--dir' , dir_ , '--num_threads' , str (args .num_threads )]
28- # print command
29- # os.system(command)
3022 kill = lambda process : process .kill ()
3123 outputFileName = tmpdir + prefix + dir_ .split ('/' )[- 1 ]
3224 failed = False
@@ -39,36 +31,35 @@ def ExtractFeaturesForDir(args, tmpdir, dir_, prefix):
3931 finally :
4032 timer .cancel ()
4133
42- if sleeper .poll () == 0 :
43- if len (stderr ) > 0 :
44- print (sys .stderr , stderr , file = sys .stdout )
45- else :
46- print (sys .stderr , 'dir: ' + str (dir_ ) + ' was not completed in time' , file = sys .stdout , flush = True )
34+ if sleeper .poll () != 0 :
4735 failed = True
4836 subdirs = get_immediate_subdirectories (dir_ )
4937 for subdir in subdirs :
5038 ExtractFeaturesForDir (args , subdir , prefix + dir_ .split ('/' )[- 1 ] + '_' )
51- if failed :
52- if os .path .exists (outputFileName ):
53- os .remove (outputFileName )
54-
39+
40+ if failed and os .path .exists (outputFileName ):
41+ os .remove (outputFileName )
5542
def ExtractFeaturesForDirsList(args, dirs):
    """Extract features for ``dirs`` in batches and copy the results to stdout.

    The directories are processed ``args.batch_size`` at a time. Each batch
    runs ``ParallelExtractDir`` in a fresh pool of 4 worker processes and is
    given 60 seconds to finish. After a batch completes, every file found in
    the temporary output directory is written to stdout with ``cat`` and then
    deleted.

    Args:
        args: Parsed command-line options; ``args.batch_size`` is read here
            and the whole object is forwarded to ``ParallelExtractDir``.
        dirs: List of directory paths to process.

    Returns:
        None. Results are emitted on stdout.
    """
    tmp_dir = f"./tmp/feature_extractor{os.getpid()}/"
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir, ignore_errors=True)
    os.makedirs(tmp_dir)

    # A step of 0 makes range() raise and a negative step would silently
    # process nothing, so fall back to one directory per batch.
    batch_size = max(1, args.batch_size)
    timeout_seconds = 60  # upper bound for one batch, in seconds

    try:
        for start in range(0, len(dirs), batch_size):
            batch_dirs = dirs[start:start + batch_size]
            try:
                # Leaving the ``with`` block terminates the pool, so workers
                # of a timed-out batch do not outlive it.
                with multiprocessing.Pool(4) as pool:
                    pending = pool.starmap_async(
                        ParallelExtractDir,
                        zip(itertools.repeat(args), itertools.repeat(tmp_dir), batch_dirs),
                    )
                    pending.get(timeout=timeout_seconds)
            except multiprocessing.TimeoutError:
                # NOTE(review): files a timed-out batch left behind are not
                # emitted here; they are picked up by the listing of the next
                # completed batch, or dropped by the final cleanup.
                continue

            for name in os.listdir(tmp_dir):
                path = os.path.join(tmp_dir, name)
                # Argument-list form: file names containing spaces or shell
                # metacharacters are passed to ``cat`` verbatim.
                subprocess.call(["cat", path])
                # Remove the file so a later batch does not print it again.
                os.remove(path)
    finally:
        # Always drop the per-process scratch directory, even on errors.
        shutil.rmtree(tmp_dir, ignore_errors=True)
7263
7364if __name__ == '__main__' :
7465 parser = ArgumentParser ()
@@ -78,6 +69,9 @@ def ExtractFeaturesForDirsList(args, dirs):
7869 parser .add_argument ("-j" , "--jar" , dest = "jar" , required = True )
7970 parser .add_argument ("-dir" , "--dir" , dest = "dir" , required = False )
8071 parser .add_argument ("-file" , "--file" , dest = "file" , required = False )
72+ # add a new parameter batch_size
73+ parser .add_argument ("-batch_size" , "--batch_size" , dest = "batch_size" , required = False , default = 3 , type = int )
74+
8175 args = parser .parse_args ()
8276
8377 if args .file is not None :
@@ -86,9 +80,5 @@ def ExtractFeaturesForDirsList(args, dirs):
8680 os .system (command )
8781 elif args .dir is not None :
8882 subdirs = get_immediate_subdirectories (args .dir )
89- to_extract = subdirs
90- if len (subdirs ) == 0 :
91- to_extract = [args .dir .rstrip ('/' )]
83+ to_extract = subdirs if subdirs else [args .dir .rstrip ('/' )]
9284 ExtractFeaturesForDirsList (args , to_extract )
93-
94-
0 commit comments