32
32
import java .util .Map ;
33
33
import java .util .Optional ;
34
34
import java .util .concurrent .ExecutionException ;
35
- import java .util .concurrent .ExecutorService ;
36
- import java .util .concurrent .Executors ;
37
35
import java .util .concurrent .Future ;
36
+ import java .util .concurrent .LinkedBlockingQueue ;
37
+ import java .util .concurrent .ThreadPoolExecutor ;
38
38
import java .util .concurrent .TimeUnit ;
39
39
import java .util .concurrent .TimeoutException ;
40
40
import org .apache .commons .lang3 .StringUtils ;
@@ -84,9 +84,13 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
84
84
implements CatalogOperations , FilesetCatalog {
85
85
private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist" ;
86
86
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist" ;
87
- private static final ExecutorService FILE_SYSTEM_EXECUTOR =
88
- Executors .newFixedThreadPool (
87
+ private static final ThreadPoolExecutor FILE_SYSTEM_EXECUTOR =
88
+ new ThreadPoolExecutor (
89
+ 2 ,
89
90
Math .max (2 , Runtime .getRuntime ().availableProcessors () / 2 ),
91
+ 60L ,
92
+ TimeUnit .SECONDS ,
93
+ new LinkedBlockingQueue <>(100 ),
90
94
r -> {
91
95
Thread thread = new Thread (r , "FileSystem-Get-Thread" );
92
96
thread .setDaemon (true );
@@ -110,6 +114,10 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
110
114
111
115
private FileSystemProvider defaultFileSystemProvider ;
112
116
117
+ static {
118
+ FILE_SYSTEM_EXECUTOR .allowCoreThreadTimeOut (true );
119
+ }
120
+
113
121
HadoopCatalogOperations (EntityStore store ) {
114
122
this .store = store ;
115
123
}
0 commit comments