From 10f8e8251bc42442c2c86e259b154b1349191211 Mon Sep 17 00:00:00 2001 From: xuelijun <977662702@qq.com> Date: Tue, 15 Jul 2025 10:04:16 +0800 Subject: [PATCH] canal --- logs/infra-server.log.2025-06-12.0.gz | Bin 0 -> 3259 bytes logs/infra-server.log.2025-06-13.0.gz | Bin 0 -> 6993 bytes tashow-framework/pom.xml | 1 + tashow-framework/tashow-data-canal/pom.xml | 49 +++++ .../canal/config/CanalAutoConfiguration.java | 22 ++ .../cloud/canal/service/CanalSyncService.java | 188 ++++++++++++++++ .../canal/service/CanalSyncServiceTest.java | 156 ++++++++++++++ .../tashow/cloud/canal/service/Canaldb.java | 27 +++ .../canal/service/SqlExecutorService.java | 56 +++++ .../tashow/cloud/canal/service/SqlTask.java | 19 ++ .../cloud/canal/service/SqlTaskQueue.java | 32 +++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + tashow-module/tashow-module-infra/pom.xml | 15 ++ .../cloud/infra/framework/CanalClient.java | 204 ++++++++++++++++++ .../src/main/resources/application-local.yaml | 5 +- 15 files changed, 773 insertions(+), 2 deletions(-) create mode 100644 logs/infra-server.log.2025-06-12.0.gz create mode 100644 logs/infra-server.log.2025-06-13.0.gz create mode 100644 tashow-framework/tashow-data-canal/pom.xml create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java create mode 100644 tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java create mode 100644 tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java diff --git a/logs/infra-server.log.2025-06-12.0.gz b/logs/infra-server.log.2025-06-12.0.gz new file mode 100644 index 0000000000000000000000000000000000000000..8d54b58c9f1ee704489cbc33bf7f630043d31d65 GIT binary patch literal 3259 zcmV;s3`FxEiwFP!00000|Lt5)ZyUK4KPNyx!Cr`k0cQaZJ%1!wl3^5%WoNyx6GL{I zgJUeH8A{@MW~d#G>?qqk^wh<+X!o?2q9_)NqCK_e0{uEp(Nn)dAITYOB#mO(8C!NB z1(7vF4(Gkc-+O#~kECtdWy7o(mW`}x-LmSoi5;tievLjkc1i)-eZKPo*|uq$pB$U( zPAyO+*1F>a0bwCJKH6>6(eu^KleAow+JTE>?E1X#VHUO{f_LTjA&t&xwAt(V%q217 zAwt7u4`UL=EbO2uRr^8~enHL%vevMPfy#3hchSLaLsN*JH0rOZf*C#?yfrO-!dZ;x(n_j>| zCgO<1JVGFj8xZi>FCN>uolu@fCmeMtYIEP`=Tg`f>P5WGe2SPr#Esb*t)owU?h^mg ze6mYby9lxgc5w**pA(mhEgrU6r$J(}$B9QHlzxWI9;=E-*E~V89&**iu?NrXdK*dr zhETj4iUT|#fxm(}5%2dl8rwUYFZYhVeAayN^56uCzUxvU9vF`?4M`Y(e0-51R&U6h zg`G+P%yBGm3%B9BLEEJ7$9tgsq~+76l!QB=AUqn3GWP=D7BNpMQw!=KkG8>LD6IqK z*Y}piK*Tgiz=?M#>x5fAqhWm5qb_SFtS@VGN07(bZ5lywrnNeXC|DF%zUcTF4Qa$& z)p{K%yS2fi4i>!#%0UkTDy>pt8hDR}ZTZfrZ(c{jg<2cyh_s}!bK{LR@daH+K6yLX z1qwF8zVAbEKL*1Q(S7dG4RamngSI^yx`VAQbx-B3w8|CcFS)x<<1Y6y_oYyVH$`&Sqd+_#j2E0}xh|x3J*iDmvuXju z)p}ZKi_}QU=c^g%in&tMIX|$_V$8aj9We}&- zHdMx*qjWtRt- z_^d@*B;h}liHdAqV-t+hNPjOQ-bV2N1af?n$4_LQE#xtxKIiR66ZrsrLjo`b^~t8B6*ZupHv?$!Gpy2mZaoN8W*y`c;7W(B_tw zv{~7DwzvCqY012EKeBa6(xH(r?bTP*1%R!pgr39;2dK=1KR_0s;DB@g zh=|keH`Ij}M-vrKeu-3P0TBRf4h@SQ50&pZ*{DXG9`V4d0Njb8=K%xBD)G*k6aui- z&amZis)gDmXH@dgq6lnqY+TB7B~=NDW#wBnU+isv2`0F?d3e-(xpnlixjhBawz)Px zNKg0n0G$D-Y63^Rr7y2Gg9Wv?0P@2FvSr;%Agg%_SXtt=h)+TnY^+>!@HDwdrnURp z*A(!gGdVup5aR{CYytlV2!CgO@b@zq(-U)%KuwHe?M`vLw6sTpmPeLqrnPFmG3Cdu zn5F7k%d$(=1&hfBLMKU^l$I2@^| znsM!!U5`dFNbW->he;C)gF4nt(s<0$>wuR zgBJBX>fx9NE{XAx`typm^j?@Bdapx)zAj7n9rYNIFJIPT@{x&f-K zLJ}sVN8B#OKnc(g5kfK~FB?mqQ zJgYi&biAorN`${^nb^WkX&Uj>LVzK6nYefhr;@_QYEw@aQ~-Xjo{aA@*-a6VNg$@M zLV%>qeyA;-=9)&n_2J~7%aj^U#CQqgNfL(5HaXajTr%rJuYtFksX4o6mw-5!f2>mz5egw0wJ8`CoRSTE``5BFdQ z#SQsWCgBfxKN_{l-*}g|4QnMJz-#oa(l+W%BealBn=S0Yzs4XW0dt$YFO6=N#Btl^ z>6(V5SLiwhBAzE#N)2y89k2<r1oOw-xS zXRm4(v<>9G>V6qpW)b@~BN>bF82H5@{uPac+Vwp7$&9y2Z6pmWHuGsKRc3N!(l;9+ z%pL}YWmMkR8JBc|{~f~2R0ejoE`N&8x$lj5ycQZ(6eEtWoMn_?40LL^WW%4e)sd@u z#aXN6F;>Cl*?hDK#;jVAoY|$qSzE()`6E}EHC(L}RQHuzQdh*xuDu}2rPfS?OqrZx+S^?Vu zT-@%8B*LGB|8cT5M*Gb#W9E<0Z>Y~?9YNzsuUl6(@);(H6Cpg4C_?Gz6fC8y^5Bz?Ih5ICwA?s10PpOm~qcJQ^gHG95kY10Pr1l9^ z(gA$EKz01dT_{R@?Ofn}EHVINf}13`i2!OvoH3Ffpc2uJJXvU<`y(NuncN%^fZ_qA z^;giyu_!1TXhBvD{G$pCvDmUobq#j?AQZNfrRh_cTbtjhP@tewYQoDb3zy3_4AY>g zoY|?(d?btphZiLOroRGe$K0wgc!5 zCSX`}K|EplbLmG~qVI_nX;&$k;z~~N=kTQ+7dZh*0xDU%l~>_JkFn`C zM{UMrn`l_O2u`Y3aH2gi%?-?pSQ}g)^=C`%TO2<$E>o1l!9l;}OJ^pUM)^!@(~o9A zb*2w!l8eVjmnr(xVa-rJQJ6#iu+Dv#d_xl5S5{1|xad+D{3(L!TbKInk$x9Sdp+uB znwjZQJ!6v(yZzYX=kQK`I>Y2UopnEph)dgjKYcuJND8QmVSrbp>mrd<#^@Dcaq771 zLq}nb@^v5h6>?!yn%|zn>TDa7HKj)S(ju9AHqOyi6qjZS*WR#mu0Ne@P$QiZs$PhA zfV97x1W`hMj&a@;STa+aDRXf5%p@VJbdEcP{F-&UnyHUwn()k% t{@ifZ?UR1n@ETkFFcn{S!Q=;-Gr4$H_tPaWtpSXb{|BeAY`mFX005kfL~Z~8 literal 0 HcmV?d00001 diff --git a/logs/infra-server.log.2025-06-13.0.gz b/logs/infra-server.log.2025-06-13.0.gz new file mode 100644 index 0000000000000000000000000000000000000000..605b537d38c07c320ffe78d6fff5479bb3f83306 GIT binary patch literal 6993 zcmZvfWmJ@3)b;_TOX&t_7?kdk7;*sVlA)U+qy`X>j-k7x8(}0>7(zNEhYl%`Qo8dU z|L?or^?Z2lZ}(YypL5o}uWR4G%?QIm`|o-D`%%MfS;c=~T=2EbcW`tBdH|Yl%ptEp zTvvumBGyG5(@1t$ztQI*rU5Z?$1td6>$%rTPS6beaU;EI|yc6;h zGMJuks!L39^^jGG6qTut?w@V*^WfO?8)tok_+*y^G7X)yz|o&`JT^PkXLD1$V?u%K z^ntf0lB&)6%J+TQcZ!~N3xHu#joi^?rnE85M54WY4geie-Kgm~LL@wcw%N5$h$S3wllFfQ<; zYgz=6ENtB4CQQ^j^(lvr{6=qwP)ADsbt4l&ijWVIGow(<_CnED>scvn6!0yvw`a)4~%5oPTxl2Inz&-(UOPvxwBS zy9tezx#ay#Bkl(tw}1>Y&Pl+k`Y(y0kq+_G^~;323S;!R6wocF?uWYNg|m7u24&ba zvWJqlpgkb)rpAzCwOulZmkocsM^%6?o{MOOZ$;YdV5;V2-@;%^`YjChCbZuHBbad` zrY)-+{^EM&m`rxS^@@aqH5NYG7l`cxds4uPq^e7PFBL+;G9sZ8bki6$q@S{3?5cT2 zg1x=|U8u|Q#wVfOB1Jt|C0Ek$By8yEz|qO@%0(HK0&F8$R3ackfest&a~a6x{3T!M zSX}KlKZ3}QucuUhI5*NEp$FG*JpS-bQBCUW-Y)47eQN)hdV%J$V`zbj?qx*{OZ1xK z*DTHTEg&ADa@?w-Oc1wh$kp0U8KU-@F#JD(OtRds#N8ESKiu?bA))BLp%Zql@Aa3d z9w0GH`C1Lg96(muuN=nl;)9?8U_i^M@F+=;~|C(E}1BKqn{UqrB-|eR=W@b zW0LH5={o{S7=A4{_X8x;#AD#6(jl2C;X`iuT3>(ra&$(CRti#|MIY83_>qOIoFe#K``MM&K;0k77RUHs z$41nS`Fja23i?V38$H3X%!$%)nIO|KPt|PIRawdF7X7!EGZ%na` zB3J|a$LhrG4w1`_d_DuNb+WAZ&WHR@jS<@1>yGXW zT?)bh^7)%Wrj#F%A9kl$nml!Ys?5^NogZYs2|D0i)O=yC-N89H-NkdgA|KAUMF2qz zWx5Vet^TB>j}8ZxHS%28zgXtOKf6iiv106LWR5fk@oqSHXy#8?k|mJ5t2GpAnc{Jl zWpe;e;ft0fW^$CSt70_pDY~VI%cd{8WT)TxJd=N1Iv1r{)=mo|6u9LAB6pTQVn^5E z4>;6v0pTU5qMQ5tl#wz58x0F6jnz zr@;3#0T!RzUE~t4*f45@P8h}T-@4NQk-DPSM;oLUYw?@I5r>aD!`%3xXx7T^n>Gv) z#;c9~x_e?by7@=w6uQG=XFT>Y1=!RDK%JL*zbvdktkgK`IA7P zwS@2FD9**{EsZ#B#cRO+jkWJyBtuiZ&G`4zyWICiw~-gZq|lPYU)R%9n{huu^T8i3 z4(sU)>Y#B%KT?;SR2Q5--dueTn>%p%(?(2ZypSq8^eZux(7c(w&NlX77!BgOd%v(e zE`2YBa}O^9m0O6L;6VJDqG%7<--}C`FG^p?oLJJuHWn@JjIp$dC+Kp1yConxHO1K` z8vmx5mSrH{I&|}BCB2}f7lY!EcqzYsavmMHn;hMN36evD$l$KHkyeu-amg?pE%|4`x}?U z=)*Nb;;wPV%qd_2upkd!(+n$k?QeJPM^=&II?}=|-Kv80TdATYY3{KxFO&3b$B$zo ziGJbr-H~6(O?S{<^kQ{tw&7w&_h(9XyCA){dK^9Qnpuiio@Yi$l?=pxCEjLdE)kHi z!o_=p~FF8--~$c@ItPZ7o&S)y*qe1 zE>V*w7WZ}^GDrF@XR4vNweyRSiBVH+sdL8o;uWOWGASLzh58yr9y!=Qv2~2yiKjM_ zg2K=E-Pr^v`HNGZAH9PBN|~CeK3){ML$>EHSZPE`YS$2IddYVDNncwT_6J~H%AU~h zVucen*P;*SiOR#9+4v@#t9A>K_ZlGrpL4Mjr5GP15qK-cYabQyr*`Th zja--B`)@_}=1w>(rR?9@R7{^JyAw9y+7FRQM2pgA&_)xPFor!(5uM6jSH0T1CD%wi zH3Q|^&JH1y*e~Zk_@>;_2xK>{@n^%1pX^XZ?ZDmhY%20=f(`P&T+gIL7RIOVTOpH+ zg%-iOh*Qq+$(|V%>TFT{e($`AMLivM9qG56dCA%}dxxnxT5y_{QD9cviH-f}$2-)(m?i2X5OcrIr_*#3d z&zR4)wV##4!;Tc*g{C^(@XL<0BmTm)Y^-UX)^hU+6waj%#XM7Lpq!f5_$F1;Ma zZ1T*iyXWT0n<9WfTIid~3H8JK8^)~wYlqHjGaAs`2BU3o^&G2M?M3TuUx`0Dd+ssy zID;TPR@+-aDNL~+#E7&68RGSt%sy^HuKUtnJf`@r+s5(E+wJkk3*UmrheVxG-n+Xt z+4m3kozpV>FKwzMWth-n*kM2483m3 za*HZ{tER6MxyKoOpgfv+VfHmza}7yYy|pS8ni<>SQ+w=FMo`Ev$8qpU1AX2lOPZ{Z zWYbfgqP6%u8QUY}(Y+Q;fKbQd-rA86#$2n}q!Rn&o&~9dcV>1T=;U()JKA7f&FPS% zgLx9Zta_7FNLv>!sw-q8#w}I;Lib+LKW%(e{W#RMlp_!bR4y-?)4U-OB1o09(uSl` zaZt`JAyk(<eTh|Y1OkHP4GTeUW6>953vkH;u z5nkFQ`_7h%I6X?vdVOwS_}uaaO}xm>h}Jy`B2vvL5GA3L_S`Iuj_b=M=Ri}%<1*%P zjmGs*=$(b|5wCLXJpj~B9!hnfpRx~Lk~Z}51?4YAV-^1x3n)q+}+?(YR6r1AlX*R%vRtN27`u4o0s=# z89RiYz^0lsVviV-GvPa+G#~jVu;O|Dbf))=X}i(Bei8px#Ye zKEO(kFb)=q>lN4#Ow`~BvvpV=Ru{%gpxorz{n3k14pOlK+tj{pv0leW9U`R|`^Hac zm=|_R|8(cf-&$^R6dcofVx>{U3%AtXGfjMS1+F^i8bqlERP3ZH2|Arsbxg5$wi}fO zdEp*X2RiUwy;53%Tg~;P_BCCdw<=E&*n3h<0E*|-RD<|aI();LcE0NFaK>o|z5Vbr z+{J}^_KT>BFpX+iazd=AE!Tm*@g(HAXuwp$%8t$x_`mLBx|(lzL#^3}Z>HOMiI8n<}CpIe_&*po8aTrw(bDZE%YWiZ@sfZSTG z!`oqdKycq#b?yo5DBQdT$bNCI4t3}<9tl_yxZTvCp(Uj9uVN|FriV1|T0IZ;mZ(Xc z_yU1X9Yn=H?8*OX&k=0!%a}zrXB_LFt5*FP5fFd`m?f|~w=9p6*-Q|D3V>f(EN;Mn zBI&IBNXQZm?6Wu~(zC8!I4j@N?D7ec)B~?!px0e$UuHVP{x%Aky`jC+bRH>TX}So zb-)BpskU^XyPn=6VUE-J-_P`7s?T+_e9@8nPR@MNGY)BqiOrB>No*wYN~5NU{*%TX z2B;He;qUZpWcF4#8V!k$MF+s5Yi`n|JKXP?!T|Bc{rE9h-CdT9nFXHjB)z?y7p5l3 z0;fY3TG&bFNLJF(7vJvJw=s8}b!XBBE^JPRp!V2q@vU>cSnzme;lDnV!XaErBR$@T z%Xe?lbvDf(+2f+6%wgeA(_F>3aMUc!7wD_)iViBkKI9XRo5M_b2V90%>iD3q0|Y+U|&%7T7IKSDZzinmhTY>rYbGnIhG9QBnVL0(_8G>B8Vd z+=?bOb`{MNuh+b&pw6LU{e5hZ7Uq20Pc{~k?6YSQPm$Xmxp73A@7@aYC{>z$*nRO9 zXqWm|8dYp^_AsvOpGAKl-_P3guxh)`n$YMBFuN$TWGDwV-u4)DopB2kuDM|$6-0l2 z3f+wn*jZ^53mT-(>Uh&{@hDqhaU;(}sz!l2m|M+FC4ps;0r9Onw)R2 z)({QZZN6znpV$yG?Ig9hk^Hbr=x+xE1QW&DR=?jQZ^ZF^3OCY|zh*I&s0zP;0$EmR z^~%p%@N#t#%-b}ko8;ni-m-{dIl~ttr`(WU-0*={45^gv=oIcDQls3Ae%$|wiuzOt z5R8aa5wxWGHv;|B-{zNTkuvHqT#jVBHV4$|pD7S8FUz|$I}A8ygNs9%L4j@hsVJl& zP61(6l<>VlaU2S)?iE1IuN;WtSsJ**uL^t@k(WW+P#{fbaVCxiWP=jgXVweQ39B@@ zPA!!biS7d6rJw^+(3_iP?Q=PfV!H?b#JPAl+JqRAT^#m4Xa-*}l7O|p|2A49$96uo zqFRX>#RO|#y-VdCWoVm3-6L=5=Qo4IF>)wyeTYUY$0yNRJj9TW3_LkFAxQq&__pu2 z=25KQv_z^dhG`FgLR(;g7by%*#)3Pz#D4WeSUr~sYd9o=S8^r1^$bsOnzY4IFU;E@ z_l)4lsPQYSdJ|)C{e;+8qd39~))z8eyFm_n{SqtWBF2y|A;I)07J+LSXDB3KkXL;O zJ+))$v6)2F_XDQp>1G^BmXeNQl_IB-bI&IuBMeKYKG7dOe>NP|<1lCVUqv8%VhW4I z;AEUP$v>>{Ima1TNN9xv>DI{|mt%*Wlv%&%mt+wT)p~~*maZ>KG|pC2BAE5pIYI6( zpY#C^bf}xU0VxCI&nU zfXqHCh4um{FtVlgq2B>I+U1ifdW2=;R2blT^<=Yka(s>WS%k!_7b$qM(wz2gW5`cK;@U zf1NhUmtI%2R!1;jwjP8TmTu6M0=5Q#_#PPR^{|0om~Yn}+(Y(-YCsvN9vAuCv0rL* zSWupMG#oWy|Lc0}Q*o<0OG$QEVIdVNjb=h|DuR)Y1`nKW2~(goa$p8w?et#bkGl6= ziZGr{BKTK5!=KQl7>P(|pSuExD#s36Kn<8iF zhZz5>Hezv|&RPbEswcP%5_zK!kF!m{Ml;-`yQFIFw#E z7p2&4fDbjBCt}0dQ4XkYlQ0BycX@E&Q_N>L7QXi!F2+hPxSYtQOY{Mzt{|*6w(PDo zN^JoMs~4}Esa;ZjEm$+v%O|@zR`Jiea0EPAyqk*!&yF77&_BnfShj|j3eh2KSB*&w?#Xdz?>oY}} zCcOWb2nY3*`=^uc*2pt4vc1T!y8w2@WYmhndnGsxD`tZ}dmVPnB@2EjNGvwBe4XnZ zjBRyy6t39TK{acQ%1<@|7F);%Mxp~V<~~QScCZEA?3Bl6(|o^R0<$Q|vrPwCnDS5}6?#qtZpL|MoGTm#A~r?I?tKXz{Q4k+(7A z^OX0&=e`@*i_yw-KZyYY*slBdFuYM9B&hw9#fv!j9|-;7;>w`dFUxsX62%|yrH9~B z_ka(_y28FL%s1L@Ew^(7M=Jx}q-r9>a#WjYk`v8)c1Vi$c34QQg#iyJ5^BWkJ1VB> z0Jgr%(5}N*@ciN0PYJ1NWbHO98k!XjNOvdRbD?Ol&KM9!`8Hi5*O{k6l!InT z*Zz9eTWjwVF!G%yCf1~U9uw_fW@$w}!z?~So|6!lceVg07F&dHwi4y-auV@LAF+XnV^Z76iIFS(TnK? zP>I>XhvkO8!+eR{zC#BC$1ifQD@<=6j;cnAYdXIL?vfP=k?6uPye@pfE_FM{KCVNr z{fVhi2|(PBu|ZpAF-9edX=BSrqCfCqTYqTO#df~L{-B!NM}2RIiwWtR0MOSqTIA3p zn!U2IJ2Urv)!BSJDq(&3VoZNT*+t^77|!0OBb&a|NMpMZU#W3aSWnTIT|)g!J4v!O zRQW3Dv0(B`rDHx6TOyT>#z#ajq;2~f`-O;AdT#@^ZLm{iV<9lC-jtgDpEVWsc=V0X z*S#v-Z6P5wCz1@K@6C0b?iMu2>eaxwC&~6LYsz_ZD(R7{5)AdGSI1yyqIP=ubDK7bPK?^ z=Tlj$-;p~eQ^z2_KS^|A%rc7sHB73@v&hZ3yHWW0YQ#(Q(K|~R850|saxyWh8Fq@G znS8Pn+{OrePkC0aAcH&Z*kYZ)>*Wp`6puK1XMjzJ+u}DJ?=yXhGwS+WR(WSr>PE?a zR7`;pIgxw*Br0BJb~;cWn4xU~(DPDQrv>Mtxcq3bm?TbyyGR<55nlV^8)9tvm5b(j zbDpycA$;rY-2Ga!VqSut`nOIobd^OQPR#2spI?8O5JC>$t}Eu?_CyO=7ue zMb7EOrze&;FKze|!P$;!Ra(>n2o`&J*P7dK;AL3aE@kFnIMCX)-p?DoZ`7U37YoWT z5e8J*ACm*#L-C=tpM%FQzWjRlBg+b{-2gN&uQi7?e_JUSMFsuiq`wRSj2K`*-qPYX z|7`zN$bejza|8zuv27m=Y<+prMRAPj~4V{>pW zwp%@JhbI(*BBiaBgSF-yt0iIV=_(2Zd-t^{UFb`FhiXkXyf0(b#UvjE_8@btCW9hW zE=ii2%-a;$kxrM#=YHo5GsbmCl;^HBSL=6*6;cnrp^bTdwn)3%{79@=5J$TAh>rGu E00Y;OP5=M^ literal 0 HcmV?d00001 diff --git a/tashow-framework/pom.xml b/tashow-framework/pom.xml index 0e47e10..06d6854 100644 --- a/tashow-framework/pom.xml +++ b/tashow-framework/pom.xml @@ -27,6 +27,7 @@ tashow-data-redis tashow-data-excel tashow-data-es + tashow-data-canal diff --git a/tashow-framework/tashow-data-canal/pom.xml b/tashow-framework/tashow-data-canal/pom.xml new file mode 100644 index 0000000..f3daf9d --- /dev/null +++ b/tashow-framework/tashow-data-canal/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + com.tashow.cloud + tashow-framework + ${revision} + + tashow-data-canal + jar + + ${project.artifactId} + canal 封装拓展 + + + + com.alibaba.otter + canal.client + 1.1.0 + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 4.2.0 + + + + javax.annotation + javax.annotation-api + 1.3.2 + + + + com.tashow.cloud + tashow-common + + + + + org.projectlombok + lombok + true + + + diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java new file mode 100644 index 0000000..038eedd --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java @@ -0,0 +1,22 @@ +package com.tashow.cloud.canal.config; + +import com.tashow.cloud.canal.service.CanalSyncService; +import com.tashow.cloud.canal.service.SqlExecutorService; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@AutoConfiguration +public class CanalAutoConfiguration { + + @Bean + public CanalSyncService canalSyncService() { + return new CanalSyncService(); + } + + @Bean + public SqlExecutorService getdb() { + return new SqlExecutorService(); + } +} diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java new file mode 100644 index 0000000..64c6a87 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java @@ -0,0 +1,188 @@ +package com.tashow.cloud.canal.service; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class CanalSyncService { + + private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class); + + private static final Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private SqlExecutorService sqlExecutorService; + + @PostConstruct + public void start() { + new Thread(this::runCanalClient).start(); + } + + private void runCanalClient() { + CanalConnector connector = CanalConnectors.newSingleConnector( + new InetSocketAddress("43.139.42.137", 11111), + "example", + "", + "" + ); + int batchSize = 1000; + + try { + connector.connect(); + connector.subscribe("tashow-platform\\..*"); + connector.rollback(); + + while (true) { + Message message = connector.getWithoutAck(batchSize); + log.info("Received message id: {}, entries size: {}", message.getId(), message.getEntries().size()); + long batchId = message.getId(); + int size = message.getEntries().size(); + + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + continue; + } + + dataHandle(message.getEntries()); + connector.ack(batchId); + + if (!SQL_QUEUE.isEmpty()) { + executeQueueSql(); + } + } + } catch (Exception e) { + log.error("Canal client error occurred.", e); + } finally { + connector.disconnect(); + } + } + + private void dataHandle(List entries) { + for (Entry entry : entries) { + if (entry.getEntryType() != EntryType.ROWDATA) continue; + + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + + log.info("schema: {}, table: {}, type: {}", schemaName, tableName, eventType); + + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + + } catch (Exception e) { + log.error("Error handling entry: {}", entry.toString(), e); + } + } + } + + private void saveInsertSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for (RowData rowData : rowChange.getRowDatasList()) { + List columns = rowData.getAfterColumnsList(); + + List columnNames = new ArrayList<>(); + List values = new ArrayList<>(); + + for (Column col : columns) { + columnNames.add(col.getName()); + values.add(col.getValue()); + } + + String sql = "INSERT INTO " + tableName + " (" + + String.join(",", columnNames) + ") VALUES ("; + + StringBuilder placeholders = new StringBuilder(); + for (int i = 0; i < values.size(); i++) { + placeholders.append("?,"); + } + if (placeholders.length() > 0) placeholders.deleteCharAt(placeholders.length() - 1); + sql += placeholders + ")"; + + SQL_QUEUE.add(new SqlTask(sql, values.toArray())); + } + } + + private void saveUpdateSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for (RowData rowData : rowChange.getRowDatasList()) { + List newColumns = rowData.getAfterColumnsList(); + List oldColumns = rowData.getBeforeColumnsList(); + + List updateColumns = new ArrayList<>(); + List params = new ArrayList<>(); + + for (Column col : newColumns) { + updateColumns.add(col.getName() + "=?"); + params.add(col.getValue()); + } + + Optional primaryKeyOpt = oldColumns.stream().filter(Column::getIsKey).findFirst(); + Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键")); + + params.add(primaryKey.getValue()); + + String sql = "UPDATE " + tableName + " SET " + + String.join(",", updateColumns) + + " WHERE " + primaryKey.getName() + "=?"; + + SQL_QUEUE.add(new SqlTask(sql, params.toArray())); + } + } + + private void saveDeleteSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for (RowData rowData : rowChange.getRowDatasList()) { + List beforeColumns = rowData.getBeforeColumnsList(); + + Optional primaryKeyOpt = beforeColumns.stream().filter(Column::getIsKey).findFirst(); + Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键")); + + String sql = "DELETE FROM " + tableName + " WHERE " + primaryKey.getName() + "=?"; + SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue())); + } + } + + private void executeQueueSql() { + List tasks = new ArrayList<>(); + SqlTask task; + while ((task = SQL_QUEUE.poll()) != null) { + tasks.add(task); + } + if (!tasks.isEmpty()) { + sqlExecutorService.executeBatch(tasks); + } + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java new file mode 100644 index 0000000..9a526da --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java @@ -0,0 +1,156 @@ +/* +package com.tashow.cloud.canal.service; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class CanalSyncServiceTest { + + private static final Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private Canaldb canaldb; + + @PostConstruct + public void start() { + new Thread(this::runCanalClient).start(); + } + + private void runCanalClient() { + CanalConnector connector = CanalConnectors.newSingleConnector( + new InetSocketAddress("43.139.42.137", 11111), + "example", + "", + "" + ); + int batchSize = 1000; + + try { + connector.connect(); + connector.subscribe("tashow-platform\\..*"); + connector.rollback(); + + while (true) { + Message message = connector.getWithoutAck(batchSize); + System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size()); + long batchId = message.getId(); + int size = message.getEntries().size(); + + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + continue; + } + + dataHandle(message.getEntries()); + connector.ack(batchId); + + if (SQL_QUEUE.size() > 0) { + executeQueueSql(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + connector.disconnect(); + } + } + + private void dataHandle(List entries) { + for (Entry entry : entries) { + if (entry.getEntryType() != EntryType.ROWDATA) continue; + + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + + System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType); + + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void saveInsertSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + List columns = rowData.getAfterColumnsList(); + StringBuilder sql = new StringBuilder("INSERT INTO ") + .append(entry.getHeader().getTableName()).append(" (") + .append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse("")) + .append(") VALUES (") + .append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse("")) + .append(");"); + + SQL_QUEUE.add(sql.toString()); + } + } + + private void saveUpdateSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + List newColumns = rowData.getAfterColumnsList(); + List oldColumns = rowData.getBeforeColumnsList(); + + StringBuilder setClause = new StringBuilder(); + for (Column col : newColumns) { + setClause.append(col.getName()).append("='").append(col.getValue()).append("', "); + } + if (setClause.length() > 0) setClause.setLength(setClause.length() - 2); + + String whereClause = oldColumns.stream() + .filter(Column::getIsKey) + .map(c -> c.getName() + "='" + c.getValue() + "'") + .findFirst() + .orElseThrow(() -> new RuntimeException("未找到主键")); + + SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";"); + } + } + + private void saveDeleteSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + String whereClause = rowData.getBeforeColumnsList().stream() + .filter(Column::getIsKey) + .map(c -> c.getName() + "='" + c.getValue() + "'") + .findFirst() + .orElseThrow(() -> new RuntimeException("未找到主键")); + + SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";"); + } + } + + + private void executeQueueSql() { + int size = SQL_QUEUE.size(); + for (int i = 0; i < size; i++) { + String sql = SQL_QUEUE.poll(); + canaldb.execute(sql); + } + } +} +*/ diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java new file mode 100644 index 0000000..7b51600 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java @@ -0,0 +1,27 @@ +/* +package com.tashow.cloud.canal.service; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +@Service +public class Canaldb { + + @Autowired + private JdbcTemplate jdbcTemplate; + @DS("slave") + public void execute(String sql) { + try { + String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源 + System.out.println("当前数据源:" + ds); + System.out.println("[execute]----> " + sql); + jdbcTemplate.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + } + } +} +*/ diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java new file mode 100644 index 0000000..3ba0286 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java @@ -0,0 +1,56 @@ +package com.tashow.cloud.canal.service; + + +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +@Service +public class SqlExecutorService { + + private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class); + + @Autowired + private JdbcTemplate jdbcTemplate; + + public void executeBatch(List tasks) { + if (tasks == null || tasks.isEmpty()) return; + + DynamicDataSourceContextHolder.push("slave"); + try { + // 提取所有 SQL 模板(假设它们都是一样的) + String sqlTemplate = tasks.get(0).getSql(); + + // 执行批量更新 + jdbcTemplate.batchUpdate(sqlTemplate, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + SqlTask task = tasks.get(i); + Object[] args = task.getArgs(); + for (int j = 0; j < args.length; j++) { + ps.setObject(j + 1, args[j]); + } + } + + @Override + public int getBatchSize() { + return tasks.size(); + } + }); + + log.info("✅ 成功执行 {} 条 SQL", tasks.size()); + } catch (Exception e) { + log.error("❌ 批量执行 SQL 失败", e); + } finally { + DynamicDataSourceContextHolder.poll(); + } + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java new file mode 100644 index 0000000..c6f62c2 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java @@ -0,0 +1,19 @@ +package com.tashow.cloud.canal.service; + +public class SqlTask { + private final String sql; + private final Object[] args; + + public SqlTask(String sql, Object... args) { + this.sql = sql; + this.args = args; + } + + public String getSql() { + return sql; + } + + public Object[] getArgs() { + return args; + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java new file mode 100644 index 0000000..edf2489 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java @@ -0,0 +1,32 @@ +package com.tashow.cloud.canal.service; + +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SqlTaskQueue { + private static final Queue queue = new ConcurrentLinkedQueue<>(); + + public static void add(SqlTask task) { + queue.add(task); + } + + public static boolean isEmpty() { + return queue.isEmpty(); + } + + public static SqlTask poll() { + return queue.poll(); + } + + public static int size() { + return queue.size(); + } + + public static List drainAll() { + List list = new java.util.ArrayList<>(); + queue.forEach(list::add); + queue.clear(); + return list; + } +} diff --git a/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..361a8ce --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.tashow.cloud.canal.config.CanalAutoConfiguration \ No newline at end of file diff --git a/tashow-module/tashow-module-infra/pom.xml b/tashow-module/tashow-module-infra/pom.xml index 9d6c752..bf2006d 100644 --- a/tashow-module/tashow-module-infra/pom.xml +++ b/tashow-module/tashow-module-infra/pom.xml @@ -37,6 +37,14 @@ ${revision} + + + com.alibaba.otter + canal.client + 1.1.0 + + + com.tashow.cloud @@ -69,6 +77,13 @@ tashow-data-redis + + com.tashow.cloud + tashow-data-canal + 1.0.0 + compile + + com.tashow.cloud diff --git a/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java b/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java new file mode 100644 index 0000000..95a6876 --- /dev/null +++ b/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java @@ -0,0 +1,204 @@ +package com.tashow.cloud.infra.framework; + + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Component +public class CanalClient { + + //sql队列 + private Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + + + /** + * canal入库方法 + */ + public void run() { + CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("43.139.42.137", + 11111), "example", "", ""); + int batchSize = 1000; + try { + connector.connect(); + // connector.subscribe(".*\\..*"); + connector.subscribe("tashow-platform"); + + connector.rollback(); + try { + while (true) { + //尝试从master那边拉去数据batchSize条记录,有多少取多少 + Message message = connector.getWithoutAck(batchSize); + long batchId = message.getId(); + int size = message.getEntries().size(); + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + } else { + dataHandle(message.getEntries()); + } + connector.ack(batchId); + + //当队列里面堆积的sql大于一定数值的时候就模拟执行 + if (SQL_QUEUE.size() >= 1) { + executeQueueSql(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } finally { + connector.disconnect(); + } + } + + /** + * 模拟执行队列里面的sql语句 + */ + public void executeQueueSql() { + int size = SQL_QUEUE.size(); + for (int i = 0; i < size; i++) { + String sql = SQL_QUEUE.poll(); + System.out.println("[sql]----> " + sql); + + this.execute(sql); + } + } + + /** + * 数据处理 + * + * @param entrys + */ + private void dataHandle(List entrys) throws InvalidProtocolBufferException { + for (Entry entry : entrys) { + if(entry.getHeader().getSchemaName().equals("hc")){ + return; + } + if (EntryType.ROWDATA == entry.getEntryType()) { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + } + } + } + + /** + * 保存更新语句 + * + * @param entry + */ + private void saveUpdateSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List newColumnList = rowData.getAfterColumnsList(); + StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); + for (int i = 0; i < newColumnList.size(); i++) { + sql.append(" " + newColumnList.get(i).getName() + + " = '" + newColumnList.get(i).getValue() + "'"); + if (i != newColumnList.size() - 1) { + sql.append(","); + } + } + sql.append(" where "); + List oldColumnList = rowData.getBeforeColumnsList(); + for (Column column : oldColumnList) { + if (column.getIsKey()) { + //暂时只支持单一主键 + sql.append(column.getName() + "=" + column.getValue()); + break; + } + } + SQL_QUEUE.add(sql.toString()); + } + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 保存删除语句 + * + * @param entry + */ + private void saveDeleteSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List columnList = rowData.getBeforeColumnsList(); + StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); + for (Column column : columnList) { + if (column.getIsKey()) { + //暂时只支持单一主键 + sql.append(column.getName() + "=" + column.getValue()); + break; + } + } + SQL_QUEUE.add(sql.toString()); + } + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 保存插入语句 + * + * @param entry + */ + private void saveInsertSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List columnList = rowData.getAfterColumnsList(); + StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); + for (int i = 0; i < columnList.size(); i++) { + sql.append(columnList.get(i).getName()); + if (i != columnList.size() - 1) { + sql.append(","); + } + } + sql.append(") VALUES ("); + for (int i = 0; i < columnList.size(); i++) { + sql.append("'" + columnList.get(i).getValue() + "'"); + if (i != columnList.size() - 1) { + sql.append(","); + } + } + sql.append(")"); + SQL_QUEUE.add(sql.toString()); + } + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 入库 + * @param sql + */ + public void execute(String sql) { + System.out.println("sql======="+sql); + } +} diff --git a/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml b/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml index a26f4bb..15b2156 100644 --- a/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml +++ b/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml @@ -7,10 +7,11 @@ spring: username: nacos # Nacos 账号 password: nacos # Nacos 密码 discovery: # 【配置中心】配置项 - namespace: liwq # 命名空间。这里使用 dev 开发环境 + namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境 group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP metadata: version: 1.0.0 # 服务实例的版本号,可用于灰度发布 config: # 【注册中心】配置项 - namespace: liwq # 命名空间。这里使用 dev 开发环境 + namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境 group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP +