Skip to content

Commit 9aefa1b

Browse files
authored
Merge pull request #57 from WeDataSphere/dev-0.2.3-bugfix
Fix the problem of transformer component in Streamis
2 parents ead2079 + 1615445 commit 9aefa1b

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkJarStreamisJobContentTransform.scala

+17-12
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.utils.JobUtils
2727

2828
import scala.collection.JavaConverters._
2929
import scala.collection.mutable
30+
import scala.collection.mutable.ArrayBuffer
3031

3132
/**
3233
* Created by enjoyyin on 2021/9/23.
@@ -50,20 +51,24 @@ class FlinkJarStreamisStartupParamsTransform extends Transform {
5051
startupMap.put("flink.app.main.class.jar", transformJobContent.getMainClassJar.getFileName)
5152
startupMap.put("flink.app.main.class.jar.bml.json",
5253
JsonUtils.jackson.writeValueAsString(getStreamisFileContent(transformJobContent.getMainClassJar)))
53-
val classpathFiles = if(transformJobContent.getDependencyJars != null && transformJobContent.getResources != null) {
54-
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
55-
transformJobContent.getDependencyJars.asScala ++ transformJobContent.getResources.asScala
56-
} else if(transformJobContent.getDependencyJars != null) {
57-
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
58-
transformJobContent.getDependencyJars.asScala
59-
} else if(transformJobContent.getResources != null) {
60-
startupMap.put("flink.yarn.ship-directories", transformJobContent.getResources.asScala.map(_.getFileName).mkString(","))
61-
transformJobContent.getResources.asScala
54+
55+
/**
56+
* Notice : "flink.app.user.class.path" equals to PipelineOptions.CLASSPATHS in Flink
57+
* paths must specify a protocol (e.g. file://) and be accessible on all nodes
58+
* so we use "flink.yarn.ship-directories" instead
59+
*/
60+
var classPathFiles = Option(transformJobContent.getDependencyJars) match {
61+
case Some(list) => list.asScala
62+
case _ => mutable.Buffer[StreamisFile]()
63+
}
64+
Option(transformJobContent.getResources) match {
65+
case Some(list) => classPathFiles = classPathFiles ++ list.asScala
66+
case _ => // Do nothing
6267
}
63-
else mutable.Buffer[StreamisFile]()
64-
if(classpathFiles.nonEmpty)
68+
startupMap.put("flink.yarn.ship-directories", classPathFiles.map(_.getFileName).mkString(","))
69+
if(classPathFiles.nonEmpty)
6570
startupMap.put("flink.app.user.class.path.bml.json",
66-
JsonUtils.jackson.writeValueAsString(classpathFiles.map(getStreamisFileContent).asJava))
71+
JsonUtils.jackson.writeValueAsString(classPathFiles.map(getStreamisFileContent).asJava))
6772
if(transformJobContent.getHdfsJars != null)
6873
startupMap.put("flink.user.lib.path", transformJobContent.getHdfsJars.asScala.mkString(","))
6974
val params = if(job.getParams == null) new util.HashMap[String, Any] else job.getParams

0 commit comments

Comments
 (0)