1
1
# -*- coding: utf-8 -*-
2
-
3
2
"""
4
3
Implementation for the ``Issho`` class, which implements
5
4
a connection and some simple commands over ``ssh``, using
6
5
``keyring`` to manage secrets locally.
7
6
"""
8
-
9
- import paramiko
10
- import keyring
11
- from sshtunnel import SSHTunnelForwarder
12
- from issho .helpers import (
13
- default_sftp_path ,
14
- get_pkey ,
15
- issho_pw_name ,
16
- get_user ,
17
- clean_spark_options ,
18
- )
19
- from issho .config import read_issho_conf , read_ssh_profile
20
7
import sys
21
8
import time
22
9
from shutil import copyfile
10
+
23
11
import humanize
12
+ import keyring
13
+ import paramiko
14
+ from sshtunnel import SSHTunnelForwarder
15
+
16
+ from issho .config import read_issho_conf
17
+ from issho .config import read_ssh_profile
18
+ from issho .helpers import clean_spark_options
19
+ from issho .helpers import default_sftp_path
20
+ from issho .helpers import get_pkey
21
+ from issho .helpers import get_user
22
+ from issho .helpers import issho_pw_name
24
23
25
24
26
25
class Issho :
@@ -198,9 +197,10 @@ def spark_submit(
198
197
jars = "" ,
199
198
files = "" ,
200
199
driver_class_path = "" ,
201
- app_class = "" ,
200
+ application_class = "" ,
202
201
application = "" ,
203
202
application_args = "" ,
203
+ bg = False ,
204
204
):
205
205
"""
206
206
Submit a spark job.
@@ -210,35 +210,45 @@ def spark_submit(
210
210
:param jars: syntactic sugar for the --jars spark option
211
211
:param files: syntactic sugar for the --files spark option
212
212
:param driver_class_path: syntactic sugar for the --driver-class-path spark option
213
- :param app_class : syntactic sugar for the --class spark option
213
+ :param application_class : syntactic sugar for the --class spark option
214
214
:param application: the application to submit
215
215
:param application_args: any arguments to be passed to the spark application
216
+ :param bg: True to run in the background, False otherwise
216
217
:return:
217
218
"""
218
219
assert application
219
220
if not spark_options :
220
221
spark_options = {}
221
222
for k , v in locals ().items ():
222
- if k in {"spark_options" , "application" , "application_args" }:
223
+ if k in {"spark_options" , "application" , "application_args" , "self" , "bg" }:
223
224
continue
225
+ clean_keys = {"application_class" : "class" }
226
+ clean_k = clean_keys .get (k , k )
224
227
if v :
225
- spark_options [k ] = v
228
+ spark_options [clean_k ] = v
226
229
227
230
cleaned_spark_options = clean_spark_options (spark_options )
228
231
spark_options_str = " " .join (
229
- map (lambda k , v : "{} {}" .format (k , v ), cleaned_spark_options .values ())
232
+ (
233
+ "{} {}" .format (k , v )
234
+ for k , v in sorted (cleaned_spark_options .items (), key = lambda x : x [0 ])
235
+ )
230
236
)
231
237
spark_cmd = "spark-submit {} {} {}" .format (
232
238
spark_options_str , application , application_args
233
239
)
234
- self .exec (spark_cmd )
240
+ print (spark_cmd )
241
+ self .exec (spark_cmd , bg = bg )
235
242
236
243
def spark (self , * args , ** kwargs ):
237
244
"""
238
245
Syntactic sugar for spark_submit
239
246
"""
240
247
self .spark_submit (* args , ** kwargs )
241
248
249
+ def spark_bg (self , * args , ** kwargs ):
250
+ self .spark_submit (bg = True , * args , ** kwargs )
251
+
242
252
def _connect (self ):
243
253
"""
244
254
Uses paramiko to connect to the remote specified
0 commit comments