This commit is contained in:
xuelijun
2025-07-15 10:04:16 +08:00
parent c2adf5155f
commit 10f8e8251b
15 changed files with 773 additions and 2 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -27,6 +27,7 @@
<module>tashow-data-redis</module>
<module>tashow-data-excel</module>
<module>tashow-data-es</module>
<module>tashow-data-canal</module>
</modules>

View File

@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-framework</artifactId>
<version>${revision}</version>
</parent>
<artifactId>tashow-data-canal</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>canal 封装拓展</description>
<dependencies>
<!-- Canal TCP client used to tail the MySQL binlog -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version> <!-- NOTE(review): 1.1.0 is a very old release; confirm whether a newer canal.client is intended -->
</dependency>
<!-- Dynamic-Datasource Starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>4.2.0</version> <!-- stable release recommended -->
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version> <!-- provides @PostConstruct used by CanalSyncService on JDK 11+ -->
</dependency>
<!-- Framework base dependency -->
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-common</artifactId>
</dependency>
<!-- Other required dependencies -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,22 @@
package com.tashow.cloud.canal.config;
import com.tashow.cloud.canal.service.CanalSyncService;
import com.tashow.cloud.canal.service.SqlExecutorService;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Auto-configuration that registers the canal sync beans for consumers of this module.
// NOTE(review): CanalSyncService and SqlExecutorService are ALSO annotated @Service;
// if their package falls under component scan these @Bean definitions duplicate
// them — confirm only one registration path is active.
@AutoConfiguration
public class CanalAutoConfiguration {
    /** Canal client that tails the binlog and queues replication SQL tasks. */
    @Bean
    public CanalSyncService canalSyncService() {
        return new CanalSyncService();
    }
    /** Executor that replays queued SQL on the "slave" datasource. Bean name is "getdb" (kept for compatibility). */
    @Bean
    public SqlExecutorService getdb() {
        return new SqlExecutorService();
    }
}

View File

