From a0da6448c8283f5ece8a1b10ade15dc6a78fbeea Mon Sep 17 00:00:00 2001 From: Maksym Lozbin Date: Fri, 3 May 2019 13:03:07 +0300 Subject: [PATCH 1/2] CDAP-15237 Enhance the streaming plugin to support CDC option --- docs/CDCSalesforce-streamingsource.md | 43 ++++ icons/CDCSalesforce-streamingsource.png | Bin 0 -> 9635 bytes pom.xml | 33 ++- .../cdap/plugin/cdc/common/ErrorHandling.java | 51 ++++ .../io/cdap/plugin/cdc/common/Schemas.java | 6 +- .../cdc/source/salesforce/CDCSalesforce.java | 69 ++++++ .../source/salesforce/SalesforceConfig.java | 159 +++++++++++++ .../SalesforceEventTopicListener.java | 161 +++++++++++++ .../source/salesforce/SalesforceReceiver.java | 217 +++++++++++++++++ .../authenticator/AuthResponse.java | 95 ++++++++ .../authenticator/Authenticator.java | 79 +++++++ .../AuthenticatorCredentials.java | 83 +++++++ .../salesforce/records/ChangeEventRecord.java | 148 ++++++++++++ .../salesforce/records/SalesforceRecord.java | 218 ++++++++++++++++++ .../salesforce/sobject/SObjectDescriptor.java | 157 +++++++++++++ .../sobject/SObjectsDescribeResult.java | 99 ++++++++ .../util/SalesforceConnectionUtil.java | 59 +++++ .../salesforce/util/SalesforceConstants.java | 33 +++ widgets/CDCSalesforce-streamingsource.json | 80 +++++++ 19 files changed, 1785 insertions(+), 5 deletions(-) create mode 100644 docs/CDCSalesforce-streamingsource.md create mode 100644 icons/CDCSalesforce-streamingsource.png create mode 100644 src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java create mode 100644 widgets/CDCSalesforce-streamingsource.json diff --git a/docs/CDCSalesforce-streamingsource.md b/docs/CDCSalesforce-streamingsource.md new file mode 100644 index 0000000..6365921 --- /dev/null +++ b/docs/CDCSalesforce-streamingsource.md @@ -0,0 +1,43 @@ +# CDC Salesforce Streaming Source + +Description +----------- +This plugin reads Change Data Capture (CDC) events from Salesforce. + +All CDC source plugins are normally used in conjunction with CDC sink plugins. +CDC source produces messages in CDC format. + +Properties +---------- +**clientId**: Client ID from the connected app + +**clientSecret**: Client Secret from the connected app + +**username**: Username + +**password**: Password + +**loginUrl**: (default is https://login.salesforce.com/services/oauth2/token) For Salesforce sandbox runs login url is +different. That's why user needs this option. + +**objects**: list of object's API names (For example: Task for base object and Employee__c for custom) separated by ",". +If list is empty then subscription for all events will be used. + +**Handle errors**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records +which cannot be transformed. "Skip on error" - just skip, "Fail on error" - fails the pipeline if at least one erroneous +record is found. + +Note: CDC must be enabled on the database for the source to read the change data. + +Salesforce Change Data Capture +-------------------------- +When something changes in object for which is enable 'Change notifications'. A Change Data Capture event, or change +event, is a notification that Salesforce sends when a change to a Salesforce record occurs as part of a create, update, +delete, or undelete operation. The notification includes all new and changed fields, and header fields that contain +information about the change. For example, header fields indicate the type of +change that triggered the event and the origin of the change. Change events support all custom objects and a subset of +standard objects. More information can be found in [official documentation](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm). + +### Enable Change Data Capture for objects +To enable Change Data Capture for objects in Salesforce you have to +[select Objects for Change Notifications](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_select_objects.htm) \ No newline at end of file diff --git a/icons/CDCSalesforce-streamingsource.png b/icons/CDCSalesforce-streamingsource.png new file mode 100644 index 0000000000000000000000000000000000000000..88020583ea7f062592e2c1a53250fe419c2c4c41 GIT binary patch literal 9635 zcmcgyRZtw!wjDHRkYGcC6C`AS!6mr6ySoJo?iSo7$l&e_?!hJK;4T4zLx2pfkNbH) z-ruWNUERC;oU^M>Rc~9XccikS6gCDa1^@uSmXQ`$eUyNJEx)tw?w?9+ z_6`{Zj7L9LQ#HOW4=vRrIglXooIi?6*Y3O-Na+lx@W*|_I?#waOCG(1K9F+$0TCSs zEqzI{S%kSlv&lSG(!R(BlEL4SQ-8w}u$H0H!4BcKWiN#5=*l(h<+Sauxd?hJZa&+a zS|TH7#k4LFvS$W(cRyh2R%2=(qp`cIxdt$%iA!<0xGh|ItQl(u+GJ$x)P< zf&=(U|DX}M>p~N+p0_VHng~b5n+jmn65VH6Ql|G+9+PO@CiP?(e|So>JU~%`-JSz? zkJoFH%s0}84S@vvzwy}eINzCl)7ICp@^f;&gp@QS^)Kq5CtkSfX*O9t73$0Y;Pe<&jf#^XL% zXBVYt{oGu(!up9f7TQJwy}7}|Vu=`?jI=LVxG8zYBiH;W?AB`W= zugV=&@jB}erxr3@53%jbMH@=J7%M$roVMBf3C1>ek?>*(n_2#NG>R?^;NaGOmumI+ zPU+(Q?ai|0O=$n%e#aN3kVDV0u+bdTb9zg}3{;2mmIe7b$E**!>Sl^XeLJ1j7c1bi zGs6jgXxs1Y&J{_VeqaA%?`i0W6?nli9s zu4T8CzpLiA;6~@kIwGvjsiVhvxx=)$JK5iTn>3cz6I$w-@rGpG4)uPrp?y91f-mn^ zU4%BYc(T+W(phdoEpwxFD4@9aXPFfKi=EP^XrmS2$+kIBn`arh3p17yoPEN`Xq~N3 z6Ak_UT$n1Kog4;IpNNxgZp%XkSsX^c9$g6t=gc0P1#*7wt5#S*8mrLcUfF5yb)IZ= z@_pX;`}ktGndaU^f;3+>lTMDG+r{R7(^vMfT%eSEGw11Foc_@8avE-b59uuRcCUqo z2Mk?ndB<)1bj^Cp?s>B2k>2p|_}UALVLE;x-8LFl6H%S>;r@il@5MbY{S9+ZnzoIi z^!<6?fmG_Cx&bU{cM=)4WC)TUmmiFp3qa=AI3e#ogS74Tb1vy>zplzmNor5I({ns zOi|?j!j%o6oP@Ei0I7QDeY=ZqcNH>n3c8kB^4TVBQ8|w<3iaw)$l;UD4gLkffk4l1AHv+s)QV>`sl#xVG?19}hP`>Caxa+jlU@dZ}f^6tOQG)bR0Tta&Gb=I{-R!7kw%5U^h)<*(1ah#gb+B$cO9w&{ zdD8GS3g=AKI(+t_ab#DX`tiF4#wlSurP=0|)`MUf%;X`mS+TQhiVRYy^|Pn6WML@u zPR!1SVl%UwN8vFr*(A*JJF=zZp9f-z4PN>g*M+C8Y3sVSb-%?~0qDMla!?E-NB^9! zTA|;|Ze3>S(t^&HoM{f?fECrXTs09H@B%LK$3rct?1!A=yP5m0xK7e1J&pS}Z)Ts{ zxrWW&R*_^%%Sc5%hMu;02+lS%jgc8hHJE}7kDRz&eW=TXhFC@O0G&>AKOYWrjLukE zD@-{*&Gub*sg*A7qe={FhXWXE3zgPiIxgc4$B3Us$IIlS2)2eEOA|4@12}+ zzq`j%Y-Bx6+wm4lnMXLTUBCOB5sQBOrbl+TpLy4*iGnx@qwk$`p@nN;DQ% zPZ$|<=rtqlJZuLl_;Mb4N zENLaqo?^P&ph{fLJy2|SuI+`S_05_2PZkxLLCy0>S1@07kctRFn}l{M$AVisvFD>6 zxJDLumj3Q-W&!9p;N_~XH@K_EL|7KySk_rMl0(Xk6_ho`(!y`Y|HUe+rH`|=X*J;I zu%5D6&g}z6BP;TR9ME;^3<;i8U8$_D;E?wQIn+7 zAe1|{*$vTIJS4oj&7r{$ZZ3MVT3wsAqe3Cl%p4deUc(YR4aY>aSz~8;Jgj`p%mC#} z9Xq06F4qT}++7|VL)sqr-IW9EYMGDfObbPqbw^s!Af<_lxROYVbS&xL#>Z)MY1W-( z&3$4J%BEF+-u~(udcC~l@A=n6shHi$Y+t0yFtq6i6TG+8jEAo@vTv$xS3GN=0S2$52c1m8lkP=#ZO+E*?aM7{BtkYDQ{f zZjY<<2ZszPZ7gP@!tXxe?^Upe2gRs%eis`R^+ylAhWM|Oe>70;m^ee;?DV6V9t`-5 zPmK>Bqmx>s%1;;zcD4K5a`WyQ^V#Pq+SP8~r>Ey~0L%@3HjJT&F_t9Ha^<)_iTTbv z&u-mDrjS1-t9kVEoQ9=rGVc`ef1YeUSlQfZi+V#`ko{V;+-}sY?Q>gU&^V~4u$fK9 zj__)*St_d(Hk7rLK2;5i=3IW9h3FJ65V@GLSwxqK)C^4ruOw-91(RIxjm}`Kmn5VU zb60@25TyRu%0Zzjbf#amE@h8|S_%OmtK}3~!(Nxe^9;Ra8rHtdImv>&Z{JHI-oc z<$^M=0L@b%n!_i*g!=?-5^meC2$Va{dMalp@OkuYwh&HO{0I=NW1+gR?+0w6uawLM zWz-f9Gu_iHY^Ob7#D5))>C7)D!L0c6hDcJ2TU)^dFZBa z^E;02VS-)M<=xO+L`()j(jRbpw6|97OD57Fy&gse-lZ6!nZNq;UGT38Z@X{eni)mu z@w}Z)s}K7EcShuHhu=@UA=B2=4@6RqIiwKMAL!8p32tt7hG$3D{_W7(=2;gYI1+~? za)Vc(2t6_Z8MkS`FKzf)UyMD#!^*)W9vavxa|4Qb?46q$PnTZ_1`5P&q-nrDeDZ&| z65Tj6nuKyfZP5H4qqNtEpp9+a_HGVY}s?l7Vz!^36E5(Ymox zd{pa40DmaE&~JWeAGyA5_BZ;NUj3L-oiltC%DHynIM;l68lcrBIa z@I9j0LS2PM@H}&?acSdFu)gk0SS7wz-D)VQ^>Grr+`_GdO4>$~R5Xu@j(9G;MIPHg z({AIz_iU>GwU*k(_X0BTXzOwwWMe5ahnpO2JE2z^MnoGE-eNw}j|#CH(nua3u2Eo? z2bR!UdPWrsixGlqz>D&ALJeRzP6r%=G7zYcdV6$qOV-dwRrq2|JtwhSMNng3@0dZ} z{-m|O>hm)LY$nr`!9$fDL8~7A5uoL)XZSDE5VNGU!k_O zLeHx0#T+L?WzB=Zr0kZFVdu!=O-& zt_gn5oLGJ>84^7v6#Dj4wW_OH0AG(>1J+f45%8MNG!}tbZ1fGy9FG6R@j}p$zaGOo zqWCh@R@q9u65AWxKQxJ@O(rzw$ zO?@;6zqY?toCa!Tg49ACY+caB7Z?{YazH}|r`tg5-~xT=;L>6zfJ5W>VyP|$=Ob53!vU^)^9|wIKYx-3 z?Uly7FAb{@+pgg?$-PK)J7m^EnpzXGq-a6@{DJA<%D}-RvAAdP`VZNl9cfkfPcB}1 z3jhy9gQd9*WV1Q{fxVo{;pF7P(A`13jJ*whAF()%1{^1BA($%+O zF5~KM=DV0IZ8sXzSYw|e^*WQ+wI1p|op?DBt}AFN;cV9C8$nb)zuVHf_jjKPI-qb# zC&F9gUdr@-?364|ap4qf=whKg^dNJH=TBgcs%w??w^FdxC?sncI8{5@Us>1gkAJMWX>*T9Hy&SByFF@(T(t_d6W#8YD`08tcxw!3nZ!!_r{L#iw^zC9 z;@@o)p?kSU$A5ajB_$^U-M>VFUKY~|IGcU*Zz|{TucyBmQCOb22tMzu_A*4aapJ3kZ zp{z`{XHUgjXp1>h?0{u#u!pa0k*~yD)^&HifYx-N5E3|YMmU3tmBm3FNQ)wZFTVJF`Cb0rJ(m%`IpbmBF<1Iaf}ePC}94$)ZN?@?BX|~O!2PP`F-ZG%kAL__$uR= zizk~=y0LY3GAY!|pR4uxqYQ$jVHT@HE)n8yhUe#+wRpWKiRn%3R8$!Rgxc*)NR?_! z856XtfyRfUQaTBDAjyCA(BDPtZ6MV_*kQg32TS_$MK}nQGUVZ5p2_YnF-?g4f$=CV zI7bsjZHiuN(D7^iOL1{+&FrzWYg(mD&Wa+DIJQY$)r$74wL&#pA1dQsaDVd7lZCpt zC>Cc=xtB~6??*a>OQI$@Z5>AmEW`wOYaWk7w4js}7BH2Ao6ZPkC$Y40$Xv-DdEWK< zI=6*nA2{rTGk;XYtchUpD*Aw!>9-%qzim#&8R)13gNi_@3mJefgDfq)z`+oU3QBX% zZl_jHg=7mf-38-eEIxNpYkdz9N81}kd2vTvkRlLW4`4lZe8x>IzHDzt5^XmwE)B4T z9k)kx)|3m4_$fM1S{17tnb3g}YX`tB_40XG1aq~zoeQT=){p-{(3Uk5i+ z-8Jc@?S4Ue(+=f)Mh#@wJ+f%g}CpB~xUAvn#Ae%gRo7y`yU+u1D=8N?aHS@;sDOUTvnSC05&kF0G*Pbnt&PekL!=w!DY4Qm9iH;N0Fq&wyV3#$ z+F)K#020H*v&|x}XJxiY>J}On(MAmn*z%~WDsu4YQ|UimTI%n+Q@SUo7e>}DUj!g+ zB8Xr5)TFXvhdUkZVFsyef3t|%3wmuMq}667W<@t^2Lb|0Njoh36ia2^xA2i-~Qzh4xDD9j~r;FHbG*rpmZT3-|6bYiZq|M_9K4=QbPp zGJT#@=sQ%RWRzQm4TjgShu2;UDo(14XO!s&mS_oiT@A-q>jvq()Th*W8Y|yP`TT=5 zwz1v|74~M4%lwqKhLyXj(2G)=L{|-GwtvB@`#5L3B37J%Y(b*u{Zwu9Li{EgFT?Ah zaZ0RmTLG*?k$9GYx8gqz2O!AD-gK`oH0BCX)f5~!`)Gcw2>oc|TrJa~WZQ~aGIo6L zXFG8_S*sR}OhCfo@lYIVXlJjhWB=IP;9H+>hGIZEQdT!uSMZ7{aek=zaM0)?;vL_1 zWM884YQ5UIK|so}JbG*X2E;a5^K%3x!x)e9l+jWaZ^DJ#n>h$_7-N}G0B!#wfYOH2 z>H*D;MdrApb1w#m(jl4~hE>{!I4&xTA&nsN)PVV|2i>%)ii=o7=9IlhoPo9dmEhUv zn|;8<3RJekL}x8rDN&XA{YfxkK7=!cvXTGjP_y(Bu7Q1BRrbMG!!C#L-D|y*Cb%~H z8r1_(2Q#L#u5n8I0GuXXu)YJ@WKIrRvWTS4byQQm!3nzTnX%3W$tq0{+-7QAoNc6gTkFho&X-6~V&^+cG}TpYPy=EN1agL_#>dno z%e`iQXwftO`AE5Q0lOw%b;0NdF`8btkB|1XSl*?k0(c(X}$CN^*rVfjM3J*Ij(<=-X7Vq ze~v^8>nTX@^l-oJ;@x7^_>j{#KK4%1ysTNmMe_8h(pVc_)Ux0^M&RQxs;z5fxM&{0 zTFQsne&Di10Do6c_ZjeLYh>cFLND?FKjIG0B250gnx|(KMzjD4XCKx0jI5!mcWGR<>s+i^9Jb?XRxCMh$`5 z&qNIadY0kDO7$z4kif8rbe)gbB0#MGdp!q(QhMae+FcX`ufffY3_V@DE-zgM%Sa&z zahplhy5?M5m_!*)MSxzo9|of+)po+?08N<7I0h2=;jabOXe&d)lOkcK8cJ8fUx_!EsHu`};R+{}IL#eYvxSSD}Sspd(fv2CWa0iUS3%tIz z-)--1uKD9@#OS-0`{|gRVBHip|LG&ZQo3fmJ#2%E>5GtweDwR-onaQ}0enI6EI-yS zbGD0?VW_bP$b}NPIP<*KCu25(+}e}YVgb+b-|Rc6`VJPd8CSaQKDzDfSbeyTJK2E} zKL-7+p|TdfWIIJ*EGy1C^Xs~SrfID6&DX?_+*`kl+rJ=u=a#R(K4m&NPz(6?6-NKc zoMzzLMnoMto86^l4`FYJ)Pikh>TRU9Ibj(x0wlkft9&%6brhB)`8dBK%r#h=v zT5Xd!IUQ)ls6QuY{A%rN7<;-hDV^BurJ_NVkL6Zm)8bw!k0`4Xm!>S_+4;6IE#+bx zklS6X<<_bT)?9>~8R3>U4{Qj&9P_(??%q*%1EZXERG&qX1&T)_^wcesHK|UIVhTOn zd|sUACa@Y)Q?|n*I-8~;X>5g5;1Sx~>D`0W#+T!eK>=Qa_Aev1y^uMBhJJwHD-=7# z_Op;vo@xC|wJ6{Gen&D^g7m=c&RX(mPubZgm-Nw_ae|v`>1Hx|0A%|Y-d)GEnFUG) zWFSO4`(=A+gc!Vg#~4EL>-HCco7_skby)*64+T+~9K zqJTC6A~@u;XwH`B_}b<=F%klDu^cx#Z|s*eMwl!Rj$Vr$h4zOtN-zW=NDUQRZi{jX z7vDL=S8~#9=weJQrVa1n=X*6R?&X~AHw(qx%{27k2RjVw-NzaOw3Y<2^gl{bC(%TI zP$KD1{rcy$>wFlNAUr@h;Y45;J9}oxC>w!Bl}!7awKE8&XY*dz#!$`5?-g-)FKbU` z!PY0OCF>MLFh2P?P@DtB6U4Zw-r!8Ke>G$zo6FMJv2XtuEiS&Mp$rZS_>HZT3AtO5-7GGNaRu-Y) z(f*JnM=7)8H*o<}foyUtcU4R?KO8xOOo^70>W@_AXIF*Nokx0N(nZenKJW_|77bi% zO45R&(8+aB?*cC{kp>=18~V7pbj8SAf{9S8%Q&SjhY)sv)pa~3Yb~^z7C5{Z0gotp zGr_jU1;t5dt(2@ZjE9ACrgGsR#5g;V?``vva|A&o??_DjZ>pv`_9ViDPKr#;HPsE( z><{w1z#=M9P@7tzE^2aKwZ~N%UHQoOOh=%sS@ecLaPW+;#mw!h#syHrN)gXFiAcP) zWWyFC+BhHrJ!~eOI5F1%Xf$se`{!0|wP2BBdCYcP&Z)`;v5?a;e2}JcyJZ|J{#;3M zo|+KfLlJY(qE=-cS;QviJ)V&^VZiFzuU}R_M=$kXoZLMf`p`zB(F13Ez1D6=j|TMi z! zJu#0Mz(xP~C$mC}bGa}goq-}ujST;WCDvYWkDK_Cfe4DCFP^futT8l9EVU^h#5R6@ zeLkgpk=eEP3RL`UUFe42(MDbjl%xcML_POord|Ot2-+}dDVrXKetZf#ddN9@x_It8 z_*aUmPr`u&sl?LFVH0ZPXt&g50I!Ie;*tuk_!zBp-z?`ek^LQrb0sTHnd#vAJ`kk~ zKV|QC%pk16&dvOkJK2i%`v=XWlz*;&+c2>RjoH_pr;KtNZ}~N@!RL^Xd~U@$wS1R5 z&&etj_671!{$CBe)x1-pmyt&M0J3&!twKrZVS-qJfJ|{ZDwCO5>;;8cQ8~K_6m%Nf z?1!`Cj3z)&3~@XUPFMQ#6WB5MYnoG?Nti^dNzzAbzAO!iT!D!=ORd7jD7iq|UVL-( zr6l=eONrekx4nPfcJ8C{IvlMyGk3<~x+zN=ThF<#M2Ob_734S7^!|I)ygM>AF{3tS zFt4Tgm-XyLKZ$vrIh!=U!6*|9?IhJ^=}-v;?Gn+xq~gYE;%yA3Da4-qs{5I}aiO!3 z8wO;BSd~IfBRxC+huHE#PBRYKdNvTiT>lBa*SFtnZUoC}%oOR7HV))ufpSs!nAz&A zlei%kP9X-GemH=4-`QSo;t=8NrI4Nuv8It?CG1t&O*k6nwwr#HQj3?2#_!roTV130 zJ1OKdu$-)tATk2}fTd09m}x82_^HdfYJn^y#xNS+E560O-7cO-h*x>dcN{4Xfts{r z*bTie2KPKInubD~G*h4RZE8qvHPJ`kiE7@a(qW^;qsQ&qS>35PSlnKLWoYtEC9O1d zNFMVBS1K`nu?c>SGzo{)1LCUVZ0#PfG)6?tg>s!5<=dP?FVFq5j z2y0vQ0dyh3gGQn&EH zn`U>Nyp3Jgva{Fm+|+tQ|7)t=aY)}6opUMOOfHjnu~$4;rcn$ZHq*rl18~Ga6B*!v zZ#-(uQ^o1FR%8}3OTugH8=+MKjLW+`+oCB%7L514FDp3#w#>#=aV2)K8uLRPR$&Wn zp0D_(V7SIIetDK5v|1*3T1ZgAB$H_H&xFEtl%$WPzGnD9t0IldbR*#0w$x`)ZG9rt zCE@BKt6x}l2FF3V%SLi(HVb~MZP;X?A!X=B=+FbsG1F0+^SPPAo|tEaH7g ztcF>9F;Sfn-?hd~L~qF%VC;O!RQYaUz;lxD8vo}y57Nz+mw@70}%939L>&DOFP>}2z|n3*y&mT9!4 zRk-N+ck7mX+7QrkF*dk}*wdtT0y82sBKbkQh=p2!D)3F?Q3;mvY|_V0*GTz$?{JF+ z8pl$dbVO~jI0flX>T3PuzZrZ`yi&k3Id~G8g~*;Gp#E5y_F#C}HI*Nvd(oKg>uK@$ zmp9}+lhI5J6-c&oWlOM85FXtxflvI2L-_(bx$1R?>t8^wbwW?tCgU~;znwJw%=E5( zpu4LAg3$b+@d#<}v#j&*H1U0n_$!hK%n|K5MB$B{%~!{_m9vy;F`C0IT(bAECYd57 zx~(s|sAH=5n+_x&F^ozhFjL0_j32oF=WpfH&T*)KjS#f;G3S!7fFK-YB67q7hSBZe z+z2K`{*Y>NVhjEi)c9iVvX!B^AL`!nyMaU_?0TmC%WB?y(;kHGp=No@YyJnFU+Z_0 zH@Yd-6Sv)tH(ZUpFW$AD43!Ahthv^|(0#~qZuw1dO*G{c;GKP zul^^Ii?ciTY#1OH-qAI?4$;&}BkZ?Yz@ozyQ+{zTm(kPDU`t)pdFAOd&uU$+ZT_v<#%}rb__{_dpyb1stE0`C=$_Zj+R|m85aj^2S pz20;1v9gxt9nt)kg1w`;jiuNBuCVZ=n)|8%kdaUnuMss4{vT`)ua5u# literal 0 HcmV?d00001 diff --git a/pom.xml b/pom.xml index 8cb6886..976de3b 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,10 @@ 6.1.0.jre7 2.8.0 13.0.1 + 45.0.0 + 4.0.0 + 3.1.6 + 20180813 4.1.16.Final 4.11 @@ -514,6 +518,27 @@ + + + com.force.api + force-wsc + ${salesforce.api.version} + + + com.force.api + force-partner-api + ${salesforce.api.version} + + + org.cometd.java + cometd-java-client + ${cometd.java.client.version} + + + org.awaitility + awaitility + ${awaitility.version} + io.cdap.plugin @@ -591,10 +616,10 @@ test - org.awaitility - awaitility - 3.1.6 - test + org.json + json + ${json.version} + compile diff --git a/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java b/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java new file mode 100644 index 0000000..d5828e5 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.common; + +import java.util.Optional; +import java.util.stream.Stream; + +/** + * Indicates error handling strategy which will be used during reading Salesforce records. + */ +public enum ErrorHandling { + + SKIP("Skip on error"), + STOP("Stop on error"); + + private final String value; + + ErrorHandling(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + /** + * Converts error handling string value into {@link ErrorHandling} enum. + * + * @param stringValue error handling string value + * @return error handling type in optional container + */ + public static Optional fromValue(String stringValue) { + return Stream.of(values()) + .filter(keyType -> keyType.value.equalsIgnoreCase(stringValue)) + .findAny(); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/common/Schemas.java b/src/main/java/io/cdap/plugin/cdc/common/Schemas.java index f8fcb41..169a5c8 100644 --- a/src/main/java/io/cdap/plugin/cdc/common/Schemas.java +++ b/src/main/java/io/cdap/plugin/cdc/common/Schemas.java @@ -81,7 +81,11 @@ public static StructuredRecord toCDCRecord(StructuredRecord changeRecord) { } public static String getTableName(String namespacedTableName) { - return namespacedTableName.split("\\.")[1]; + String[] parts = namespacedTableName.split("\\."); + if (parts.length == 1) { + return namespacedTableName; + } + return parts[1]; } private static Schema enumWith(Class> enumClass) { diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java new file mode 100644 index 0000000..3bb956c --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java @@ -0,0 +1,69 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.dataset.DatasetProperties; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.streaming.StreamingContext; +import io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.plugin.cdc.common.Schemas; +import io.cdap.plugin.common.Constants; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming source for reading from Salesforce CDC plugin. + */ +@Plugin(type = StreamingSource.PLUGIN_TYPE) +@Name("CDCSalesforce") +@Description("CDC Salesforce Streaming Source") +public class CDCSalesforce extends StreamingSource { + + private static final Logger LOG = LoggerFactory.getLogger(CDCSalesforce.class); + private final SalesforceConfig config; + + public CDCSalesforce(SalesforceConfig config) { + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + LOG.info("Creating connection with url '{}', username '{}', clientId '{}'", + config.getLoginUrl(), config.getUsername(), config.getClientId()); + config.validate(); + + pipelineConfigurer.createDataset(config.referenceName, Constants.EXTERNAL_DATASET_TYPE, + DatasetProperties.EMPTY); + pipelineConfigurer.getStageConfigurer().setOutputSchema(Schemas.CHANGE_SCHEMA); + } + + @Override + public JavaDStream getStream(StreamingContext context) { + config.validate(); + + SalesforceReceiver salesforceReceiver + = new SalesforceReceiver(config.getAuthenticatorCredentials(), config.getObjects(), config.getErrorHandling()); + return context.getSparkStreamingContext() + .receiverStream(salesforceReceiver) + .map(Schemas::toCDCRecord); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java new file mode 100644 index 0000000..802c23e --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java @@ -0,0 +1,159 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce; + +import com.sforce.ws.ConnectionException; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.plugin.PluginConfig; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; +import io.cdap.cdap.etl.api.validation.InvalidStageException; +import io.cdap.plugin.cdc.common.CDCReferencePluginConfig; +import io.cdap.plugin.cdc.common.ErrorHandling; +import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConnectionUtil; +import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Defines the {@link PluginConfig} for the {@link CDCSalesforce}. + */ +public class SalesforceConfig extends CDCReferencePluginConfig { + private static final String OBJECTS_SEPARATOR = ","; + + @Name(SalesforceConstants.PROPERTY_CLIENT_ID) + @Description("Salesforce connected app's client ID") + @Macro + private String clientId; + + @Name(SalesforceConstants.PROPERTY_CLIENT_SECRET) + @Description("Salesforce connected app's client secret key") + @Macro + private String clientSecret; + + @Name(SalesforceConstants.PROPERTY_USERNAME) + @Description("Salesforce username") + @Macro + private String username; + + @Name(SalesforceConstants.PROPERTY_PASSWORD) + @Description("Salesforce password") + @Macro + private String password; + + @Name(SalesforceConstants.PROPERTY_LOGIN_URL) + @Description("Endpoint to authenticate to") + @Macro + private String loginUrl; + + @Name(SalesforceConstants.PROPERTY_OBJECTS) + @Description("Objects for tracking") + @Macro + @Nullable + private String objects; + + @Name(SalesforceConstants.PROPERTY_ERROR_HANDLING) + @Description("Strategy used to handle erroneous records. Acceptable values are Skip on error, Stop on error.\n" + + "Skip on error - ignores erroneous record.\n" + + "Stop on error - fails pipeline due to erroneous record.") + @Macro + private String errorHandling; + + public SalesforceConfig() { + super(""); + } + + public SalesforceConfig(String referenceName, String clientId, String clientSecret, + String username, String password, String loginUrl, String objects, String errorHandling) { + super(referenceName); + this.clientId = clientId; + this.clientSecret = clientSecret; + this.username = username; + this.password = password; + this.loginUrl = loginUrl; + this.objects = objects; + this.errorHandling = errorHandling; + } + + public String getClientId() { + return clientId; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getLoginUrl() { + return loginUrl; + } + + public List getObjects() { + if (objects == null || objects.isEmpty()) { + return Collections.emptyList(); + } + return Arrays.asList(objects.split(OBJECTS_SEPARATOR)); + } + + public ErrorHandling getErrorHandling() { + return ErrorHandling.fromValue(errorHandling) + .orElseThrow(() -> new InvalidConfigPropertyException("Unsupported error handling value: " + errorHandling, + SalesforceConstants.PROPERTY_ERROR_HANDLING)); + } + + @Override + public void validate() { + validateConnection(); + validateErrorHandling(); + } + + public AuthenticatorCredentials getAuthenticatorCredentials() { + return SalesforceConnectionUtil.getAuthenticatorCredentials(username, password, clientId, clientSecret, loginUrl); + } + + private void validateConnection() { + if (containsMacro(SalesforceConstants.PROPERTY_CLIENT_ID) + || containsMacro(SalesforceConstants.PROPERTY_CLIENT_SECRET) + || containsMacro(SalesforceConstants.PROPERTY_USERNAME) + || containsMacro(SalesforceConstants.PROPERTY_PASSWORD) + || containsMacro(SalesforceConstants.PROPERTY_LOGIN_URL)) { + return; + } + + try { + SalesforceConnectionUtil.getPartnerConnection(getAuthenticatorCredentials()); + } catch (ConnectionException e) { + throw new InvalidStageException("Cannot connect to Salesforce API with credentials specified", e); + } + } + + private void validateErrorHandling() { + if (containsMacro(SalesforceConstants.PROPERTY_ERROR_HANDLING)) { + return; + } + + getErrorHandling(); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java new file mode 100644 index 0000000..d41f73a --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java @@ -0,0 +1,161 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce; + +import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthResponse; +import io.cdap.plugin.cdc.source.salesforce.authenticator.Authenticator; +import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.cometd.bayeux.client.ClientSessionChannel; +import org.cometd.client.BayeuxClient; +import org.cometd.client.transport.LongPollingTransport; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; + +/** + * Listens to a specific Salesforce eventTopics and adds messages to the blocking queue, + * which can be read by a user of the class. + */ +public class SalesforceEventTopicListener { + private static final Logger LOG = LoggerFactory.getLogger(SalesforceEventTopicListener.class); + + private static final String DEFAULT_EVENT_ENDPOINT = "/cometd/" + SalesforceConstants.API_VERSION; + /** + * Timeout of 110 seconds is enforced by Salesforce Streaming API and is not configurable. + * So we enforce the same on client. + */ + private static final int CONNECTION_TIMEOUT = 110; + private static final long HANDSHAKE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110); + + private static final int HANDSHAKE_CHECK_INTERVAL_MS = 1000; + + private static final String BASE_EVENT_TOPIC = "/data/ChangeEvents"; + private static final String EVENT_TOPIC_PATTERN = "/data/%sChangeEvent"; + + // store message string not JSONObject, since it's not serializable for later Spark usage + private final BlockingQueue messagesQueue = new LinkedBlockingQueue<>(); + + private final AuthenticatorCredentials credentials; + private final List objectsForTracking; + + public SalesforceEventTopicListener(AuthenticatorCredentials credentials, List objectsForTracking) { + this.credentials = credentials; + this.objectsForTracking = objectsForTracking; + } + + /** + * Start the Bayeux Client which listens to the Salesforce EventTopic and saves received messages + * to the queue. + */ + public void start() { + try { + BayeuxClient bayeuxClient = getClient(credentials); + waitForHandshake(bayeuxClient); + LOG.debug("Client handshake done"); + + ClientSessionChannel.MessageListener messageListener = (channel, message) -> messagesQueue.add(message.getJSON()); + if (objectsForTracking.isEmpty()) { + LOG.debug("Subscribe on '{}'", BASE_EVENT_TOPIC); + bayeuxClient.getChannel(BASE_EVENT_TOPIC) + .subscribe(messageListener); + } else { + for (String objectName : objectsForTracking) { + String topic = getObjectTopic(objectName); + LOG.debug("Subscribe on '{}'", topic); + bayeuxClient.getChannel(topic) + .subscribe(messageListener); + } + } + } catch (Exception e) { + throw new RuntimeException("Could not start client", e); + } + } + + /** + * Retrieves message from the messages queue, waiting up to the + * specified wait time if necessary for an element to become available. + * + * @param timeout how long to wait before giving up + * @param unit timeunit of timeout + * @return the message, or {@code null} if the specified + * waiting time elapses before an element is available + * @throws InterruptedException blocking call is interrupted + */ + public String getMessage(long timeout, TimeUnit unit) throws InterruptedException { + return messagesQueue.poll(timeout, unit); + } + + private String getObjectTopic(String objectName) { + String name = objectName.endsWith("__c") ? objectName.substring(0, objectName.length() - 1) : objectName; + return format(EVENT_TOPIC_PATTERN, name); + } + + private BayeuxClient getClient(AuthenticatorCredentials credentials) throws Exception { + AuthResponse authResponse = Authenticator.oauthLogin(credentials); + String acessToken = authResponse.getAccessToken(); + String instanceUrl = authResponse.getInstanceUrl(); + + SslContextFactory sslContextFactory = new SslContextFactory(); + + // Set up a Jetty HTTP client to use with CometD + HttpClient httpClient = new HttpClient(sslContextFactory); + httpClient.setConnectTimeout(CONNECTION_TIMEOUT); + httpClient.start(); + + Map options = new HashMap<>(); + // Adds the OAuth header in LongPollingTransport + LongPollingTransport transport = new LongPollingTransport(options, httpClient) { + @Override + protected void customize(Request exchange) { + super.customize(exchange); + exchange.header("Authorization", "OAuth " + acessToken); + } + }; + + // Now set up the Bayeux client itself + BayeuxClient client = new BayeuxClient(instanceUrl + DEFAULT_EVENT_ENDPOINT, transport); + client.handshake(); + + return client; + } + + + private void waitForHandshake(BayeuxClient client) { + try { + Awaitility.await() + .atMost(HANDSHAKE_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .pollInterval(HANDSHAKE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS) + .until(client::isHandshook); + } catch (ConditionTimeoutException e) { + throw new IllegalStateException("Client could not handshake with Salesforce server", e); + } + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java new file mode 100644 index 0000000..963f00c --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java @@ -0,0 +1,217 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.sforce.soap.partner.PartnerConnection; +import com.sforce.soap.partner.QueryResult; +import com.sforce.soap.partner.sobject.SObject; +import com.sforce.ws.ConnectionException; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.format.UnexpectedFormatException; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.cdc.common.ErrorHandling; +import io.cdap.plugin.cdc.common.OperationType; +import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.cdc.source.salesforce.records.ChangeEventRecord; +import io.cdap.plugin.cdc.source.salesforce.records.SalesforceRecord; +import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectDescriptor; +import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectsDescribeResult; +import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConnectionUtil; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Implementation of Spark receiver to receive Salesforce change events + */ +public class SalesforceReceiver extends Receiver { + private static final Logger LOG = LoggerFactory.getLogger(SalesforceReceiver.class); + private static final String RECEIVER_THREAD_NAME = "salesforce_streaming_api_listener"; + // every x seconds thread wakes up and checks if stream is not yet stopped + private static final long GET_MESSAGE_TIMEOUT_SECONDS = 2; + + private final AuthenticatorCredentials credentials; + private final List objectsForTracking; + private final ErrorHandling errorHandling; + + private SalesforceEventTopicListener eventTopicListener; + + + private Map schemas = new HashMap<>(); + private Map> events = new HashMap<>(); + + SalesforceReceiver(AuthenticatorCredentials credentials, List objectsForTracking, + ErrorHandling errorHandling) { + super(StorageLevel.MEMORY_AND_DISK_2()); + this.credentials = credentials; + this.objectsForTracking = objectsForTracking; + this.errorHandling = errorHandling; + } + + @Override + public void onStart() { + eventTopicListener = new SalesforceEventTopicListener(credentials, objectsForTracking); + eventTopicListener.start(); + + ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(RECEIVER_THREAD_NAME + "-%d") + .build(); + + Executors.newSingleThreadExecutor(namedThreadFactory).submit(this::receive); + } + + @Override + public void onStop() { + // There is nothing we can do here as the thread calling receive() + // is designed to stop by itself if isStopped() returns false + } + + private void receive() { + PartnerConnection connection; + try { + connection = SalesforceConnectionUtil.getPartnerConnection(credentials); + } catch (ConnectionException e) { + throw new RuntimeException("Failed to connect to Salesforce", e); + } + + while (!isStopped()) { + try { + String message = eventTopicListener.getMessage(GET_MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + if (message != null) { + ChangeEventRecord record = ChangeEventRecord.fromJSON(message); + + List eventsList = events.getOrDefault(record.getTransactionKey(), Collections.emptyList()); + eventsList.add(record); + + if (record.isTransactionEnd()) { + processEvents(eventsList, connection); + events.remove(record.getTransactionKey()); + } else { + events.put(record.getTransactionKey(), eventsList); + } + } + } catch (Exception e) { + switch (errorHandling) { + case SKIP: + LOG.warn("Failed to process message, skipping it.", e); + break; + case STOP: + throw new RuntimeException("Failed to process message", e); + default: + throw new UnexpectedFormatException(String.format("Unknown error handling strategy '%s'", errorHandling)); + } + } + } + } + + private void processEvents(List events, PartnerConnection connection) throws ConnectionException { + for (ChangeEventRecord event : events) { + SObjectDescriptor descriptor = SObjectDescriptor.fromName(event.getEntityName(), connection); + SObjectsDescribeResult describeResult = new SObjectsDescribeResult(connection, descriptor.getAllParentObjects()); + + Schema schema = SalesforceRecord.getSchema(descriptor, describeResult); + updateSchemaIfNecessary(event.getEntityName(), schema); + + if (event.getOperationType() != OperationType.DELETE) { + sendUpdateRecords(event, descriptor, schema, connection); + } else { + sendDeleteRecords(event.getIds(), event.getEntityName(), schema); + } + } + } + + private void updateSchemaIfNecessary(String entityName, Schema schema) { + Schema previousSchema = schemas.get(entityName); + + if (!schema.equals(previousSchema)) { + StructuredRecord ddlRecord = SalesforceRecord.buildDDLStructuredRecord(entityName, schema); + schemas.put(entityName, schema); + + LOG.debug("Sending ddl message for '{}'", entityName); + store(ddlRecord); + } + } + + private void sendUpdateRecords(ChangeEventRecord event, SObjectDescriptor descriptor, Schema schema, + PartnerConnection connection) throws ConnectionException { + String query = getQuery(event, descriptor.getFieldsNames()); + QueryResult queryResult = connection.query(query); + + if (queryResult != null) { + if (queryResult.getRecords().length < event.getIds().size() && !event.isWildcard()) { + List idsForDelete = findIdsMismatch(queryResult.getRecords(), event.getIds()); + sendDeleteRecords(idsForDelete, event.getEntityName(), schema); + } + + for (SObject sObject : queryResult.getRecords()) { + StructuredRecord dmlRecord = SalesforceRecord + .buildDMLStructuredRecord(sObject.getId(), event.getEntityName(), schema, event.getOperationType(), sObject); + + LOG.debug("Sending dml message for '{}:{}'", event.getEntityName(), sObject.getId()); + store(dmlRecord); + } + } + } + + private List findIdsMismatch(SObject[] sObjectArray, List ids) { + Set idsFromQuery = Arrays.stream(sObjectArray) + .map(SObject::getId) + .collect(Collectors.toSet()); + + return ids.stream() + .filter(id -> !idsFromQuery.contains(id)) + .collect(Collectors.toList()); + } + + private void sendDeleteRecords(List ids, String entityName, Schema schema) { + for (String id : ids) { + StructuredRecord dmlRecord = SalesforceRecord + .buildDMLStructuredRecord(id, entityName, schema, OperationType.DELETE, null); + + LOG.debug("Sending dml message for {}:{}", entityName, id); + store(dmlRecord); + } + } + + private String getQuery(ChangeEventRecord event, List fields) { + String query = String.format("select %s from %s", String.join(",", fields), event.getEntityName()); + if (event.isWildcard()) { + return query; + } else { + String ids = event + .getIds() + .stream() + .map(id -> String.format("'%s'", id)) + .collect(Collectors.joining(",")); + return String.format("%s where id in (%s)", query, ids); + } + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java new file mode 100644 index 0000000..bf918d3 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java @@ -0,0 +1,95 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.authenticator; + +import com.google.gson.annotations.SerializedName; + +import java.util.Objects; + +/** + * Oauth2 response from salesforce server + */ +public class AuthResponse { + @SerializedName("access_token") + private final String accessToken; + @SerializedName("instance_url") + private final String instanceUrl; + private final String id; + @SerializedName("token_type") + private final String tokenType; + @SerializedName("issued_at") + private final String issuedAt; + private final String signature; + + public AuthResponse(String accessToken, String instanceUrl, String id, String tokenType, + String issuedAt, String signature) { + this.accessToken = accessToken; + this.instanceUrl = instanceUrl; + this.id = id; + this.tokenType = tokenType; + this.issuedAt = issuedAt; + this.signature = signature; + } + + public String getAccessToken() { + return accessToken; + } + + public String getInstanceUrl() { + return instanceUrl; + } + + public String getId() { + return id; + } + + public String getTokenType() { + return tokenType; + } + + public String getIssuedAt() { + return issuedAt; + } + + public String getSignature() { + return signature; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AuthResponse that = (AuthResponse) o; + + return (Objects.equals(accessToken, that.accessToken) && + Objects.equals(instanceUrl, that.instanceUrl) && + Objects.equals(id, that.id) && + Objects.equals(tokenType, that.tokenType) && + Objects.equals(issuedAt, that.issuedAt) && + Objects.equals(signature, that.signature)); + } + + @Override + public int hashCode() { + return Objects.hash(accessToken, instanceUrl, id, tokenType, issuedAt, signature); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java new file mode 100644 index 0000000..1c66d78 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.authenticator; + +import com.google.gson.Gson; +import com.sforce.ws.ConnectorConfig; +import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +/** + * Authentication to Salesforce via oauth2 + */ +public class Authenticator { + private static final Gson GSON = new Gson(); + + /** + * Authenticates via oauth2 to salesforce and returns a connectorConfig + * which can be used by salesforce libraries to make a connection. + * + * @param credentials information to log in + * @return ConnectorConfig which can be used to create BulkConnection and PartnerConnection + */ + public static ConnectorConfig createConnectorConfig(AuthenticatorCredentials credentials) { + try { + AuthResponse authResponse = oauthLogin(credentials); + ConnectorConfig connectorConfig = new ConnectorConfig(); + connectorConfig.setSessionId(authResponse.getAccessToken()); + String apiVersion = SalesforceConstants.API_VERSION; + String restEndpoint = String.format("%s/services/async/%s", authResponse.getInstanceUrl(), apiVersion); + String serviceEndPoint = String.format("%s/services/Soap/u/%s", authResponse.getInstanceUrl(), apiVersion); + connectorConfig.setRestEndpoint(restEndpoint); + connectorConfig.setServiceEndpoint(serviceEndPoint); + // This should only be false when doing debugging. + connectorConfig.setCompression(true); + // Set this to true to see HTTP requests and responses on stdout + connectorConfig.setTraceMessage(false); + return connectorConfig; + } catch (Exception e) { + throw new RuntimeException("Connection to Salesforce with plugin configurations failed", e); + } + } + + /** + * Authenticate via oauth2 to salesforce and return response to auth request. + * + * @param credentials information to log in + * @return AuthResponse response to http request + */ + public static AuthResponse oauthLogin(AuthenticatorCredentials credentials) throws Exception { + SslContextFactory sslContextFactory = new SslContextFactory(); + HttpClient httpClient = new HttpClient(sslContextFactory); + try { + httpClient.start(); + String response = httpClient.POST(credentials.getLoginUrl()).param("grant_type", "password") + .param("client_id", credentials.getClientId()) + .param("client_secret", credentials.getClientSecret()) + .param("username", credentials.getUsername()) + .param("password", credentials.getPassword()).send().getContentAsString(); + return GSON.fromJson(response, AuthResponse.class); + } finally { + httpClient.stop(); + } + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java new file mode 100644 index 0000000..c6294c2 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java @@ -0,0 +1,83 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.authenticator; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Stores information to connect to salesforce via oauth2 + */ +public class AuthenticatorCredentials implements Serializable { + private final String username; + private final String password; + private final String clientId; + private final String clientSecret; + private final String loginUrl; + + public AuthenticatorCredentials(String username, String password, + String clientId, String clientSecret, String loginUrl) { + this.username = username; + this.password = password; + this.clientId = clientId; + this.clientSecret = clientSecret; + this.loginUrl = loginUrl; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getClientId() { + return clientId; + } + + public String getClientSecret() { + return clientSecret; + } + + public String getLoginUrl() { + return loginUrl; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AuthenticatorCredentials that = (AuthenticatorCredentials) o; + + return Objects.equals(username, that.username) && + Objects.equals(password, that.password) && + Objects.equals(clientId, that.clientId) && + Objects.equals(clientSecret, that.clientSecret) && + Objects.equals(loginUrl, that.loginUrl); + } + + @Override + public int hashCode() { + return Objects.hash(username, password, clientId, clientSecret, loginUrl); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java new file mode 100644 index 0000000..9736fbf --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java @@ -0,0 +1,148 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.records; + +import io.cdap.plugin.cdc.common.OperationType; +import org.json.JSONObject; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Contains information about change event + */ +public class ChangeEventRecord { + private final List ids; + private final String entityName; + private final OperationType operationType; + private final String transactionKey; + private final boolean transactionEnd; + private final boolean wildcard; + + public ChangeEventRecord(List ids, String entityName, OperationType operationType, String transactionKey, + boolean transactionEnd) { + this.ids = ids; + this.entityName = entityName; + this.operationType = operationType; + this.transactionKey = transactionKey; + this.transactionEnd = transactionEnd; + wildcard = parseWildcard(ids); + } + + /** + * Parses json to change event record + * + * @param json JSON with change event + * @return parsed change event record + */ + public static ChangeEventRecord fromJSON(String json) { + JSONObject message = new JSONObject(json); + return new ChangeEventRecord( + parseIds(message), + parseEntityName(message), + parseChangeOperation(message), + parseTransactionKey(message), + parseTransactionEnd(message) + ); + } + + public List getIds() { + return ids; + } + + public String getEntityName() { + return entityName; + } + + public OperationType getOperationType() { + return operationType; + } + + public String getTransactionKey() { + return transactionKey; + } + + public boolean isTransactionEnd() { + return transactionEnd; + } + + public boolean isWildcard() { + return wildcard; + } + + private static String parseEntityName(JSONObject message) { + return message + .getJSONObject("data") + .getJSONObject("payload") + .getJSONObject("ChangeEventHeader") + .getString("entityName"); + } + + private static List parseIds(JSONObject message) { + return message + .getJSONObject("data") + .getJSONObject("payload") + .getJSONObject("ChangeEventHeader") + .getJSONArray("recordIds").toList() + .stream() + .map(Object::toString) + .collect(Collectors.toList()); + } + + private static boolean parseWildcard(List ids) { + return ids.isEmpty() || ids.size() == 1 && ids.get(0).charAt(3) == '*'; + } + + private static OperationType parseChangeOperation(JSONObject message) { + String operation = message + .getJSONObject("data") + .getJSONObject("payload") + .getJSONObject("ChangeEventHeader") + .getString("changeType"); + switch (operation) { + case "CREATE": + case "GAP_CREATE": + case "UNDELETE": + case "GAP_UNDELETE": + return OperationType.INSERT; + case "UPDATE": + case "GAP_UPDATE": + case "GAP_OVERFLOW": + return OperationType.UPDATE; + case "DELETE": + case "GAP_DELETE": + return OperationType.DELETE; + } + throw new IllegalArgumentException(String.format("Unknown change operation '%s'", operation)); + } + + private static String parseTransactionKey(JSONObject message) { + return message + .getJSONObject("data") + .getJSONObject("payload") + .getJSONObject("ChangeEventHeader") + .getString("transactionKey"); + } + + private static boolean parseTransactionEnd(JSONObject message) { + return message + .getJSONObject("data") + .getJSONObject("payload") + .getJSONObject("ChangeEventHeader") + .getBoolean("isTransactionEnd"); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java new file mode 100644 index 0000000..5b856b7 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java @@ -0,0 +1,218 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.records; + +import com.sforce.soap.partner.Field; +import com.sforce.soap.partner.sobject.SObject; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.format.UnexpectedFormatException; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.cdc.common.OperationType; +import io.cdap.plugin.cdc.common.Schemas; +import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectDescriptor; +import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectsDescribeResult; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** + * Converts salseforce data to cdap format + */ +public class SalesforceRecord { + private static final String PRIMARY_FIELD_KEY = "Id"; + + /** + * Builds structured record for DDL message + * + * @param entityName name of entity + * @param schema schema for entity + * @return structured record + */ + public static StructuredRecord buildDDLStructuredRecord(String entityName, Schema schema) { + return StructuredRecord.builder(Schemas.DDL_SCHEMA) + .set(Schemas.TABLE_FIELD, entityName) + .set(Schemas.SCHEMA_FIELD, schema.toString()) + .build(); + } + + /** + * Builds structured record for DML message + * + * @param id id of record + * @param entityName entity name of record + * @param schema schema for record + * @param operationType type of operation + * @param sObject Salesforce object + * @return structured record + */ + public static StructuredRecord buildDMLStructuredRecord(String id, String entityName, Schema schema, + OperationType operationType, SObject sObject) { + return StructuredRecord.builder(Schemas.DML_SCHEMA) + .set(Schemas.TABLE_FIELD, entityName) + .set(Schemas.PRIMARY_KEYS_FIELD, Collections.singletonList(PRIMARY_FIELD_KEY)) + .set(Schemas.OP_TYPE_FIELD, operationType.name()) + .set(Schemas.UPDATE_SCHEMA_FIELD, schema.toString()) + .set(Schemas.UPDATE_VALUES_FIELD, getChangeData(id, sObject, schema)) + .build(); + } + + /** + * Builds schema from Salesforce object description + * + * @param sObjectDescriptor descriptor for Salesforce object + * @param describeResult JSON with change event + * @return structured record + */ + public static Schema getSchema(SObjectDescriptor sObjectDescriptor, SObjectsDescribeResult describeResult) { + return Schema.recordOf(Schemas.SCHEMA_RECORD, getList(sObjectDescriptor, describeResult)); + } + + private static Map getChangeData(String id, SObject sObject, Schema changeSchema) { + Optional opSObject = Optional.ofNullable(sObject); + + if (opSObject.isPresent()) { + Map changes = new HashMap<>(); + for (Schema.Field field : Objects.requireNonNull(changeSchema.getFields())) { + changes.put(field.getName(), convertValue((String) sObject.getField(field.getName()), field)); + } + return changes; + } else { + return Collections.singletonMap(PRIMARY_FIELD_KEY, id); + } + } + + private static Object convertValue(String value, Schema.Field field) { + Schema fieldSchema = field.getSchema(); + + if (fieldSchema.getType() == Schema.Type.NULL) { + return null; + } + + if (fieldSchema.isNullable()) { + if (value == null) { + return null; + } + fieldSchema = fieldSchema.getNonNullable(); + } + + Schema.Type fieldSchemaType = fieldSchema.getType(); + + if (value.isEmpty() && fieldSchemaType != Schema.Type.STRING) { + return null; + } + + Schema.LogicalType logicalType = fieldSchema.getLogicalType(); + if (fieldSchema.getLogicalType() != null) { + switch (logicalType) { + case DATE: + // date will be in yyyy-mm-dd format + return Math.toIntExact(LocalDate.parse(value).toEpochDay()); + case TIMESTAMP_MILLIS: + return Instant.parse(value).toEpochMilli(); + case TIMESTAMP_MICROS: + return TimeUnit.MILLISECONDS.toMicros(Instant.parse(value).toEpochMilli()); + case TIME_MILLIS: + return Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(LocalTime.parse(value).toNanoOfDay())); + case TIME_MICROS: + return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(value).toNanoOfDay()); + default: + throw new UnexpectedFormatException(String.format("Field '%s' is of unsupported type '%s'", + field.getName(), logicalType.getToken())); + } + } + + switch (fieldSchemaType) { + case BOOLEAN: + return Boolean.valueOf(value); + case INT: + return Integer.valueOf(value); + case LONG: + return Long.valueOf(value); + case FLOAT: + return Float.valueOf(value); + case DOUBLE: + return Double.valueOf(value); + case BYTES: + return Byte.valueOf(value); + case STRING: + return value; + } + + throw new UnexpectedFormatException( + String.format("Unsupported schema type: '%s' for field: '%s'. Supported types are 'boolean, int, long, float, " + + "double, binary and string'.", field.getSchema(), field.getName())); + } + + private static List getList(SObjectDescriptor sObjectDescriptor, + SObjectsDescribeResult describeResult) { + List schemaFields = new ArrayList<>(); + + for (SObjectDescriptor.FieldDescriptor fieldDescriptor : sObjectDescriptor.getFields()) { + String parent = fieldDescriptor.hasParents() ? fieldDescriptor.getLastParent() : sObjectDescriptor.getName(); + Field field = describeResult.getField(parent, fieldDescriptor.getName()); + if (field == null) { + throw new IllegalArgumentException( + String.format("Field '%s' is absent in Salesforce describe result", fieldDescriptor.getFullName())); + } + Schema.Field schemaField = Schema.Field.of(fieldDescriptor.getFullName(), getCdapSchemaField(field)); + schemaFields.add(schemaField); + } + + return schemaFields; + } + + private static Schema getCdapSchemaField(Field field) { + Schema fieldSchema; + switch (field.getType()) { + case _boolean: + fieldSchema = Schema.of(Schema.Type.BOOLEAN); + break; + case _int: + fieldSchema = Schema.of(Schema.Type.INT); + break; + case _long: + fieldSchema = Schema.of(Schema.Type.LONG); + break; + case _double: + case currency: + case percent: + fieldSchema = Schema.of(Schema.Type.DOUBLE); + break; + case date: + fieldSchema = Schema.of(Schema.LogicalType.DATE); + break; + case datetime: + fieldSchema = Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS); + break; + case time: + fieldSchema = Schema.of(Schema.LogicalType.TIME_MILLIS); + break; + default: + fieldSchema = Schema.of(Schema.Type.STRING); + } + return field.isNillable() ? Schema.nullableOf(fieldSchema) : fieldSchema; + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java new file mode 100644 index 0000000..2fb1844 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java @@ -0,0 +1,157 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.sobject; + +import com.sforce.soap.partner.Field; +import com.sforce.soap.partner.PartnerConnection; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Contains information about SObject, including its name and list of fields. + * Can be obtained from SObject name. + */ +public class SObjectDescriptor { + + private final String name; + private final List fields; + + /** + * Connects to Salesforce, gets describe result for the given sObject name and stores + * information about its fields into {@link SObjectDescriptor} class. + * + * @param name sObject name + * @param partnerConnection Salesforce connection + * @return sObject descriptor + */ + public static SObjectDescriptor fromName(String name, PartnerConnection partnerConnection) { + SObjectsDescribeResult describeResult = new SObjectsDescribeResult( + partnerConnection, Collections.singletonList(name)); + List fields = describeResult.getFields().stream() + .map(Field::getName) + .map(FieldDescriptor::new) + .collect(Collectors.toList()); + + return new SObjectDescriptor(name, fields); + } + + public SObjectDescriptor(String name, List fields) { + this.name = name; + this.fields = new ArrayList<>(fields); + } + + public String getName() { + return name; + } + + /** + * Collects sObject names needed to be described in order to obtains field type information. + * + * @return list of sObject names + */ + public Set getAllParentObjects() { + Set parents = fields.stream() + .filter(FieldDescriptor::hasParents) + .map(FieldDescriptor::getLastParent) + .collect(Collectors.toSet()); + + // add top level sObject for fields that don't have parents + parents.add(name); + + return parents; + } + + /** + * Collects all field names, for fields with parents includes parents separated by dot. + * + * @return list of field names + */ + public List getFieldsNames() { + return fields.stream() + .map(FieldDescriptor::getFullName) + .collect(Collectors.toList()); + } + + public List getFields() { + return fields; + } + + @Override + public String toString() { + return "SObjectDescriptor{" + "name='" + name + '\'' + ", fields=" + fields + '}'; + } + + /** + * Contains information about field, including list of parents if present. + */ + public static class FieldDescriptor { + + private final String name; + private final List parents; + + public FieldDescriptor(String name) { + this.name = name; + this.parents = new ArrayList<>(); + } + + public String getName() { + return name; + } + + /** + * Returns field name with parents connected by dots. + * + * @return full field name + */ + public String getFullName() { + if (hasParents()) { + List nameParts = new ArrayList<>(parents); + nameParts.add(name); + return String.join(".", nameParts); + } + return name; + } + + /** + * Checks if field has parents. + * + * @return true if field has at least one parent, false otherwise + */ + public boolean hasParents() { + return !parents.isEmpty(); + } + + /** + * Return last parent of the field. + * Primary used to obtain describe result from Salesforce. + * + * @return last parent if field has parents, null otherwise + */ + public String getLastParent() { + return hasParents() ? parents.get(parents.size() - 1) : null; + } + + @Override + public String toString() { + return "FieldDescriptor{" + "name='" + name + '\'' + ", parents=" + parents + '}'; + } + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java new file mode 100644 index 0000000..03298bb --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java @@ -0,0 +1,99 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.cdc.source.salesforce.sobject; + +import com.google.common.collect.Lists; +import com.sforce.soap.partner.DescribeSObjectResult; +import com.sforce.soap.partner.Field; +import com.sforce.soap.partner.PartnerConnection; +import com.sforce.ws.ConnectionException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Retrieves {@link DescribeSObjectResult}s for the given sObjects + * and adds field information to the internal holder. + * This class will be used to populate {@link SObjectDescriptor} for queries by sObject + * or to generate CDAP schema based on Salesforce fields information. + */ +public class SObjectsDescribeResult { + + // Salesforce limitation that we can describe only 100 sObjects at a time + private static final int DESCRIBE_SOBJECTS_LIMIT = 100; + + // key -> [sObject name], value -> [key -> field name, value -> field] + private final Map> objectToFieldMap = new HashMap<>(); + + public SObjectsDescribeResult(PartnerConnection connection, Collection sObjects) { + + // split the given sObjects into smaller partitions to ensure we don't exceed the limitation + Lists.partition(new ArrayList<>(sObjects), DESCRIBE_SOBJECTS_LIMIT).stream() + .map(partition -> { + try { + return connection.describeSObjects(partition.toArray(new String[0])); + } catch (ConnectionException e) { + throw new RuntimeException(e); + } + }) + .flatMap(Arrays::stream) + .forEach(this::addSObjectDescribe); + } + + /** + * Retrieves all stored fields. + * + * @return list of {@link Field}s + */ + public List getFields() { + return objectToFieldMap.values().stream() + .map(Map::values) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + /** + * Attempts to find {@link Field} by sObject name and field name. + * + * @param sObjectName sObject name + * @param fieldName field name + * @return field instance if found, null otherwise + */ + public Field getField(String sObjectName, String fieldName) { + Map fields = objectToFieldMap.get(sObjectName.toLowerCase()); + return fields == null ? null : fields.get(fieldName.toLowerCase()); + } + + private void addSObjectDescribe(DescribeSObjectResult sObjectDescribe) { + Map fields = Arrays.stream(sObjectDescribe.getFields()) + .collect(Collectors.toMap( + field -> field.getName().toLowerCase(), + Function.identity(), + (o, n) -> n, + LinkedHashMap::new)); // preserve field order for queries by sObject + + // sObjects names are case-insensitive + // store them in lower case to ensure we obtain them case-insensitively + objectToFieldMap.put(sObjectDescribe.getName().toLowerCase(), fields); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java new file mode 100644 index 0000000..63ec836 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.util; + +import com.sforce.soap.partner.Connector; +import com.sforce.soap.partner.PartnerConnection; +import com.sforce.ws.ConnectionException; +import com.sforce.ws.ConnectorConfig; +import io.cdap.plugin.cdc.source.salesforce.authenticator.Authenticator; +import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials; + +/** + * Utility class which provides methods to establish connection with Salesforce. + */ +public class SalesforceConnectionUtil { + + /** + * Based on given Salesforce credentials, attempt to establish {@link PartnerConnection}. + * This is mainly used to obtain sObject describe results. + * + * @param credentials Salesforce credentials + * @return partner connection instance + * @throws ConnectionException in case error when establishing connection + */ + public static PartnerConnection getPartnerConnection(AuthenticatorCredentials credentials) + throws ConnectionException { + ConnectorConfig connectorConfig = Authenticator.createConnectorConfig(credentials); + return Connector.newConnection(connectorConfig); + } + + /** + * Creates {@link AuthenticatorCredentials} instance based on given parameters. + * + * @param username Salesforce username + * @param password Salesforce password + * @param clientId Salesforce client id + * @param clientSecret Salesforce client secret + * @param loginUrl Salesforce authentication url + * @return authenticator credentials + */ + public static AuthenticatorCredentials getAuthenticatorCredentials(String username, String password, String clientId, + String clientSecret, String loginUrl) { + return new AuthenticatorCredentials(username, password, clientId, clientSecret, loginUrl); + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java new file mode 100644 index 0000000..0d86c60 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.util; + +/** + * Constants related to Salesforce and configuration + */ +public class SalesforceConstants { + + public static final String API_VERSION = "45.0"; + + public static final String PROPERTY_CLIENT_ID = "clientId"; + public static final String PROPERTY_CLIENT_SECRET = "clientSecret"; + public static final String PROPERTY_USERNAME = "username"; + public static final String PROPERTY_PASSWORD = "password"; + public static final String PROPERTY_LOGIN_URL = "loginUrl"; + public static final String PROPERTY_OBJECTS = "objects"; + public static final String PROPERTY_ERROR_HANDLING = "errorHandling"; +} diff --git a/widgets/CDCSalesforce-streamingsource.json b/widgets/CDCSalesforce-streamingsource.json new file mode 100644 index 0000000..3fe19f4 --- /dev/null +++ b/widgets/CDCSalesforce-streamingsource.json @@ -0,0 +1,80 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "configuration-groups": [ + { + "label": "Authentication", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "username" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + }, + { + "widget-type": "textbox", + "label": "Client Id", + "name": "clientId" + }, + { + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" + }, + { + "widget-type": "textbox", + "label": "Login Url", + "name": "loginUrl", + "widget-attributes" : { + "default": "https://login.salesforce.com/services/oauth2/token" + } + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "select", + "label": "Error Handling", + "name": "errorHandling", + "widget-attributes": { + "values": [ + "Skip on error", + "Stop on error" + ], + "default": "Skip on error" + } + }, + { + "widget-type": "dsv", + "label": "Objects for tracking", + "name": "objects", + "widget-attributes": { + "delimiter": "," + } + } + ] + } + ], + "outputs": [ + { + "widget-type": "non-editable-schema-editor", + "schema": { + "name": "CDCRecord", + "type": "record", + "fields": [ + { + "name": "cdcMessage", + "type": "bytes" + } + ] + } + } + ] +} From a0ce65cfb0cc08c88d402684e872eab6546e8e17 Mon Sep 17 00:00:00 2001 From: Maksym Lozbin Date: Fri, 10 May 2019 02:06:46 +0300 Subject: [PATCH 2/2] CDAP-15237 Enhance the streaming plugin to support CDC option --- docs/CDCSalesforce-streamingsource.md | 17 +- pom.xml | 2 +- .../source/salesforce/SalesforceConfig.java | 2 +- .../SalesforceEventTopicListener.java | 15 +- .../source/salesforce/SalesforceReceiver.java | 101 ++++++++---- .../salesforce/records/ChangeEventHeader.java | 82 ++++++++++ .../salesforce/records/ChangeEventRecord.java | 148 ------------------ .../salesforce/records/ChangeEventType.java | 25 +++ widgets/CDCDatabase-streamingsource.json | 2 +- widgets/CDCSalesforce-streamingsource.json | 34 ++-- widgets/CTSQLServer-streamingsource.json | 2 +- 11 files changed, 217 insertions(+), 213 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java delete mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java create mode 100644 src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java diff --git a/docs/CDCSalesforce-streamingsource.md b/docs/CDCSalesforce-streamingsource.md index 6365921..eafcfa3 100644 --- a/docs/CDCSalesforce-streamingsource.md +++ b/docs/CDCSalesforce-streamingsource.md @@ -9,21 +9,22 @@ CDC source produces messages in CDC format. Properties ---------- -**clientId**: Client ID from the connected app +**Client Id**: Client ID from the connected app. -**clientSecret**: Client Secret from the connected app +**Client Secret**: Client Secret from the connected app. -**username**: Username +**Username**: Username to use when connecting to Salesforce. -**password**: Password +**Password**: Password to use when connecting to Salesforce. -**loginUrl**: (default is https://login.salesforce.com/services/oauth2/token) For Salesforce sandbox runs login url is -different. That's why user needs this option. +**Login Url**: Salesforce login URL to authenticate against. +The default value is https://login.salesforce.com/services/oauth2/token. +This should be changed when running against the Salesforce sandbox. -**objects**: list of object's API names (For example: Task for base object and Employee__c for custom) separated by ",". +**Tracking Objects**: Objects to read change events from (For example: Task for base object and Employee__c for custom) separated by ",". If list is empty then subscription for all events will be used. -**Handle errors**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records +**Error Handling**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records which cannot be transformed. "Skip on error" - just skip, "Fail on error" - fails the pipeline if at least one erroneous record is found. diff --git a/pom.xml b/pom.xml index 976de3b..19fccc4 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ 6.1.0.jre7 2.8.0 13.0.1 - 45.0.0 + 46.0.0 4.0.0 3.1.6 20180813 diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java index 802c23e..0b1f99b 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java @@ -66,7 +66,7 @@ public class SalesforceConfig extends CDCReferencePluginConfig { private String loginUrl; @Name(SalesforceConstants.PROPERTY_OBJECTS) - @Description("Objects for tracking") + @Description("Tracking Objects") @Macro @Nullable private String objects; diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java index d41f73a..59072d1 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,10 +66,11 @@ public class SalesforceEventTopicListener { private final AuthenticatorCredentials credentials; private final List objectsForTracking; + private BayeuxClient bayeuxClient; public SalesforceEventTopicListener(AuthenticatorCredentials credentials, List objectsForTracking) { this.credentials = credentials; - this.objectsForTracking = objectsForTracking; + this.objectsForTracking = new ArrayList<>(objectsForTracking); } /** @@ -77,7 +79,7 @@ public SalesforceEventTopicListener(AuthenticatorCredentials credentials, List { private static final Logger LOG = LoggerFactory.getLogger(SalesforceReceiver.class); private static final String RECEIVER_THREAD_NAME = "salesforce_streaming_api_listener"; // every x seconds thread wakes up and checks if stream is not yet stopped private static final long GET_MESSAGE_TIMEOUT_SECONDS = 2; + private static final Gson GSON = new Gson(); private final AuthenticatorCredentials credentials; private final List objectsForTracking; private final ErrorHandling errorHandling; - + private final Map schemas = new HashMap<>(); + private final Map> events = new HashMap<>(); private SalesforceEventTopicListener eventTopicListener; - - - private Map schemas = new HashMap<>(); - private Map> events = new HashMap<>(); + private static final JsonParser JSON_PARSER = new JsonParser(); SalesforceReceiver(AuthenticatorCredentials credentials, List objectsForTracking, ErrorHandling errorHandling) { super(StorageLevel.MEMORY_AND_DISK_2()); this.credentials = credentials; - this.objectsForTracking = objectsForTracking; + this.objectsForTracking = new ArrayList<>(objectsForTracking); this.errorHandling = errorHandling; } @@ -106,16 +111,22 @@ private void receive() { String message = eventTopicListener.getMessage(GET_MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (message != null) { - ChangeEventRecord record = ChangeEventRecord.fromJSON(message); - - List eventsList = events.getOrDefault(record.getTransactionKey(), Collections.emptyList()); - eventsList.add(record); - - if (record.isTransactionEnd()) { + // whole message class is not needed because we are interested only in change event payload + JsonObject headerElement = JSON_PARSER.parse(message) + .getAsJsonObject() + .getAsJsonObject("data") + .getAsJsonObject("payload") + .getAsJsonObject("ChangeEventHeader"); + ChangeEventHeader event = GSON.fromJson(headerElement, ChangeEventHeader.class); + + List eventsList = events.getOrDefault(event.getTransactionKey(), new ArrayList<>()); + eventsList.add(event); + + if (event.isTransactionEnd()) { processEvents(eventsList, connection); - events.remove(record.getTransactionKey()); + events.remove(event.getTransactionKey()); } else { - events.put(record.getTransactionKey(), eventsList); + events.put(event.getTransactionKey(), eventsList); } } } catch (Exception e) { @@ -126,24 +137,25 @@ private void receive() { case STOP: throw new RuntimeException("Failed to process message", e); default: - throw new UnexpectedFormatException(String.format("Unknown error handling strategy '%s'", errorHandling)); + throw new IllegalStateException(String.format("Unknown error handling strategy '%s'", errorHandling)); } } } + eventTopicListener.stop(); } - private void processEvents(List events, PartnerConnection connection) throws ConnectionException { - for (ChangeEventRecord event : events) { + private void processEvents(List events, PartnerConnection connection) throws ConnectionException { + for (ChangeEventHeader event : events) { SObjectDescriptor descriptor = SObjectDescriptor.fromName(event.getEntityName(), connection); SObjectsDescribeResult describeResult = new SObjectsDescribeResult(connection, descriptor.getAllParentObjects()); Schema schema = SalesforceRecord.getSchema(descriptor, describeResult); updateSchemaIfNecessary(event.getEntityName(), schema); - if (event.getOperationType() != OperationType.DELETE) { + if (getOperationType(event) != OperationType.DELETE) { sendUpdateRecords(event, descriptor, schema, connection); } else { - sendDeleteRecords(event.getIds(), event.getEntityName(), schema); + sendDeleteRecords(Arrays.asList(event.getRecordIds()), event.getEntityName(), schema); } } } @@ -160,20 +172,20 @@ private void updateSchemaIfNecessary(String entityName, Schema schema) { } } - private void sendUpdateRecords(ChangeEventRecord event, SObjectDescriptor descriptor, Schema schema, + private void sendUpdateRecords(ChangeEventHeader event, SObjectDescriptor descriptor, Schema schema, PartnerConnection connection) throws ConnectionException { String query = getQuery(event, descriptor.getFieldsNames()); QueryResult queryResult = connection.query(query); if (queryResult != null) { - if (queryResult.getRecords().length < event.getIds().size() && !event.isWildcard()) { - List idsForDelete = findIdsMismatch(queryResult.getRecords(), event.getIds()); + if (queryResult.getRecords().length < event.getRecordIds().length && !isWildcardEvent(event)) { + List idsForDelete = findIdsMismatch(queryResult.getRecords(), event.getRecordIds()); sendDeleteRecords(idsForDelete, event.getEntityName(), schema); } for (SObject sObject : queryResult.getRecords()) { StructuredRecord dmlRecord = SalesforceRecord - .buildDMLStructuredRecord(sObject.getId(), event.getEntityName(), schema, event.getOperationType(), sObject); + .buildDMLStructuredRecord(sObject.getId(), event.getEntityName(), schema, getOperationType(event), sObject); LOG.debug("Sending dml message for '{}:{}'", event.getEntityName(), sObject.getId()); store(dmlRecord); @@ -181,12 +193,12 @@ private void sendUpdateRecords(ChangeEventRecord event, SObjectDescriptor descri } } - private List findIdsMismatch(SObject[] sObjectArray, List ids) { + private List findIdsMismatch(SObject[] sObjectArray, String[] ids) { Set idsFromQuery = Arrays.stream(sObjectArray) .map(SObject::getId) .collect(Collectors.toSet()); - return ids.stream() + return Stream.of(ids) .filter(id -> !idsFromQuery.contains(id)) .collect(Collectors.toList()); } @@ -201,17 +213,38 @@ private void sendDeleteRecords(List ids, String entityName, Schema schem } } - private String getQuery(ChangeEventRecord event, List fields) { + private String getQuery(ChangeEventHeader event, List fields) { String query = String.format("select %s from %s", String.join(",", fields), event.getEntityName()); - if (event.isWildcard()) { + if (isWildcardEvent(event)) { return query; } else { - String ids = event - .getIds() - .stream() + String ids = Stream.of(event.getRecordIds()) .map(id -> String.format("'%s'", id)) .collect(Collectors.joining(",")); return String.format("%s where id in (%s)", query, ids); } } + + private static boolean isWildcardEvent(ChangeEventHeader event) { + String[] ids = event.getRecordIds(); + return ids.length == 0 || ids.length == 1 && ids[0].charAt(3) == '*'; + } + + private static OperationType getOperationType(ChangeEventHeader event) { + switch (event.getChangeType()) { + case CREATE: + case GAP_CREATE: + case UNDELETE: + case GAP_UNDELETE: + return OperationType.INSERT; + case UPDATE: + case GAP_UPDATE: + case GAP_OVERFLOW: + return OperationType.UPDATE; + case DELETE: + case GAP_DELETE: + return OperationType.DELETE; + } + throw new IllegalArgumentException(String.format("Unknown change operation '%s'", event.getChangeType())); + } } diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java new file mode 100644 index 0000000..8e49bec --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java @@ -0,0 +1,82 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.records; + +import java.util.Arrays; + +/** + * Contains information about change event. Should be used instead of {@link com.sforce.soap.partner.ChangeEventHeader} + * because GSON does not support setters. + */ +public class ChangeEventHeader { + private String[] recordIds; + private String entityName; + private ChangeEventType changeType; + private String transactionKey; + private boolean isTransactionEnd; + + public String[] getRecordIds() { + return recordIds; + } + + public void setRecordIds(String[] recordIds) { + this.recordIds = recordIds.clone(); + } + + public String getEntityName() { + return entityName; + } + + public void setEntityName(String entityName) { + this.entityName = entityName; + } + + public ChangeEventType getChangeType() { + return changeType; + } + + public void setChangeType(ChangeEventType changeType) { + this.changeType = changeType; + } + + public String getTransactionKey() { + return transactionKey; + } + + public void setTransactionKey(String transactionKey) { + this.transactionKey = transactionKey; + } + + public boolean isTransactionEnd() { + return isTransactionEnd; + } + + public void setTransactionEnd(boolean transactionEnd) { + isTransactionEnd = transactionEnd; + } + + @Override + public String toString() { + return "ChangeEventHeader{" + + "recordIds=" + Arrays.toString(recordIds) + + ", entityName='" + entityName + '\'' + + ", changeType=" + changeType + + ", transactionKey='" + transactionKey + '\'' + + ", isTransactionEnd=" + isTransactionEnd + + '}'; + } +} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java deleted file mode 100644 index 9736fbf..0000000 --- a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventRecord.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright © 2019 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.plugin.cdc.source.salesforce.records; - -import io.cdap.plugin.cdc.common.OperationType; -import org.json.JSONObject; - -import java.util.List; -import java.util.stream.Collectors; - -/** - * Contains information about change event - */ -public class ChangeEventRecord { - private final List ids; - private final String entityName; - private final OperationType operationType; - private final String transactionKey; - private final boolean transactionEnd; - private final boolean wildcard; - - public ChangeEventRecord(List ids, String entityName, OperationType operationType, String transactionKey, - boolean transactionEnd) { - this.ids = ids; - this.entityName = entityName; - this.operationType = operationType; - this.transactionKey = transactionKey; - this.transactionEnd = transactionEnd; - wildcard = parseWildcard(ids); - } - - /** - * Parses json to change event record - * - * @param json JSON with change event - * @return parsed change event record - */ - public static ChangeEventRecord fromJSON(String json) { - JSONObject message = new JSONObject(json); - return new ChangeEventRecord( - parseIds(message), - parseEntityName(message), - parseChangeOperation(message), - parseTransactionKey(message), - parseTransactionEnd(message) - ); - } - - public List getIds() { - return ids; - } - - public String getEntityName() { - return entityName; - } - - public OperationType getOperationType() { - return operationType; - } - - public String getTransactionKey() { - return transactionKey; - } - - public boolean isTransactionEnd() { - return transactionEnd; - } - - public boolean isWildcard() { - return wildcard; - } - - private static String parseEntityName(JSONObject message) { - return message - .getJSONObject("data") - .getJSONObject("payload") - .getJSONObject("ChangeEventHeader") - .getString("entityName"); - } - - private static List parseIds(JSONObject message) { - return message - .getJSONObject("data") - .getJSONObject("payload") - .getJSONObject("ChangeEventHeader") - .getJSONArray("recordIds").toList() - .stream() - .map(Object::toString) - .collect(Collectors.toList()); - } - - private static boolean parseWildcard(List ids) { - return ids.isEmpty() || ids.size() == 1 && ids.get(0).charAt(3) == '*'; - } - - private static OperationType parseChangeOperation(JSONObject message) { - String operation = message - .getJSONObject("data") - .getJSONObject("payload") - .getJSONObject("ChangeEventHeader") - .getString("changeType"); - switch (operation) { - case "CREATE": - case "GAP_CREATE": - case "UNDELETE": - case "GAP_UNDELETE": - return OperationType.INSERT; - case "UPDATE": - case "GAP_UPDATE": - case "GAP_OVERFLOW": - return OperationType.UPDATE; - case "DELETE": - case "GAP_DELETE": - return OperationType.DELETE; - } - throw new IllegalArgumentException(String.format("Unknown change operation '%s'", operation)); - } - - private static String parseTransactionKey(JSONObject message) { - return message - .getJSONObject("data") - .getJSONObject("payload") - .getJSONObject("ChangeEventHeader") - .getString("transactionKey"); - } - - private static boolean parseTransactionEnd(JSONObject message) { - return message - .getJSONObject("data") - .getJSONObject("payload") - .getJSONObject("ChangeEventHeader") - .getBoolean("isTransactionEnd"); - } -} diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java new file mode 100644 index 0000000..d6421d1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.cdc.source.salesforce.records; + +/** + * Contains Salesforce change event data types. Should be used instead of + * {@link com.sforce.soap.partner.ChangeEventType} because of GAP_OVERFLOW type. + */ +public enum ChangeEventType { + CREATE, DELETE, UNDELETE, UPDATE, GAP_CREATE, GAP_DELETE, GAP_UNDELETE, GAP_UPDATE, GAP_OVERFLOW +} diff --git a/widgets/CDCDatabase-streamingsource.json b/widgets/CDCDatabase-streamingsource.json index 4f0e3bf..8050875 100644 --- a/widgets/CDCDatabase-streamingsource.json +++ b/widgets/CDCDatabase-streamingsource.json @@ -46,7 +46,7 @@ "type": "record", "fields": [ { - "name": "cdcMessage", + "name": "cdc_msg", "type": "bytes" } ] diff --git a/widgets/CDCSalesforce-streamingsource.json b/widgets/CDCSalesforce-streamingsource.json index 3fe19f4..7076ea0 100644 --- a/widgets/CDCSalesforce-streamingsource.json +++ b/widgets/CDCSalesforce-streamingsource.json @@ -8,23 +8,23 @@ "properties": [ { "widget-type": "textbox", - "label": "Username", - "name": "username" + "label": "Client Id", + "name": "clientId" }, { "widget-type": "password", - "label": "Password", - "name": "password" + "label": "Client Secret", + "name": "clientSecret" }, { "widget-type": "textbox", - "label": "Client Id", - "name": "clientId" + "label": "Username", + "name": "username" }, { "widget-type": "password", - "label": "Client Secret", - "name": "clientSecret" + "label": "Password", + "name": "password" }, { "widget-type": "textbox", @@ -39,6 +39,14 @@ { "label": "Advanced", "properties": [ + { + "widget-type": "dsv", + "label": "Tracking Objects", + "name": "objects", + "widget-attributes": { + "delimiter": "," + } + }, { "widget-type": "select", "label": "Error Handling", @@ -50,14 +58,6 @@ ], "default": "Skip on error" } - }, - { - "widget-type": "dsv", - "label": "Objects for tracking", - "name": "objects", - "widget-attributes": { - "delimiter": "," - } } ] } @@ -70,7 +70,7 @@ "type": "record", "fields": [ { - "name": "cdcMessage", + "name": "cdc_msg", "type": "bytes" } ] diff --git a/widgets/CTSQLServer-streamingsource.json b/widgets/CTSQLServer-streamingsource.json index 17f708e..01e4f3e 100644 --- a/widgets/CTSQLServer-streamingsource.json +++ b/widgets/CTSQLServer-streamingsource.json @@ -56,7 +56,7 @@ "type": "record", "fields": [ { - "name": "cdcMessage", + "name": "cdc_msg", "type": "bytes" } ]