11package com .webank .wedatasphere .dss .orchestrator .server .job ;
22
33import com .google .common .collect .Lists ;
4+ import com .webank .wedatasphere .dss .appconn .scheduler .structure .orchestration .OrchestrationCreationOperation ;
5+ import com .webank .wedatasphere .dss .appconn .scheduler .structure .orchestration .OrchestrationService ;
6+ import com .webank .wedatasphere .dss .appconn .scheduler .structure .orchestration .ref .DSSOrchestrationContentRequestRef ;
7+ import com .webank .wedatasphere .dss .appconn .scheduler .structure .orchestration .ref .OrchestrationResponseRef ;
48import com .webank .wedatasphere .dss .common .exception .DSSErrorException ;
59import com .webank .wedatasphere .dss .common .label .DSSLabel ;
610import com .webank .wedatasphere .dss .common .utils .MapUtils ;
7- import com .webank .wedatasphere .dss .orchestrator .common .entity .DSSOrchestratorCopyInfo ;
8- import com .webank .wedatasphere .dss .orchestrator .common .entity .DSSOrchestratorInfo ;
9- import com .webank .wedatasphere .dss .orchestrator .common .entity .DSSOrchestratorVersion ;
11+ import com .webank .wedatasphere .dss .orchestrator .common .entity .*;
1012import com .webank .wedatasphere .dss .orchestrator .common .ref .OrchestratorRefConstant ;
1113import com .webank .wedatasphere .dss .orchestrator .core .DSSOrchestrator ;
1214import com .webank .wedatasphere .dss .orchestrator .core .utils .OrchestratorUtils ;
15+ import com .webank .wedatasphere .dss .orchestrator .db .dao .OrchestratorMapper ;
1316import com .webank .wedatasphere .dss .orchestrator .publish .utils .OrchestrationDevelopmentOperationUtils ;
1417import com .webank .wedatasphere .dss .orchestrator .server .entity .vo .OrchestratorCopyVo ;
18+ import com .webank .wedatasphere .dss .orchestrator .server .service .OrchestratorService ;
19+ import com .webank .wedatasphere .dss .orchestrator .server .service .impl .OrchestratorFrameworkServiceImpl ;
1520import com .webank .wedatasphere .dss .standard .app .development .operation .RefCopyOperation ;
1621import com .webank .wedatasphere .dss .standard .app .development .ref .CopyRequestRef ;
1722import com .webank .wedatasphere .dss .standard .app .development .ref .RefJobContentResponseRef ;
@@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable {
3338
3439 protected OrchestratorCopyEnv orchestratorCopyEnv ;
3540
41+ private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl ;
42+
43+ private OrchestratorService orchestratorService ;
44+
45+ private OrchestratorMapper orchestratorMapper ;
46+
3647 private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo (UUID .randomUUID ().toString ());
3748
3849
@@ -66,9 +77,10 @@ private void copyOrchestrator() {
6677 newOrchestrator .setDesc ("copy from " + sourceOrchestrator .getName ());
6778 newOrchestrator .setUpdateTime (null );
6879 newOrchestrator .setUpdateUser (null );
80+ DSSOrchestratorVersion dssOrchestratorVersion = null ;
6981
7082 try {
71- doOrchestratorCopy (orchestratorCopyVo .getUsername (), orchestratorCopyVo .getWorkspace (), newOrchestrator ,
83+ dssOrchestratorVersion = doOrchestratorCopy (orchestratorCopyVo .getUsername (), orchestratorCopyVo .getWorkspace (), newOrchestrator ,
7284 orchestratorCopyVo .getTargetProjectName (), Lists .newArrayList (orchestratorCopyVo .getDssLabel ()), appId );
7385 } catch (Exception e ) {
7486 //保存错误信息
@@ -91,9 +103,37 @@ private void copyOrchestrator() {
91103 orchestratorCopyInfo .setSuccessNode (Lists .newArrayList ("All" ));
92104 orchestratorCopyInfo .setStatus (1 );
93105 orchestratorCopyEnv .getOrchestratorCopyJobMapper ().updateCopyStatus (orchestratorCopyInfo );
106+
107+ List <DSSLabel > dssLabels = new ArrayList <>();
108+ dssLabels .add (orchestratorCopyVo .getDssLabel ());
109+
110+ //2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求
111+ OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl .tryOrchestrationOperation (dssLabels , true , orchestratorCopyVo .getUsername (),
112+ orchestratorCopyVo .getTargetProjectName (), orchestratorCopyVo .getWorkspace (), newOrchestrator ,
113+ OrchestrationService ::getOrchestrationCreationOperation ,
114+ (structureOperation , structureRequestRef ) -> ((OrchestrationCreationOperation ) structureOperation )
115+ .createOrchestration ((DSSOrchestrationContentRequestRef ) structureRequestRef ), "create" );
116+
117+ try {
118+ orchestratorService .copyOrchestrator (orchestratorCopyVo .getUsername (), orchestratorCopyVo .getWorkspace (), orchestratorCopyVo .getTargetProjectName (),
119+ orchestratorCopyVo .getTargetProjectId (), newOrchestrator .getDesc (), newOrchestrator , dssLabels );
120+ } catch (Exception e ) {
121+ throw new RuntimeException ("error happened when copying orc." , e );
122+ }
123+
124+ Long orchestratorId = newOrchestrator .getId ();
125+ Long orchestratorVersionId = dssOrchestratorVersion .getId ();
126+ //4.将工程和orchestrator的关系存储到的数据库中
127+ if (orchestrationResponseRef != null ) {
128+ Long refProjectId = (Long ) orchestrationResponseRef .toMap ().get ("refProjectId" );
129+ orchestratorMapper .addOrchestratorRefOrchestration (new DSSOrchestratorRefOrchestration (orchestratorId , refProjectId , orchestrationResponseRef .getRefOrchestrationId ()));
130+ } else {
131+ LOGGER .info ("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null." , newOrchestrator .getName (), orchestratorId , orchestratorVersionId );
132+
133+ }
94134 }
95135
96- private void doOrchestratorCopy (String userName ,
136+ private DSSOrchestratorVersion doOrchestratorCopy (String userName ,
97137 Workspace workspace ,
98138 DSSOrchestratorInfo dssOrchestratorInfo ,
99139 String projectName ,
@@ -132,6 +172,7 @@ private void doOrchestratorCopy(String userName,
132172 orchestratorCopyEnv .getOrchestratorMapper ().addOrchestrator (dssOrchestratorInfo );
133173 dssOrchestratorVersion .setOrchestratorId (dssOrchestratorInfo .getId ());
134174 orchestratorCopyEnv .getOrchestratorMapper ().addOrchestratorVersion (dssOrchestratorVersion );
175+ return dssOrchestratorVersion ;
135176 }
136177
137178
@@ -158,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() {
158199 public void setOrchestratorCopyInfo (DSSOrchestratorCopyInfo orchestratorCopyInfo ) {
159200 this .orchestratorCopyInfo = orchestratorCopyInfo ;
160201 }
202+
203+ public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl () {
204+ return orchestratorFrameworkServiceImpl ;
205+ }
206+
207+ public void setOrchestratorFrameworkServiceImpl (OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl ) {
208+ this .orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl ;
209+ }
210+
211+ public OrchestratorService getOrchestratorService () {
212+ return orchestratorService ;
213+ }
214+
215+ public void setOrchestratorService (OrchestratorService orchestratorService ) {
216+ this .orchestratorService = orchestratorService ;
217+ }
218+
219+ public OrchestratorMapper getOrchestratorMapper () {
220+ return orchestratorMapper ;
221+ }
222+
223+ public void setOrchestratorMapper (OrchestratorMapper orchestratorMapper ) {
224+ this .orchestratorMapper = orchestratorMapper ;
225+ }
161226}
0 commit comments