@@ -0,0 +1,188 @@
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncService {

    private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class);

    /** Pending replication statements; filled by the canal thread, drained per batch. */
    private static final Queue<SqlTask> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Autowired
    private SqlExecutorService sqlExecutorService;

    /**
     * Starts the canal polling loop on a named daemon thread so it cannot keep
     * the JVM alive during shutdown. (Fixed: previously an anonymous non-daemon thread.)
     */
    @PostConstruct
    public void start() {
        Thread worker = new Thread(this::runCanalClient, "canal-sync-client");
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Connects to the canal server and loops: fetch a batch of binlog entries,
     * translate row events into parameterized SQL, replay them, then ack.
     */
    private void runCanalClient() {
        // NOTE(review): server address/destination are hard-coded — move to configuration.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("43.139.42.137", 11111),
                "example",
                "",
                ""
        );
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe("tashow-platform\\..*");
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                log.info("Received message id: {}, entries size: {}", batchId, size);
                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                    continue;
                }
                dataHandle(message.getEntries());
                // Replay BEFORE acking: a crash between execute and ack makes canal
                // re-deliver the batch instead of silently dropping it.
                if (!SQL_QUEUE.isEmpty()) {
                    executeQueueSql();
                }
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status for callers
            log.info("Canal client interrupted; shutting down.");
        } catch (Exception e) {
            log.error("Canal client error occurred.", e);
        } finally {
            connector.disconnect();
        }
    }

    /** Routes each row-data entry to the matching INSERT/UPDATE/DELETE SQL builder. */
    private void dataHandle(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) continue;
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                log.info("schema: {}, table: {}, type: {}",
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
                // Other event types (DDL etc.) are intentionally ignored.
            } catch (Exception e) {
                log.error("Error handling entry: {}", entry, e);
            }
        }
    }

    /** Queues an INSERT per row using ? placeholders; NULL columns become SQL NULL. */
    private void saveInsertSql(Entry entry) throws Exception {
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        String tableName = entry.getHeader().getTableName();
        for (RowData rowData : rowChange.getRowDatasList()) {
            List<Column> columns = rowData.getAfterColumnsList();
            if (columns.isEmpty()) continue; // nothing to insert for this row
            List<String> columnNames = new ArrayList<>();
            List<Object> values = new ArrayList<>();
            for (Column col : columns) {
                columnNames.add("`" + col.getName() + "`"); // quote identifiers (MySQL)
                values.add(columnValue(col));
            }
            String placeholders = String.join(",", Collections.nCopies(values.size(), "?"));
            String sql = "INSERT INTO `" + tableName + "` ("
                    + String.join(",", columnNames) + ") VALUES (" + placeholders + ")";
            SQL_QUEUE.add(new SqlTask(sql, values.toArray()));
        }
    }

    /** Queues an UPDATE per row, keyed on the single-column primary key of the before-image. */
    private void saveUpdateSql(Entry entry) throws Exception {
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        String tableName = entry.getHeader().getTableName();
        for (RowData rowData : rowChange.getRowDatasList()) {
            List<Column> newColumns = rowData.getAfterColumnsList();
            List<Column> oldColumns = rowData.getBeforeColumnsList();
            List<String> updateColumns = new ArrayList<>();
            List<Object> params = new ArrayList<>();
            for (Column col : newColumns) {
                updateColumns.add("`" + col.getName() + "`=?");
                params.add(columnValue(col));
            }
            Column primaryKey = oldColumns.stream()
                    .filter(Column::getIsKey)
                    .findFirst()
                    .orElseThrow(() -> new RuntimeException("未找到主键"));
            params.add(primaryKey.getValue());
            String sql = "UPDATE `" + tableName + "` SET "
                    + String.join(",", updateColumns)
                    + " WHERE `" + primaryKey.getName() + "`=?";
            SQL_QUEUE.add(new SqlTask(sql, params.toArray()));
        }
    }

    /** Queues a DELETE per row, keyed on the single-column primary key of the before-image. */
    private void saveDeleteSql(Entry entry) throws Exception {
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        String tableName = entry.getHeader().getTableName();
        for (RowData rowData : rowChange.getRowDatasList()) {
            List<Column> beforeColumns = rowData.getBeforeColumnsList();
            Column primaryKey = beforeColumns.stream()
                    .filter(Column::getIsKey)
                    .findFirst()
                    .orElseThrow(() -> new RuntimeException("未找到主键"));
            String sql = "DELETE FROM `" + tableName + "` WHERE `" + primaryKey.getName() + "`=?";
            SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue()));
        }
    }

    /**
     * Maps a binlog column to its bind value. Fixed: canal reports NULL columns
     * with an empty string value — check getIsNull() so NULLs stay NULL instead
     * of being written back as "".
     */
    private static Object columnValue(Column col) {
        return col.getIsNull() ? null : col.getValue();
    }

    /** Drains the whole queue and hands the tasks to the batch executor. */
    private void executeQueueSql() {
        List<SqlTask> tasks = new ArrayList<>();
        SqlTask task;
        while ((task = SQL_QUEUE.poll()) != null) {
            tasks.add(task);
        }
        if (!tasks.isEmpty()) {
            sqlExecutorService.executeBatch(tasks);
        }
    }
}

View File

@@ -0,0 +1,156 @@
/* NOTE(review): dead code — commented-out legacy implementation superseded by CanalSyncService; delete once the queue-based client is verified.
package com.tashow.cloud.canal.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
public class CanalSyncServiceTest {
private static final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private Canaldb canaldb;
@PostConstruct
public void start() {
new Thread(this::runCanalClient).start();
}
private void runCanalClient() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("43.139.42.137", 11111),
"example",
"",
""
);
int batchSize = 1000;
try {
connector.connect();
connector.subscribe("tashow-platform\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size());
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}
dataHandle(message.getEntries());
connector.ack(batchId);
if (SQL_QUEUE.size() > 0) {
executeQueueSql();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType);
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void saveInsertSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> columns = rowData.getAfterColumnsList();
StringBuilder sql = new StringBuilder("INSERT INTO ")
.append(entry.getHeader().getTableName()).append(" (")
.append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse(""))
.append(") VALUES (")
.append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse(""))
.append(");");
SQL_QUEUE.add(sql.toString());
}
}
private void saveUpdateSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
List<Column> newColumns = rowData.getAfterColumnsList();
List<Column> oldColumns = rowData.getBeforeColumnsList();
StringBuilder setClause = new StringBuilder();
for (Column col : newColumns) {
setClause.append(col.getName()).append("='").append(col.getValue()).append("', ");
}
if (setClause.length() > 0) setClause.setLength(setClause.length() - 2);
String whereClause = oldColumns.stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";");
}
}
private void saveDeleteSql(Entry entry) throws Exception {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
String whereClause = rowData.getBeforeColumnsList().stream()
.filter(Column::getIsKey)
.map(c -> c.getName() + "='" + c.getValue() + "'")
.findFirst()
.orElseThrow(() -> new RuntimeException("未找到主键"));
SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";");
}
}
private void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
canaldb.execute(sql);
}
}
}
*/

