Merge branch 'xlj' into develop

This commit is contained in:
2025-07-24 09:55:11 +08:00
15 changed files with 773 additions and 2 deletions

View File

@@ -27,6 +27,7 @@
<module>tashow-data-redis</module>
<module>tashow-data-excel</module>
<module>tashow-data-es</module>
<module>tashow-data-canal</module>
</modules>

View File

@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-framework</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-data-canal</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>canal 封装拓展</description>
<dependencies>
<!-- Alibaba Canal client, used to tail MySQL binlog events -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!-- Dynamic-Datasource Starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>4.2.0</version> <!-- recommended stable version -->
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version> <!-- latest version -->
</dependency>
<!-- framework common base dependency -->
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-common</artifactId>
</dependency>
<!-- other required dependencies -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,22 @@
package com.tashow.cloud.canal.config;
import com.tashow.cloud.canal.service.CanalSyncService;
import com.tashow.cloud.canal.service.SqlExecutorService;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Auto-configuration entry point that registers the canal binlog-sync beans.
 *
 * NOTE(review): CanalSyncService and SqlExecutorService are themselves
 * annotated with @Service; if this package is also component-scanned these
 * @Bean methods would duplicate the registrations — confirm only one
 * registration mechanism is active.
 */
@AutoConfiguration
public class CanalAutoConfiguration {
/** Canal client that tails the binlog and queues replay SQL tasks. */
@Bean
public CanalSyncService canalSyncService() {
return new CanalSyncService();
}
/**
 * Executor that replays queued SQL against the target datasource.
 * NOTE(review): the bean/method name "getdb" is misleading (it provides a
 * SqlExecutorService, not a database handle); kept as-is because the Spring
 * bean name is derived from it and may be referenced elsewhere.
 */
@Bean
public SqlExecutorService getdb() {
return new SqlExecutorService();
}
}

View File

@@ -0,0 +1,188 @@
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncService {

    private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class);

    /** Pending DML replay tasks, produced by the binlog handlers and drained after each ack. */
    private static final Queue<SqlTask> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    // Connection settings for the canal server.
    // TODO(review): externalize to @ConfigurationProperties instead of hard-coding.
    private static final String CANAL_HOST = "43.139.42.137";
    private static final int CANAL_PORT = 11111;
    private static final String CANAL_DESTINATION = "example";
    private static final String SUBSCRIBE_FILTER = "tashow-platform\\..*";
    private static final int BATCH_SIZE = 1000;
    private static final long IDLE_SLEEP_MS = 1000L;

    // NOTE(review): unused in this class; all writes go through sqlExecutorService.
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private SqlExecutorService sqlExecutorService;

    /**
     * Launches the canal client on a dedicated daemon thread once the bean is built.
     * Daemon + named thread: the worker must not block JVM shutdown and should be
     * identifiable in thread dumps.
     */
    @PostConstruct
    public void start() {
        Thread worker = new Thread(this::runCanalClient, "canal-sync-client");
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Main client loop: connect, subscribe, then repeatedly fetch a batch of binlog
     * entries, translate them into parameterized SQL tasks, ack the batch and flush
     * the queue. Sleeps while the server has nothing to deliver.
     */
    private void runCanalClient() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(CANAL_HOST, CANAL_PORT),
                CANAL_DESTINATION,
                "",
                ""
        );
        try {
            connector.connect();
            connector.subscribe(SUBSCRIBE_FILTER);
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                log.info("Received message id: {}, entries size: {}", batchId, size);
                // batchId == -1 means the server currently has no data.
                if (batchId == -1 || size == 0) {
                    Thread.sleep(IDLE_SLEEP_MS);
                    continue;
                }
                dataHandle(message.getEntries());
                connector.ack(batchId);
                if (!SQL_QUEUE.isEmpty()) {
                    executeQueueSql();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/owners of this thread can observe it.
            Thread.currentThread().interrupt();
            log.info("Canal client interrupted, shutting down.");
        } catch (Exception e) {
            log.error("Canal client error occurred.", e);
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Parses each ROWDATA entry once and dispatches by event type.
     * A failure on one entry is logged and does not abort the rest of the batch.
     */
    private void dataHandle(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) continue;
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                String schemaName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();
                log.info("schema: {}, table: {}, type: {}", schemaName, tableName, eventType);
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(tableName, rowChange);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(tableName, rowChange);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(tableName, rowChange);
                }
            } catch (Exception e) {
                log.error("Error handling entry: {}", entry.toString(), e);
            }
        }
    }

    /**
     * Queues one parameterized INSERT per row using the after-image columns.
     * Note: table/column names come from the binlog and are concatenated into the
     * SQL text (they cannot be bound as parameters); values are always bound.
     */
    private void saveInsertSql(String tableName, RowChange rowChange) {
        for (RowData rowData : rowChange.getRowDatasList()) {
            List<Column> columns = rowData.getAfterColumnsList();
            List<String> columnNames = new ArrayList<>(columns.size());
            List<Object> values = new ArrayList<>(columns.size());
            for (Column col : columns) {
                columnNames.add(col.getName());
                values.add(col.getValue());
            }
            String placeholders = String.join(",", Collections.nCopies(values.size(), "?"));
            String sql = "INSERT INTO " + tableName + " (" + String.join(",", columnNames)
                    + ") VALUES (" + placeholders + ")";
            SQL_QUEUE.add(new SqlTask(sql, values.toArray()));
        }
    }

    /**
     * Queues one parameterized UPDATE per row: SET every after-image column,
     * keyed by the primary key taken from the before-image.
     */
    private void saveUpdateSql(String tableName, RowChange rowChange) {
        for (RowData rowData : rowChange.getRowDatasList()) {
            List<Column> newColumns = rowData.getAfterColumnsList();
            List<String> updateColumns = new ArrayList<>(newColumns.size());
            List<Object> params = new ArrayList<>(newColumns.size() + 1);
            for (Column col : newColumns) {
                updateColumns.add(col.getName() + "=?");
                params.add(col.getValue());
            }
            Column primaryKey = primaryKeyOf(rowData.getBeforeColumnsList(), tableName);
            params.add(primaryKey.getValue());
            String sql = "UPDATE " + tableName + " SET " + String.join(",", updateColumns)
                    + " WHERE " + primaryKey.getName() + "=?";
            SQL_QUEUE.add(new SqlTask(sql, params.toArray()));
        }
    }

    /**
     * Queues one parameterized DELETE per row, keyed by the before-image primary key.
     */
    private void saveDeleteSql(String tableName, RowChange rowChange) {
        for (RowData rowData : rowChange.getRowDatasList()) {
            Column primaryKey = primaryKeyOf(rowData.getBeforeColumnsList(), tableName);
            String sql = "DELETE FROM " + tableName + " WHERE " + primaryKey.getName() + "=?";
            SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue()));
        }
    }

    /** Finds the first primary-key column, or fails with the offending table name. */
    private static Column primaryKeyOf(List<Column> columns, String tableName) {
        return columns.stream()
                .filter(Column::getIsKey)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("未找到主键: " + tableName));
    }

    /** Drains the queue into a list and hands it to the batch executor. */
    private void executeQueueSql() {
        List<SqlTask> tasks = new ArrayList<>();
        SqlTask task;
        while ((task = SQL_QUEUE.poll()) != null) {
            tasks.add(task);
        }
        if (!tasks.isEmpty()) {
            sqlExecutorService.executeBatch(tasks);
        }
    }
}

View File

@@ -0,0 +1,156 @@
/*
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncServiceTest {
private static final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private Canaldb canaldb;
@PostConstruct
public void start() {
new Thread(this::runCanalClient).start();
}
private void runCanalClient() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("43.139.42.137", 11111),
"example",
"",
""
);
int batchSize = 1000;
try {
connector.connect();
connector.subscribe("tashow-platform\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size());
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
dataHandle(message.getEntries());
connector.ack(batchId);
if (SQL_QUEUE.size() > 0) {
executeQueueSql();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType);
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void saveInsertSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> columns = rowData.getAfterColumnsList();
StringBuilder sql = new StringBuilder("INSERT INTO ")
.append(entry.getHeader().getTableName()).append(" (")
.append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse(""))
.append(") VALUES (")
.append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse(""))
.append(");");
SQL_QUEUE.add(sql.toString());
}
}
private void saveUpdateSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> newColumns = rowData.getAfterColumnsList();
List<Column> oldColumns = rowData.getBeforeColumnsList();
StringBuilder setClause = new StringBuilder();
for (Column col : newColumns) {
setClause.append(col.getName()).append("='").append(col.getValue()).append("', ");
}
if (setClause.length() > 0) setClause.setLength(setClause.length() - 2);
String whereClause = oldColumns.stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";");
}
}
private void saveDeleteSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
String whereClause = rowData.getBeforeColumnsList().stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";");
}
}
private void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
canaldb.execute(sql);
}
}
}
*/

View File

@@ -0,0 +1,27 @@
/*
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
public class Canaldb {
@Autowired
private JdbcTemplate jdbcTemplate;
@DS("slave")
public void execute(String sql) {
try {
String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源
System.out.println("当前数据源:" + ds);
System.out.println("[execute]----> " + sql);
jdbcTemplate.execute(sql);
} catch (Exception e) {
e.printStackTrace();
}
}
}
*/

View File

@@ -0,0 +1,56 @@
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
@Service
public class SqlExecutorService {

    private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Replays the given SQL tasks against the "slave" datasource.
     *
     * BUG FIX: the previous version ran jdbcTemplate.batchUpdate with
     * tasks.get(0).getSql() as the template for EVERY task, even though the
     * queue mixes INSERT/UPDATE/DELETE statements for different tables with
     * different parameter counts — all later tasks were executed against the
     * wrong statement. We now partition the list into consecutive runs of
     * identical SQL templates (order preserved, which matters for binlog
     * replay) and batch each run separately.
     *
     * Errors are logged and swallowed, matching the original best-effort
     * contract; the datasource key is always popped in the finally block.
     *
     * @param tasks ordered tasks to execute; null or empty is a no-op
     */
    public void executeBatch(List<SqlTask> tasks) {
        if (tasks == null || tasks.isEmpty()) return;
        DynamicDataSourceContextHolder.push("slave");
        try {
            int start = 0;
            int executed = 0;
            while (start < tasks.size()) {
                String sql = tasks.get(start).getSql();
                // Extend the run while the template stays identical.
                int end = start + 1;
                while (end < tasks.size() && sql.equals(tasks.get(end).getSql())) {
                    end++;
                }
                final List<SqlTask> run = tasks.subList(start, end);
                jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        Object[] args = run.get(i).getArgs();
                        for (int j = 0; j < args.length; j++) {
                            ps.setObject(j + 1, args[j]);
                        }
                    }

                    @Override
                    public int getBatchSize() {
                        return run.size();
                    }
                });
                executed += run.size();
                start = end;
            }
            log.info("✅ 成功执行 {} 条 SQL", executed);
        } catch (Exception e) {
            log.error("❌ 批量执行 SQL 失败", e);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}

View File

@@ -0,0 +1,19 @@
package com.tashow.cloud.canal.service;
/**
 * Immutable value object pairing a parameterized SQL template with its bind
 * arguments, queued by the canal handlers and replayed by SqlExecutorService.
 *
 * Defensive copies are taken on both construction and access so no caller can
 * mutate the task's arguments after the fact (the original leaked its internal
 * array through the varargs constructor and getArgs()).
 */
public class SqlTask {

    /** SQL template containing '?' placeholders. */
    private final String sql;
    /** Bind values in placeholder order; never null. */
    private final Object[] args;

    /**
     * @param sql  SQL template to execute; must not be null
     * @param args bind values matching the template's placeholders (null tolerated as empty)
     */
    public SqlTask(String sql, Object... args) {
        if (sql == null) {
            throw new NullPointerException("sql");
        }
        this.sql = sql;
        this.args = (args == null) ? new Object[0] : args.clone();
    }

    public String getSql() {
        return sql;
    }

    /** @return a copy of the bind values; mutating it does not affect this task. */
    public Object[] getArgs() {
        return args.clone();
    }
}

View File

@@ -0,0 +1,32 @@
package com.tashow.cloud.canal.service;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Static FIFO holder for pending SqlTask instances, backed by a
 * ConcurrentLinkedQueue so producers and the drainer may run on different
 * threads.
 */
public class SqlTaskQueue {

    private static final Queue<SqlTask> queue = new ConcurrentLinkedQueue<>();

    /** Enqueues a task at the tail. */
    public static void add(SqlTask task) {
        queue.add(task);
    }

    public static boolean isEmpty() {
        return queue.isEmpty();
    }

    /** @return the head task, or null when the queue is empty. */
    public static SqlTask poll() {
        return queue.poll();
    }

    public static int size() {
        return queue.size();
    }

    /**
     * Removes and returns all currently queued tasks in FIFO order.
     *
     * BUG FIX: the previous forEach-then-clear() implementation was not
     * atomic — any task enqueued between the snapshot and the clear() was
     * silently discarded. Draining via poll() removes exactly the elements
     * that are returned, so nothing can be lost.
     */
    public static List<SqlTask> drainAll() {
        List<SqlTask> list = new java.util.ArrayList<>();
        SqlTask task;
        while ((task = queue.poll()) != null) {
            list.add(task);
        }
        return list;
    }
}

View File

@@ -0,0 +1 @@
com.tashow.cloud.canal.config.CanalAutoConfiguration