14
14
15
15
package org .apache .zeppelin .hbase ;
16
16
17
- import org .jruby .embed .LocalContextScope ;
18
- import org .jruby .embed .ScriptingContainer ;
19
- import org .slf4j .Logger ;
20
- import org .slf4j .LoggerFactory ;
21
-
17
+ import java .io .ByteArrayOutputStream ;
22
18
import java .io .File ;
23
- import java .io .FileInputStream ;
24
19
import java .io .IOException ;
25
- import java .io . StringWriter ;
20
+ import java .nio . file . Files ;
26
21
import java .nio .file .Path ;
27
22
import java .nio .file .Paths ;
28
- import java .util .List ;
23
+ import java .util .HashMap ;
24
+ import java .util .Map ;
29
25
import java .util .Properties ;
30
26
27
+ import org .apache .commons .exec .CommandLine ;
28
+ import org .apache .commons .exec .DefaultExecutor ;
29
+ import org .apache .commons .exec .ExecuteException ;
30
+ import org .apache .commons .exec .ExecuteWatchdog ;
31
+ import org .apache .commons .exec .Executor ;
32
+ import org .apache .commons .exec .PumpStreamHandler ;
33
+ import org .apache .commons .io .FileUtils ;
34
+ import org .apache .commons .lang3 .StringUtils ;
31
35
import org .apache .zeppelin .interpreter .Interpreter ;
32
36
import org .apache .zeppelin .interpreter .InterpreterContext ;
33
37
import org .apache .zeppelin .interpreter .InterpreterException ;
34
38
import org .apache .zeppelin .interpreter .InterpreterResult ;
35
- import org .apache .zeppelin .interpreter .thrift .InterpreterCompletion ;
36
39
import org .apache .zeppelin .scheduler .Scheduler ;
37
40
import org .apache .zeppelin .scheduler .SchedulerFactory ;
41
+ import org .slf4j .Logger ;
42
+ import org .slf4j .LoggerFactory ;
38
43
39
44
/**
40
- * Support for HBase Shell. All the commands documented here
41
- * http://hbase.apache.org/book.html#shell is supported.
42
- *
43
- * Requirements:
44
- * HBase Shell should be installed on the same machine. To be more specific, the following dir.
45
- * should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
46
- * HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
47
- * that the client is configured properly.
48
- *
49
- * The interpreter takes 3 config parameters:
50
- * hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
51
- * hbase.ruby.sources: Dir where shell ruby code is installed.
52
- * Path is relative to hbase.home. Default: lib/ruby
53
- * zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
45
+ * HBase interpreter. It uses the hbase shell to interpret the commands.
54
46
*/
55
47
public class HbaseInterpreter extends Interpreter {
48
+ private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
49
+
56
50
public static final String HBASE_HOME = "hbase.home" ;
57
- public static final String HBASE_RUBY_SRC = "hbase.ruby.sources" ;
58
- public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode" ;
59
51
60
- private static final Logger LOGGER = LoggerFactory . getLogger ( HbaseInterpreter . class );
61
- private ScriptingContainer scriptingContainer ;
52
+ private static final Path TEMP_FOLDER = Paths . get ( System . getProperty ( "java.io.tmpdir" ),
53
+ "zeppelin-hbase-scripts" ) ;
62
54
63
- private StringWriter writer ;
55
+ private Map < String , Executor > runningProcesses = new HashMap <>() ;
64
56
65
- public HbaseInterpreter (Properties property ) {
66
- super (property );
57
+ private Map <String , File > tempFiles = new HashMap <>();
58
+
59
+ private static final int SIGTERM_CODE = 143 ;
60
+
61
+ private long commandTimeout = 60000 ;
62
+
63
+ public HbaseInterpreter (Properties properties ) {
64
+ super (properties );
67
65
}
68
66
69
67
@ Override
70
68
public void open () throws InterpreterException {
71
- this .scriptingContainer = new ScriptingContainer (LocalContextScope .SINGLETON );
72
- this .writer = new StringWriter ();
73
- scriptingContainer .setOutput (this .writer );
74
-
75
- if (!Boolean .parseBoolean (getProperty (HBASE_TEST_MODE ))) {
76
- String hbaseHome = getProperty (HBASE_HOME );
77
- String rubySrc = getProperty (HBASE_RUBY_SRC );
78
- Path absRubySrc = Paths .get (hbaseHome , rubySrc ).toAbsolutePath ();
79
-
80
- LOGGER .info ("Home:" + hbaseHome );
81
- LOGGER .info ("Ruby Src:" + rubySrc );
82
-
83
- File f = absRubySrc .toFile ();
84
- if (!f .exists () || !f .isDirectory ()) {
85
- throw new InterpreterException ("HBase ruby sources is not available at '" + absRubySrc
86
- + "'" );
87
- }
88
-
89
- LOGGER .info ("Absolute Ruby Source:" + absRubySrc .toString ());
90
- // hirb.rb:41 requires the following system properties to be set.
91
- Properties sysProps = System .getProperties ();
92
- sysProps .setProperty (HBASE_RUBY_SRC , absRubySrc .toString ());
93
-
94
- Path absHirbPath = Paths .get (hbaseHome , "bin/hirb.rb" );
95
- try {
96
- FileInputStream fis = new FileInputStream (absHirbPath .toFile ());
97
- this .scriptingContainer .runScriptlet (fis , "hirb.rb" );
98
- fis .close ();
99
- } catch (IOException e ) {
100
- throw new InterpreterException (e .getCause ());
101
- }
102
- }
69
+ // Do nothing
103
70
}
104
71
105
72
@ Override
106
73
public void close () {
107
- if (this .scriptingContainer != null ) {
108
- this .scriptingContainer .terminate ();
109
- }
74
+ runningProcesses .clear ();
75
+ runningProcesses = null ;
76
+ tempFiles .clear ();
77
+ tempFiles = null ;
110
78
}
111
79
112
80
@ Override
113
- public InterpreterResult interpret (String cmd , InterpreterContext interpreterContext ) {
81
+ public InterpreterResult interpret (String st , InterpreterContext context ) {
82
+ LOGGER .debug ("Run HBase shell script: {}" , st );
83
+
84
+ if (StringUtils .isEmpty (st )) {
85
+ return new InterpreterResult (InterpreterResult .Code .SUCCESS );
86
+ }
87
+
88
+ String paragraphId = context .getParagraphId ();
89
+ final File scriptFile ;
90
+ try {
91
+ // Write script in a temporary file
92
+ // The script is enriched with extensions
93
+ scriptFile = createTempFile (paragraphId );
94
+ FileUtils .write (scriptFile , st + "\n exit" );
95
+ } catch (IOException e ) {
96
+ LOGGER .error ("Can not write script in temp file" , e );
97
+ return new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
98
+ }
99
+
100
+ InterpreterResult result = new InterpreterResult (InterpreterResult .Code .SUCCESS );
101
+
102
+ final DefaultExecutor executor = new DefaultExecutor ();
103
+ final ByteArrayOutputStream errorStream = new ByteArrayOutputStream ();
104
+
105
+ executor .setStreamHandler (new PumpStreamHandler (context .out , errorStream ));
106
+ executor .setWatchdog (new ExecuteWatchdog (commandTimeout ));
107
+
108
+ String hbaseCmdPath = Paths .get (getProperty (HBASE_HOME ), "bin" , "hbase" ).toString ();
109
+ final CommandLine cmdLine = CommandLine .parse (hbaseCmdPath );
110
+ cmdLine .addArgument ("shell" , false );
111
+ cmdLine .addArgument (scriptFile .getAbsolutePath (), false );
112
+
114
113
try {
115
- LOGGER .info (cmd );
116
- this .writer .getBuffer ().setLength (0 );
117
- this .scriptingContainer .runScriptlet (cmd );
118
- this .writer .flush ();
119
- LOGGER .debug (writer .toString ());
120
- return new InterpreterResult (InterpreterResult .Code .SUCCESS , writer .getBuffer ().toString ());
121
- } catch (Throwable t ) {
122
- LOGGER .error ("Can not run '" + cmd + "'" , t );
123
- return new InterpreterResult (InterpreterResult .Code .ERROR , t .getMessage ());
114
+ executor .execute (cmdLine );
115
+ runningProcesses .put (paragraphId , executor );
116
+ } catch (ExecuteException e ) {
117
+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
118
+
119
+ final int exitValue = e .getExitValue ();
120
+ InterpreterResult .Code code = InterpreterResult .Code .ERROR ;
121
+ String msg = errorStream .toString ();
122
+
123
+ if (exitValue == SIGTERM_CODE ) {
124
+ code = InterpreterResult .Code .INCOMPLETE ;
125
+ msg = msg + "Paragraph received a SIGTERM.\n " ;
126
+ LOGGER .info ("The paragraph {} stopped executing: {}" , paragraphId , msg );
127
+ }
128
+
129
+ msg += "ExitValue: " + exitValue ;
130
+ result = new InterpreterResult (code , msg );
131
+ } catch (IOException e ) {
132
+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
133
+ result = new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
134
+ } finally {
135
+ deleteTempFile (paragraphId );
136
+ stopProcess (paragraphId );
124
137
}
138
+ return result ;
125
139
}
126
140
127
141
@ Override
128
- public void cancel (InterpreterContext context ) {}
142
+ public void cancel (InterpreterContext context ) {
143
+ stopProcess (context .getParagraphId ());
144
+ deleteTempFile (context .getParagraphId ());
145
+ }
129
146
130
147
@ Override
131
148
public FormType getFormType () {
@@ -143,30 +160,27 @@ public Scheduler getScheduler() {
143
160
HbaseInterpreter .class .getName () + this .hashCode ());
144
161
}
145
162
146
- @ Override
147
- public List <InterpreterCompletion > completion (String buf , int cursor ,
148
- InterpreterContext interpreterContext ) {
149
- return null ;
163
+ private void stopProcess (String paragraphId ) {
164
+ Executor executor = runningProcesses .remove (paragraphId );
165
+ if (null != executor ) {
166
+ final ExecuteWatchdog watchdog = executor .getWatchdog ();
167
+ watchdog .destroyProcess ();
168
+ }
150
169
}
151
170
152
- private static String getSystemDefault (
153
- String envName ,
154
- String propertyName ,
155
- String defaultValue ) {
156
-
157
- if (envName != null && !envName .isEmpty ()) {
158
- String envValue = System .getenv ().get (envName );
159
- if (envValue != null ) {
160
- return envValue ;
161
- }
171
+ private File createTempFile (String paragraphId ) throws IOException {
172
+ if (!Files .exists (TEMP_FOLDER )) {
173
+ Files .createDirectory (TEMP_FOLDER );
162
174
}
175
+ File temp = Files .createTempFile (TEMP_FOLDER , paragraphId , ".txt" ).toFile ();
176
+ tempFiles .put (paragraphId , temp );
177
+ return temp ;
178
+ }
163
179
164
- if (propertyName != null && !propertyName .isEmpty ()) {
165
- String propValue = System .getProperty (propertyName );
166
- if (propValue != null ) {
167
- return propValue ;
168
- }
180
+ private void deleteTempFile (String paragraphId ) {
181
+ File tmpFile = tempFiles .remove (paragraphId );
182
+ if (null != tmpFile ) {
183
+ FileUtils .deleteQuietly (tmpFile );
169
184
}
170
- return defaultValue ;
171
185
}
172
186
}
0 commit comments