View File

@@ -0,0 +1,27 @@
/* NOTE(review): dead code — commented-out Canaldb helper superseded by SqlExecutorService; delete once verified.
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
public class Canaldb {
@Autowired
private JdbcTemplate jdbcTemplate;
@DS("slave")
public void execute(String sql) {
try {
String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源
System.out.println("当前数据源:" + ds);
System.out.println("[execute]----> " + sql);
jdbcTemplate.execute(sql);
} catch (Exception e) {
e.printStackTrace();
}
}
}
*/

View File

@@ -0,0 +1,56 @@
package com.tashow.cloud.canal.service;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Service
public class SqlExecutorService {

    private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Replays queued replication statements against the "slave" datasource.
     * <p>
     * The incoming list may mix different SQL templates (INSERTs, UPDATEs and
     * DELETEs for several tables), so tasks are grouped by their SQL text and
     * each group runs as its own JDBC batch. Fixed: the previous implementation
     * executed EVERY task's arguments against {@code tasks.get(0).getSql()},
     * running the wrong statement whenever the batch was mixed.
     *
     * @param tasks queued statements; {@code null} or empty is a no-op
     */
    public void executeBatch(List<SqlTask> tasks) {
        if (tasks == null || tasks.isEmpty()) return;
        DynamicDataSourceContextHolder.push("slave");
        try {
            // Group by SQL template, preserving arrival order of groups and rows.
            Map<String, List<SqlTask>> groups = new LinkedHashMap<>();
            for (SqlTask task : tasks) {
                groups.computeIfAbsent(task.getSql(), k -> new ArrayList<>()).add(task);
            }
            for (Map.Entry<String, List<SqlTask>> group : groups.entrySet()) {
                executeGroup(group.getKey(), group.getValue());
            }
            log.info("✅ 成功执行 {} 条 SQL", tasks.size());
        } catch (Exception e) {
            log.error("❌ 批量执行 SQL 失败", e);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    /** Executes one group of tasks that share the same SQL template as a single JDBC batch. */
    private void executeGroup(String sql, List<SqlTask> group) {
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Object[] args = group.get(i).getArgs();
                for (int j = 0; j < args.length; j++) {
                    ps.setObject(j + 1, args[j]); // JDBC parameters are 1-based
                }
            }

            @Override
            public int getBatchSize() {
                return group.size();
            }
        });
    }
}

View File

@@ -0,0 +1,19 @@
package com.tashow.cloud.canal.service;
/**
 * Immutable value object pairing a parameterized SQL template with its bind arguments.
 * Thread-safe by virtue of immutability (arguments are defensively copied).
 */
public class SqlTask {

    private final String sql;
    private final Object[] args;

    /**
     * @param sql  parameterized SQL template (with {@code ?} placeholders)
     * @param args bind arguments; defensively copied, {@code null} means no arguments
     */
    public SqlTask(String sql, Object... args) {
        this.sql = sql;
        // Defensive copy so later mutation of the caller's array cannot alter this task.
        this.args = (args == null) ? new Object[0] : args.clone();
    }

    public String getSql() {
        return sql;
    }

    /** @return a copy of the bind arguments; mutating it does not affect this task */
    public Object[] getArgs() {
        return args.clone();
    }
}

View File

@@ -0,0 +1,32 @@
package com.tashow.cloud.canal.service;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Static FIFO of pending {@link SqlTask}s shared between the canal reader
 * thread and the executor, backed by a lock-free {@link ConcurrentLinkedQueue}.
 */
public class SqlTaskQueue {

    private static final Queue<SqlTask> queue = new ConcurrentLinkedQueue<>();

    private SqlTaskQueue() {
        // static utility holder — not instantiable
    }

    public static void add(SqlTask task) {
        queue.add(task);
    }

    public static boolean isEmpty() {
        return queue.isEmpty();
    }

    public static SqlTask poll() {
        return queue.poll();
    }

    /** @return an approximate size — O(n) and racy for ConcurrentLinkedQueue */
    public static int size() {
        return queue.size();
    }

    /**
     * Drains the queue into a list by polling.
     * <p>
     * Fixed: the previous forEach-then-clear silently DROPPED tasks enqueued
     * between the copy and the clear; polling removes exactly the elements
     * that are returned.
     */
    public static List<SqlTask> drainAll() {
        List<SqlTask> list = new java.util.ArrayList<>();
        SqlTask task;
        while ((task = queue.poll()) != null) {
            list.add(task);
        }
        return list;
    }
}

View File

@@ -0,0 +1 @@
com.tashow.cloud.canal.config.CanalAutoConfiguration

View File

@@ -37,6 +37,14 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!-- 业务组件 -->
<dependency>
<groupId>com.tashow.cloud</groupId>
@@ -69,6 +77,13 @@
<artifactId>tashow-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.tashow.cloud</groupId>
<artifactId>tashow-data-canal</artifactId>
<version>${revision}</version> <!-- keep in lockstep with the other tashow modules instead of a hard-coded 1.0.0 -->
<scope>compile</scope>
</dependency>
<!-- RPC 远程调用相关 -->
<dependency>
<groupId>com.tashow.cloud</groupId>

View File

@@ -0,0 +1,204 @@
package com.tashow.cloud.infra.framework;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {

    /** Pending SQL text produced from binlog events, drained by executeQueueSql(). */
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    /**
     * Connects to the canal server and loops forever, converting binlog row
     * events into SQL strings and mock-executing them (see {@link #execute}).
     */
    public void run() {
        // NOTE(review): server address is hard-coded — should come from configuration.
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("43.139.42.137",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // NOTE(review): the sibling client subscribes with "tashow-platform\\..*";
            // confirm which filter is intended here.
            connector.subscribe("tashow-platform");
            connector.rollback();
            try {
                while (true) {
                    // Pull up to batchSize records from the master (returns what is available).
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    // Mock-execute as soon as anything is queued.
                    if (!SQL_QUEUE.isEmpty()) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                // Fixed: restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Drains the queue and mock-executes every statement in FIFO order.
     */
    public void executeQueueSql() {
        String sql;
        while ((sql = SQL_QUEUE.poll()) != null) {
            System.out.println("[sql]----> " + sql);
            this.execute(sql);
        }
    }

    /**
     * Routes each row-data entry to the matching SQL builder.
     * Entries from the "hc" schema are skipped.
     *
     * @param entrys batch of canal entries
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            // Fixed: this was "return", which silently dropped the REST of the batch
            // as soon as one "hc" entry appeared; skip only that entry.
            if (entry.getHeader().getSchemaName().equals("hc")) {
                continue;
            }
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * Builds "update t set c='v',... where pk='v'" for each changed row.
     * Only a single-column primary key is supported.
     *
     * @param entry row-change entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            for (RowData rowData : rowChange.getRowDatasList()) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" ").append(newColumnList.get(i).getName())
                       .append(" = '").append(newColumnList.get(i).getValue()).append("'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                for (Column column : rowData.getBeforeColumnsList()) {
                    if (column.getIsKey()) {
                        // Single-column primary keys only. Fixed: the key VALUE is now
                        // quoted like every other value — unquoted string keys produced
                        // syntactically invalid SQL.
                        sql.append(column.getName()).append("='").append(column.getValue()).append("'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds "delete from t where pk='v'" for each deleted row.
     * Only a single-column primary key is supported.
     *
     * @param entry row-change entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            for (RowData rowData : rowChange.getRowDatasList()) {
                StringBuilder sql = new StringBuilder("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : rowData.getBeforeColumnsList()) {
                    if (column.getIsKey()) {
                        // Fixed: value quoted (was unquoted, invalid for string keys).
                        sql.append(column.getName()).append("='").append(column.getValue()).append("'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds "insert into t (c1,...) VALUES ('v1',...)" for each inserted row.
     *
     * @param entry row-change entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            for (RowData rowData : rowChange.getRowDatasList()) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'").append(columnList.get(i).getValue()).append("'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Mock sink: only prints the SQL — no database write happens here.
     *
     * @param sql statement to "execute"
     */
    public void execute(String sql) {
        System.out.println("sql=======" + sql);
    }
}

View File

@@ -7,10 +7,11 @@ spring:
username: nacos # Nacos 账号
password: nacos # Nacos 密码
discovery: # 【注册中心】配置项(服务发现)
namespace: liwq # 命名空间。这里使用 dev 开发环境
namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
metadata:
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
config: # 【配置中心】配置项
namespace: liwq # 命名空间。这里使用 dev 开发环境
namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP