diff --git a/logs/infra-server.log.2025-05-22.0.gz b/logs/infra-server.log.2025-05-22.0.gz deleted file mode 100644 index 67228b4..0000000 Binary files a/logs/infra-server.log.2025-05-22.0.gz and /dev/null differ diff --git a/logs/infra-server.log.2025-05-23.0.gz b/logs/infra-server.log.2025-05-23.0.gz deleted file mode 100644 index 0064c6c..0000000 Binary files a/logs/infra-server.log.2025-05-23.0.gz and /dev/null differ diff --git a/tashow-framework/pom.xml b/tashow-framework/pom.xml index 0e47e10..06d6854 100644 --- a/tashow-framework/pom.xml +++ b/tashow-framework/pom.xml @@ -27,6 +27,7 @@ tashow-data-redis tashow-data-excel tashow-data-es + tashow-data-canal diff --git a/tashow-framework/tashow-data-canal/pom.xml b/tashow-framework/tashow-data-canal/pom.xml new file mode 100644 index 0000000..f3daf9d --- /dev/null +++ b/tashow-framework/tashow-data-canal/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + com.tashow.cloud + tashow-framework + ${revision} + + tashow-data-canal + jar + + ${project.artifactId} + canal 封装拓展 + + + + com.alibaba.otter + canal.client + 1.1.0 + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 4.2.0 + + + + javax.annotation + javax.annotation-api + 1.3.2 + + + + com.tashow.cloud + tashow-common + + + + + org.projectlombok + lombok + true + + + diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java new file mode 100644 index 0000000..038eedd --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/config/CanalAutoConfiguration.java @@ -0,0 +1,22 @@ +package com.tashow.cloud.canal.config; + +import com.tashow.cloud.canal.service.CanalSyncService; +import com.tashow.cloud.canal.service.SqlExecutorService; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import 
org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@AutoConfiguration +public class CanalAutoConfiguration { + + @Bean + public CanalSyncService canalSyncService() { + return new CanalSyncService(); + } + + @Bean + public SqlExecutorService getdb() { + return new SqlExecutorService(); + } +} diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java new file mode 100644 index 0000000..64c6a87 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncService.java @@ -0,0 +1,188 @@ +package com.tashow.cloud.canal.service; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class CanalSyncService { + + private static final Logger log = LoggerFactory.getLogger(CanalSyncService.class); + + private static final Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + + @Autowired + private JdbcTemplate jdbcTemplate; 
+ @Autowired + private SqlExecutorService sqlExecutorService; + + @PostConstruct + public void start() { + new Thread(this::runCanalClient).start(); + } + + private void runCanalClient() { + CanalConnector connector = CanalConnectors.newSingleConnector( + new InetSocketAddress("43.139.42.137", 11111), + "example", + "", + "" + ); + int batchSize = 1000; + + try { + connector.connect(); + connector.subscribe("tashow-platform\\..*"); + connector.rollback(); + + while (true) { + Message message = connector.getWithoutAck(batchSize); + log.info("Received message id: {}, entries size: {}", message.getId(), message.getEntries().size()); + long batchId = message.getId(); + int size = message.getEntries().size(); + + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + continue; + } + + dataHandle(message.getEntries()); + connector.ack(batchId); + + if (!SQL_QUEUE.isEmpty()) { + executeQueueSql(); + } + } + } catch (Exception e) { + log.error("Canal client error occurred.", e); + } finally { + connector.disconnect(); + } + } + + private void dataHandle(List entries) { + for (Entry entry : entries) { + if (entry.getEntryType() != EntryType.ROWDATA) continue; + + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + + log.info("schema: {}, table: {}, type: {}", schemaName, tableName, eventType); + + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + + } catch (Exception e) { + log.error("Error handling entry: {}", entry.toString(), e); + } + } + } + + private void saveInsertSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for 
(RowData rowData : rowChange.getRowDatasList()) { + List columns = rowData.getAfterColumnsList(); + + List columnNames = new ArrayList<>(); + List values = new ArrayList<>(); + + for (Column col : columns) { + columnNames.add(col.getName()); + values.add(col.getValue()); + } + + String sql = "INSERT INTO " + tableName + " (" + + String.join(",", columnNames) + ") VALUES ("; + + StringBuilder placeholders = new StringBuilder(); + for (int i = 0; i < values.size(); i++) { + placeholders.append("?,"); + } + if (placeholders.length() > 0) placeholders.deleteCharAt(placeholders.length() - 1); + sql += placeholders + ")"; + + SQL_QUEUE.add(new SqlTask(sql, values.toArray())); + } + } + + private void saveUpdateSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for (RowData rowData : rowChange.getRowDatasList()) { + List newColumns = rowData.getAfterColumnsList(); + List oldColumns = rowData.getBeforeColumnsList(); + + List updateColumns = new ArrayList<>(); + List params = new ArrayList<>(); + + for (Column col : newColumns) { + updateColumns.add(col.getName() + "=?"); + params.add(col.getValue()); + } + + Optional primaryKeyOpt = oldColumns.stream().filter(Column::getIsKey).findFirst(); + Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键")); + + params.add(primaryKey.getValue()); + + String sql = "UPDATE " + tableName + " SET " + + String.join(",", updateColumns) + + " WHERE " + primaryKey.getName() + "=?"; + + SQL_QUEUE.add(new SqlTask(sql, params.toArray())); + } + } + + private void saveDeleteSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + String tableName = entry.getHeader().getTableName(); + + for (RowData rowData : rowChange.getRowDatasList()) { + List beforeColumns = rowData.getBeforeColumnsList(); + + Optional primaryKeyOpt = 
beforeColumns.stream().filter(Column::getIsKey).findFirst(); + Column primaryKey = primaryKeyOpt.orElseThrow(() -> new RuntimeException("未找到主键")); + + String sql = "DELETE FROM " + tableName + " WHERE " + primaryKey.getName() + "=?"; + SQL_QUEUE.add(new SqlTask(sql, primaryKey.getValue())); + } + } + + private void executeQueueSql() { + List tasks = new ArrayList<>(); + SqlTask task; + while ((task = SQL_QUEUE.poll()) != null) { + tasks.add(task); + } + if (!tasks.isEmpty()) { + sqlExecutorService.executeBatch(tasks); + } + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java new file mode 100644 index 0000000..9a526da --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/CanalSyncServiceTest.java @@ -0,0 +1,156 @@ +/* +package com.tashow.cloud.canal.service; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class CanalSyncServiceTest { + + private static final Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private Canaldb canaldb; + + @PostConstruct + public void start() { + new Thread(this::runCanalClient).start(); + } + + private void runCanalClient() { + CanalConnector connector = CanalConnectors.newSingleConnector( + new 
InetSocketAddress("43.139.42.137", 11111), + "example", + "", + "" + ); + int batchSize = 1000; + + try { + connector.connect(); + connector.subscribe("tashow-platform\\..*"); + connector.rollback(); + + while (true) { + Message message = connector.getWithoutAck(batchSize); + System.out.println("Received message id: " + message.getId() + ", entries size: " + message.getEntries().size()); + long batchId = message.getId(); + int size = message.getEntries().size(); + + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + continue; + } + + dataHandle(message.getEntries()); + connector.ack(batchId); + + if (SQL_QUEUE.size() > 0) { + executeQueueSql(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + connector.disconnect(); + } + } + + private void dataHandle(List entries) { + for (Entry entry : entries) { + if (entry.getEntryType() != EntryType.ROWDATA) continue; + + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + + System.out.println("schema: " + schemaName + ", table: " + tableName + ", type: " + eventType); + + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void saveInsertSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + List columns = rowData.getAfterColumnsList(); + StringBuilder sql = new StringBuilder("INSERT INTO ") + .append(entry.getHeader().getTableName()).append(" (") + .append(columns.stream().map(Column::getName).reduce((a, b) -> a + "," + b).orElse("")) + .append(") VALUES (") + 
.append(columns.stream().map(c -> "'" + c.getValue() + "'").reduce((a, b) -> a + "," + b).orElse("")) + .append(");"); + + SQL_QUEUE.add(sql.toString()); + } + } + + private void saveUpdateSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + List newColumns = rowData.getAfterColumnsList(); + List oldColumns = rowData.getBeforeColumnsList(); + + StringBuilder setClause = new StringBuilder(); + for (Column col : newColumns) { + setClause.append(col.getName()).append("='").append(col.getValue()).append("', "); + } + if (setClause.length() > 0) setClause.setLength(setClause.length() - 2); + + String whereClause = oldColumns.stream() + .filter(Column::getIsKey) + .map(c -> c.getName() + "='" + c.getValue() + "'") + .findFirst() + .orElseThrow(() -> new RuntimeException("未找到主键")); + + SQL_QUEUE.add("UPDATE " + entry.getHeader().getTableName() + " SET " + setClause + " WHERE " + whereClause + ";"); + } + } + + private void saveDeleteSql(Entry entry) throws Exception { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + for (RowData rowData : rowChange.getRowDatasList()) { + String whereClause = rowData.getBeforeColumnsList().stream() + .filter(Column::getIsKey) + .map(c -> c.getName() + "='" + c.getValue() + "'") + .findFirst() + .orElseThrow(() -> new RuntimeException("未找到主键")); + + SQL_QUEUE.add("DELETE FROM " + entry.getHeader().getTableName() + " WHERE " + whereClause + ";"); + } + } + + + private void executeQueueSql() { + int size = SQL_QUEUE.size(); + for (int i = 0; i < size; i++) { + String sql = SQL_QUEUE.poll(); + canaldb.execute(sql); + } + } +} +*/ diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java new file mode 100644 index 0000000..7b51600 --- /dev/null +++ 
b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/Canaldb.java @@ -0,0 +1,27 @@ +/* +package com.tashow.cloud.canal.service; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +@Service +public class Canaldb { + + @Autowired + private JdbcTemplate jdbcTemplate; + @DS("slave") + public void execute(String sql) { + try { + String ds = DynamicDataSourceContextHolder.peek(); // 调试查看当前数据源 + System.out.println("当前数据源:" + ds); + System.out.println("[execute]----> " + sql); + jdbcTemplate.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + } + } +} +*/ diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java new file mode 100644 index 0000000..3ba0286 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlExecutorService.java @@ -0,0 +1,56 @@ +package com.tashow.cloud.canal.service; + + +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +@Service +public class SqlExecutorService { + + private static final Logger log = LoggerFactory.getLogger(SqlExecutorService.class); + + @Autowired + private JdbcTemplate jdbcTemplate; + + public void executeBatch(List tasks) { + 
if (tasks == null || tasks.isEmpty()) return; + + DynamicDataSourceContextHolder.push("slave"); + try { + // 提取所有 SQL 模板(假设它们都是一样的) + String sqlTemplate = tasks.get(0).getSql(); + + // 执行批量更新 + jdbcTemplate.batchUpdate(sqlTemplate, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + SqlTask task = tasks.get(i); + Object[] args = task.getArgs(); + for (int j = 0; j < args.length; j++) { + ps.setObject(j + 1, args[j]); + } + } + + @Override + public int getBatchSize() { + return tasks.size(); + } + }); + + log.info("✅ 成功执行 {} 条 SQL", tasks.size()); + } catch (Exception e) { + log.error("❌ 批量执行 SQL 失败", e); + } finally { + DynamicDataSourceContextHolder.poll(); + } + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java new file mode 100644 index 0000000..c6f62c2 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTask.java @@ -0,0 +1,19 @@ +package com.tashow.cloud.canal.service; + +public class SqlTask { + private final String sql; + private final Object[] args; + + public SqlTask(String sql, Object... 
args) { + this.sql = sql; + this.args = args; + } + + public String getSql() { + return sql; + } + + public Object[] getArgs() { + return args; + } +} \ No newline at end of file diff --git a/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java new file mode 100644 index 0000000..edf2489 --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/java/com/tashow/cloud/canal/service/SqlTaskQueue.java @@ -0,0 +1,32 @@ +package com.tashow.cloud.canal.service; + +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SqlTaskQueue { + private static final Queue queue = new ConcurrentLinkedQueue<>(); + + public static void add(SqlTask task) { + queue.add(task); + } + + public static boolean isEmpty() { + return queue.isEmpty(); + } + + public static SqlTask poll() { + return queue.poll(); + } + + public static int size() { + return queue.size(); + } + + public static List drainAll() { + List list = new java.util.ArrayList<>(); + queue.forEach(list::add); + queue.clear(); + return list; + } +} diff --git a/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..361a8ce --- /dev/null +++ b/tashow-framework/tashow-data-canal/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.tashow.cloud.canal.config.CanalAutoConfiguration \ No newline at end of file diff --git a/tashow-module/tashow-module-infra/pom.xml b/tashow-module/tashow-module-infra/pom.xml index 9d6c752..b6b9120 100644 --- a/tashow-module/tashow-module-infra/pom.xml +++ 
b/tashow-module/tashow-module-infra/pom.xml @@ -37,6 +37,14 @@ ${revision} + + + com.alibaba.otter + canal.client + 1.1.0 + + + com.tashow.cloud @@ -69,6 +77,13 @@ tashow-data-redis + + com.tashow.cloud diff --git a/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java b/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java new file mode 100644 index 0000000..95a6876 --- /dev/null +++ b/tashow-module/tashow-module-infra/src/main/java/com/tashow/cloud/infra/framework/CanalClient.java @@ -0,0 +1,204 @@ +package com.tashow.cloud.infra.framework; + + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry.*; +import com.alibaba.otter.canal.protocol.Message; +import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Component +public class CanalClient { + + //sql队列 + private Queue SQL_QUEUE = new ConcurrentLinkedQueue<>(); + + + /** + * canal入库方法 + */ + public void run() { + CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("43.139.42.137", + 11111), "example", "", ""); + int batchSize = 1000; + try { + connector.connect(); + // connector.subscribe(".*\\..*"); + connector.subscribe("tashow-platform"); + + connector.rollback(); + try { + while (true) { + //尝试从master那边拉去数据batchSize条记录,有多少取多少 + Message message = connector.getWithoutAck(batchSize); + long batchId = message.getId(); + int size = message.getEntries().size(); + if (batchId == -1 || size == 0) { + Thread.sleep(1000); + } else { + dataHandle(message.getEntries()); + } + connector.ack(batchId); + + //当队列里面堆积的sql大于一定数值的时候就模拟执行 + if 
(SQL_QUEUE.size() >= 1) { + executeQueueSql(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } finally { + connector.disconnect(); + } + } + + /** + * 模拟执行队列里面的sql语句 + */ + public void executeQueueSql() { + int size = SQL_QUEUE.size(); + for (int i = 0; i < size; i++) { + String sql = SQL_QUEUE.poll(); + System.out.println("[sql]----> " + sql); + + this.execute(sql); + } + } + + /** + * 数据处理 + * + * @param entrys + */ + private void dataHandle(List entrys) throws InvalidProtocolBufferException { + for (Entry entry : entrys) { + if(entry.getHeader().getSchemaName().equals("hc")){ + return; + } + if (EntryType.ROWDATA == entry.getEntryType()) { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + EventType eventType = rowChange.getEventType(); + if (eventType == EventType.DELETE) { + saveDeleteSql(entry); + } else if (eventType == EventType.UPDATE) { + saveUpdateSql(entry); + } else if (eventType == EventType.INSERT) { + saveInsertSql(entry); + } + } + } + } + + /** + * 保存更新语句 + * + * @param entry + */ + private void saveUpdateSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List newColumnList = rowData.getAfterColumnsList(); + StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); + for (int i = 0; i < newColumnList.size(); i++) { + sql.append(" " + newColumnList.get(i).getName() + + " = '" + newColumnList.get(i).getValue() + "'"); + if (i != newColumnList.size() - 1) { + sql.append(","); + } + } + sql.append(" where "); + List oldColumnList = rowData.getBeforeColumnsList(); + for (Column column : oldColumnList) { + if (column.getIsKey()) { + //暂时只支持单一主键 + sql.append(column.getName() + "=" + column.getValue()); + break; + } + } + SQL_QUEUE.add(sql.toString()); + } + } catch 
(InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 保存删除语句 + * + * @param entry + */ + private void saveDeleteSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List columnList = rowData.getBeforeColumnsList(); + StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); + for (Column column : columnList) { + if (column.getIsKey()) { + //暂时只支持单一主键 + sql.append(column.getName() + "=" + column.getValue()); + break; + } + } + SQL_QUEUE.add(sql.toString()); + } + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 保存插入语句 + * + * @param entry + */ + private void saveInsertSql(Entry entry) { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + List rowDatasList = rowChange.getRowDatasList(); + for (RowData rowData : rowDatasList) { + List columnList = rowData.getAfterColumnsList(); + StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); + for (int i = 0; i < columnList.size(); i++) { + sql.append(columnList.get(i).getName()); + if (i != columnList.size() - 1) { + sql.append(","); + } + } + sql.append(") VALUES ("); + for (int i = 0; i < columnList.size(); i++) { + sql.append("'" + columnList.get(i).getValue() + "'"); + if (i != columnList.size() - 1) { + sql.append(","); + } + } + sql.append(")"); + SQL_QUEUE.add(sql.toString()); + } + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + } + + /** + * 入库 + * @param sql + */ + public void execute(String sql) { + System.out.println("sql======="+sql); + } +} diff --git a/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml b/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml index a26f4bb..15b2156 100644 --- 
a/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml +++ b/tashow-module/tashow-module-infra/src/main/resources/application-local.yaml @@ -7,10 +7,11 @@ spring: username: nacos # Nacos 账号 password: nacos # Nacos 密码 discovery: # 【注册中心】配置项 - namespace: liwq # 命名空间。这里使用 dev 开发环境 + namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境 group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP metadata: version: 1.0.0 # 服务实例的版本号,可用于灰度发布 config: # 【配置中心】配置项 - namespace: liwq # 命名空间。这里使用 dev 开发环境 + namespace: 76667956-2ac2-4e05-906b-4bca4ebcc5f0 # 命名空间。这里使用 dev 开发环境 group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP +