perf: Preparation for database and table sharding #2991 #2994

Merged
merged 65 commits into from
Nov 25, 2024

@wangyu096 (Collaborator) commented May 24, 2024

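Background

Before Job can support database and table sharding, some preparation is required, such as adding a sharding key to every table and backfilling the data.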

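Overall sharding design

  1. Job instance data is sharded by task_instance_id, so the tables need an additional task_instance_id column.
  2. The "execution history" feature queries by business ID + time range + other search conditions. With data sharded by task_instance_id, such a query would have to hit every shard. Therefore a redundant copy of the basic job instance data is kept, sharded by business ID.
  3. Use the distributed ID component Leaf to replace the original MySQL auto-increment primary keys.
  4. Rework the execution data archiving service: archiving and cleanup no longer rely on task_instance_id being auto-incremented, and archive task concurrency control is supported so that archive cleanup does not cause DB master-slave replication lag.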
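
Points 1 and 2 can be illustrated with a minimal routing sketch. It is not the PR's implementation: the shard counts, the `ds_` data source prefix, the modulo rule, and the `task_instance_app` table name are all assumptions made up for the example.

```java
final class ShardRoutingSketch {
    // Hypothetical shard layout: 4 databases, 8 physical tables per database.
    static final int DB_COUNT = 4;
    static final int TABLES_PER_DB = 8;

    // Database index derived from the sharding key (assumed modulo rule).
    static int dbIndex(long shardingKey) {
        return (int) Math.floorMod(shardingKey, (long) DB_COUNT);
    }

    // Table index inside the chosen database (assumed rule).
    static int tableIndex(long shardingKey) {
        return (int) Math.floorMod(shardingKey / DB_COUNT, (long) TABLES_PER_DB);
    }

    // Resolves a logical table plus sharding key to one physical table.
    static String route(String logicTable, long shardingKey) {
        return "ds_" + dbIndex(shardingKey) + "." + logicTable + "_" + tableIndex(shardingKey);
    }
}
```

A lookup by task_instance_id resolves to exactly one physical table, for example `ShardRoutingSketch.route("task_instance", 10L)`. An execution history query only knows the business ID and a time range, so it can be routed to a single shard only if a copy of the data is keyed by business ID, for example `ShardRoutingSketch.route("task_instance_app", 2L)`.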

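Detailed design

Add task_instance_id as the sharding key to job-related tables

The routine DAO and Service layer methods gain a task_instance_id parameter and do not need close review. The code that deserves a careful review:

  1. TaskInstanceIdDynamicCondition: dynamically builds the task_instance_id query condition for DAOs
  2. 0026_job_execute_20241115-1000_V3.11.2_mysql.sql: table structure and data changes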
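
As a rough illustration of what "dynamically building the task_instance_id condition" could look like, the sketch below appends the sharding key predicate only when a usable value is available. This is an assumption about the idea, not the actual TaskInstanceIdDynamicCondition class; the `Where` holder, the method name, and the `id` column are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskInstanceIdConditionSketch {
    // Hypothetical holder for a parameterized WHERE clause.
    static final class Where {
        final String sql;
        final List<Object> params;

        Where(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    // Builds the condition; the sharding key predicate is added only when known.
    static Where byStepInstanceId(long stepInstanceId, Long taskInstanceId) {
        List<String> predicates = new ArrayList<>();
        List<Object> params = new ArrayList<>();
        predicates.add("id = ?");
        params.add(stepInstanceId);
        if (taskInstanceId != null && taskInstanceId > 0) {
            predicates.add("task_instance_id = ?");
            params.add(taskInstanceId);
        }
        return new Where(String.join(" AND ", predicates), params);
    }
}
```

With the sharding key present, a sharding layer can route the statement to one shard; without it, the query would fall back to scanning all shards.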

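Replace MySQL auto-increment primary keys with Meituan Leaf, a distributed primary key framework

Code to focus on in review:

  1. IdGen, SegmentIdGen, AutoIncrementIdGen, PropBasedDynamicIdGen: definitions of the ID generation components
  2. PropBasedDynamicIdGen: an ID generator controlled by dynamic properties. It can change the type of the ID generation component at runtime when a dynamic property changes, so that multiple microservice instances switch from auto-increment primary keys to distributed primary keys at the same time, avoiding data conflicts during the switch.
  3. AbstractPropBasedDynamicComponent: refactors the broadcasting, listening, and handling of dynamic properties and extracts a common component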
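
The runtime switch described in item 2 can be sketched as a delegating generator. Only the names IdGen and AutoIncrementIdGen come from the PR description; their signatures here are assumptions, and `InMemorySegmentIdGen` and `SwitchableIdGen` are hypothetical stand-ins (a real Leaf segment generator reserves ID ranges from a database table rather than counting in memory).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class DynamicIdGenSketch {
    // Assumed contract: null means "let MySQL assign the auto-increment ID".
    interface IdGen {
        Long genId();
    }

    static final class AutoIncrementIdGen implements IdGen {
        public Long genId() {
            return null;
        }
    }

    // Stand-in for a segment-based generator; counts up from a start value.
    static final class InMemorySegmentIdGen implements IdGen {
        private final AtomicLong next;

        InMemorySegmentIdGen(long start) {
            this.next = new AtomicLong(start);
        }

        public Long genId() {
            return next.getAndIncrement();
        }
    }

    // Delegates to whichever generator is currently active.
    static final class SwitchableIdGen implements IdGen {
        private final AtomicReference<IdGen> delegate;

        SwitchableIdGen(IdGen initial) {
            this.delegate = new AtomicReference<>(initial);
        }

        // Would be called by a property-change listener in each service instance.
        void switchTo(IdGen target) {
            delegate.set(target);
        }

        public Long genId() {
            return delegate.get().genId();
        }
    }
}
```

Because callers only hold the switchable wrapper, a broadcast property change can flip every instance to the distributed generator without a restart.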

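Execution history archiving rework

  • Changes
  1. Split basic archive tasks by (archive_task_type:db:table:day:hour), so tasks can run concurrently on a single DB or on sharded DBs
  2. The archiving logic no longer relies on task_instance_id being auto-incremented; instead it traverses the execution history tables in the order given by sorting on job_create_time + task_instance_id
  3. Archiving performance: supports archive task concurrency control and balanced scheduling of archive tasks across DBs
  • Code to focus on in review
  1. JobInstanceArchiveTaskGenerator: archive task generation
  2. JobInstanceArchiveTaskScheduler: archive task scheduling
  3. ArchiveDbNodePriorityEvaluator: computes DB node priority for archive task scheduling, keeping archive tasks evenly distributed across DB nodes
  4. JobInstanceSubTableArchiver and its subclasses: archiving implementation for job instance sub-tables
  5. ArchiveTaskService: archive task service class that manages archive tasks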
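
The traversal by job_create_time + task_instance_id is a form of keyset (cursor) pagination. The sketch below shows the idea on an in-memory list; it is an illustration under assumptions, not the PR's DAO code, and the `Row` type and method names are hypothetical. In SQL the same step would be a range condition on the pair of columns with a matching ORDER BY and LIMIT.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class ArchiveCursorSketch {
    // Hypothetical projection of a job instance row.
    static final class Row {
        final long createTime;
        final long taskInstanceId;

        Row(long createTime, long taskInstanceId) {
            this.createTime = createTime;
            this.taskInstanceId = taskInstanceId;
        }
    }

    // Total order used for traversal: create time first, then ID as tie-breaker.
    static final Comparator<Row> ORDER =
        Comparator.comparingLong((Row r) -> r.createTime)
            .thenComparingLong(r -> r.taskInstanceId);

    // Returns up to `limit` rows strictly after the cursor (null cursor = start).
    static List<Row> nextBatch(List<Row> table, Row cursor, int limit) {
        return table.stream()
            .filter(r -> cursor == null || ORDER.compare(r, cursor) > 0)
            .sorted(ORDER)
            .limit(limit)
            .collect(Collectors.toList());
    }

    // Walks the whole table batch by batch and returns IDs in archive order.
    static List<Long> archiveAll(List<Row> table, int limit) {
        List<Long> archived = new ArrayList<>();
        Row cursor = null;
        while (true) {
            List<Row> batch = nextBatch(table, cursor, limit);
            if (batch.isEmpty()) {
                break;
            }
            for (Row r : batch) {
                archived.add(r.taskInstanceId);
            }
            cursor = batch.get(batch.size() - 1);
        }
        return archived;
    }
}
```

The ordering stays correct even when IDs are not monotonic in time, which is the situation once IDs come from a distributed generator.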

wangyu096 added 20 commits May 20, 2024 20:34
@wangyu096 wangyu096 requested a review from jsonwan May 24, 2024 07:33
*/
SHARDING(1);

private final int value;
Collaborator

This enum's value is not written to the DB as a separate field and carries little practical meaning. I suggest dropping value and using name() directly. Concatenated into the DataNodeId it reads more clearly and avoids the mental value -> meaning translation when troubleshooting.

Collaborator Author

It is stored in the archive_task.data_node field, and an index will be created on that field. For performance reasons, a short representation like this is preferable.

Collaborator

OK

*/
private Integer day;
/**
* Hour of the archive task. 1-24
Collaborator

The description here does not match the implementation. Suggested wording:
Hour of the archived data, 0-23

Collaborator Author

fixed

private DbDataNode dbDataNode;

/**
* Day of the archive task, e.g. 20240806
Collaborator

Suggested wording:
Day of the archived data, e.g. 20240806

Collaborator Author

fixed

*/
private Integer hour;
/**
* Archive task time range - from timestamp
Collaborator

What is being described is the data to be archived, not the archive task itself. I suggest changing "task" to "data".

Collaborator Author

fixed

*/
private Long fromTimestamp;
/**
* Archive task time range - to timestamp
Collaborator

What is being described is the data to be archived, not the archive task itself. I suggest changing "task" to "data".

Collaborator Author

fixed
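
The four fields discussed above (day, hour, fromTimestamp, toTimestamp) can be derived from a single point in time. The sketch below is one possible derivation, not the PR's code: the class and method names are hypothetical, and treating toTimestamp as the exclusive start of the next hour is an assumption.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

final class ArchiveWindowSketch {
    final int day;             // e.g. 20240806
    final int hour;            // 0-23, as corrected in the review
    final long fromTimestamp;  // start of the hour, epoch millis
    final long toTimestamp;    // start of the next hour (assumed exclusive), epoch millis

    private ArchiveWindowSketch(int day, int hour, long fromTimestamp, long toTimestamp) {
        this.day = day;
        this.hour = hour;
        this.fromTimestamp = fromTimestamp;
        this.toTimestamp = toTimestamp;
    }

    // Computes the one-hour window of archived data that contains `time`.
    static ArchiveWindowSketch of(LocalDateTime time, ZoneId zone) {
        LocalDateTime start = time.truncatedTo(ChronoUnit.HOURS);
        int day = start.getYear() * 10000 + start.getMonthValue() * 100 + start.getDayOfMonth();
        long from = start.atZone(zone).toInstant().toEpochMilli();
        long to = start.plusHours(1).atZone(zone).toInstant().toEpochMilli();
        return new ArchiveWindowSketch(day, start.getHour(), from, to);
    }
}
```

`LocalDateTime.getHour()` returns values from 0 to 23, which is why the field comment needed the 0-23 range rather than 1-24.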

}

/**
* Read job instance 熟悉 [sic] from the hot DB, ordered by time + ID
Collaborator

Typo: 熟悉 ("familiar") should be 数据 ("data")

Collaborator Author

fixed

boolean backupEnabled = isBackupEnable();
boolean deleteEnabled = isDeleteEnable();

int readLimit = 1000;
Collaborator

The read batch size is hard-coded here. Should it use ArchiveProperties.readRowLimit and the related sub-table configuration options instead?

Collaborator Author

Optimized

List<Long> jobInstanceIds =
jobInstanceRecords.stream().map(this::extractJobInstanceId).collect(Collectors.toList());
// Back up the main table data
jobInstanceColdDAO.batchInsert(jobInstanceRecords, 1000);
Collaborator

The read batch size is hard-coded here. Should it use ArchiveProperties.readRowLimit and the related sub-table configuration options instead?

Collaborator Author

Optimized
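
The point of both comments is that batch sizes should be passed in from configuration rather than written as the literal 1000. A minimal sketch of a batching helper that takes the size as a parameter follows; the helper is hypothetical and is not the DAO's batchInsert, and only the readRowLimit option name comes from the review.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizeSketch {
    // Splits rows into batches of at most `batchSize` elements.
    // `batchSize` is expected to come from configuration, not a literal.
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            int end = Math.min(i + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(i, end)));
        }
        return batches;
    }
}
```

A caller would then read the limit once from its properties object and pass it down, so tuning the batch size does not require a code change.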

import java.util.Map;

@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "job.backup.archive.execute")
public class ArchiveDBProperties {
public class ArchiveProperties {
Collaborator

The readIdStepSize option in this class is no longer used. Can it be removed?

Collaborator Author

Removed

/**
* Whether the component is running (used for the Spring Lifecycle isRunning check)
*/
private volatile boolean running = false;
Collaborator

The active and running variables have the same initial value and are assigned in exactly the same places. I suggest merging them into one.

Collaborator Author

fixed

public void waitingForAllTasksDone() {
try {
log.info("Waiting for all tasks done! total: {}", latch.getCount());
this.latch.await();
Collaborator

I suggest specifying a timeout so that an abnormal situation cannot block here for a long time.

Collaborator Author

Optimized
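
A bounded wait can be written with the timed overload `CountDownLatch.await(long, TimeUnit)`, which returns false when the timeout elapses before the count reaches zero. The sketch below shows the pattern; the class name, the method signature, and any timeout value are assumptions, since the PR's final code is not shown here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchTimeoutSketch {
    // Returns true if all tasks finished within the timeout, false otherwise.
    static boolean waitForAllTasksDone(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The boolean result lets the caller log or escalate when tasks are still running after the deadline instead of hanging indefinitely.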

if (highestPriorityDbNodeTasksInfo.getRunningTaskCount() >= taskConcurrent) {
// Sleep for 5 minutes and wait for the number of parallel tasks to drop
log.info("Running archive task count exceed concurrent limit : {}, wait 300s", taskConcurrent);
ThreadUtils.sleep(1000 * 60L);
Collaborator

The actual sleep duration does not match the comment.

Collaborator Author

fixed
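
One way to keep the comment, the log message, and the sleep from drifting apart is to derive all of them from a single `Duration`. The sketch below illustrates that idea only; the class is hypothetical, it uses `Thread.sleep` rather than the project's ThreadUtils, and it does not claim which duration the author settled on.

```java
import java.time.Duration;

final class ThrottleWaitSketch {
    // Builds the log line from the same Duration that drives the sleep.
    static String waitMessage(int taskConcurrent, Duration wait) {
        return String.format(
            "Running archive task count exceed concurrent limit : %d, wait %ds",
            taskConcurrent, wait.getSeconds());
    }

    // Logs and sleeps for exactly the configured duration.
    static void throttle(int taskConcurrent, Duration wait) throws InterruptedException {
        System.out.println(waitMessage(taskConcurrent, wait));
        Thread.sleep(wait.toMillis());
    }
}
```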

}
}
}
} finally {
Collaborator

I suggest catching and handling exceptions.

Collaborator Author

fixed

} else {
log.info("No new archive tasks are generated");
}
} finally {
Collaborator

I suggest catching and handling exceptions.

Collaborator Author

fixed
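
With only try/finally, an exception thrown inside the block still propagates to the caller after cleanup and may end a scheduling loop. A minimal sketch of the try/catch/finally shape the reviewer asks for follows; the class, the method, and the use of System.err instead of the project's logger are assumptions.

```java
final class SafeRunSketch {
    // Runs one iteration; failures are reported instead of propagating,
    // and cleanup (for example releasing a lock) always runs.
    static boolean runOnce(Runnable body, Runnable cleanup) {
        try {
            body.run();
            return true;
        } catch (RuntimeException e) {
            System.err.println("Archive task iteration failed: " + e);
            return false;
        } finally {
            cleanup.run();
        }
    }
}
```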

}
}

public static DbDataNode standaloneDbDatNode() {
Collaborator

Is an "a" missing here? DatNode -> DataNode

Collaborator Author

fixed

return new DbDataNode(DbDataNodeTypeEnum.STANDALONE, STANDALONE_DS_NAME, null, null);
}

public static DbDataNode shardingDbDatNode(String dataSource, Integer dbIndex, Integer tableIndex) {
Collaborator

Is an "a" missing here? DatNode -> DataNode

Collaborator Author

fixed

@wangyu096 wangyu096 merged commit 1debd11 into TencentBlueKing:master Nov 25, 2024
3 of 4 checks passed
2 participants