Skip to content

Commit

Permalink
perf: 分库分表改造-任务表以及子表加入 task_instance_id 字段 #2991
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Nov 21, 2024
1 parent 8243d21 commit 54fed24
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 398 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.tencent.bk.job.backup.archive;

import com.tencent.bk.job.backup.archive.dao.JobInstanceColdDAO;
import com.tencent.bk.job.backup.archive.model.ArchiveTaskContext;
import com.tencent.bk.job.backup.archive.model.ArchiveTaskSummary;
import com.tencent.bk.job.backup.archive.model.JobInstanceArchiveTaskInfo;
import com.tencent.bk.job.backup.archive.model.TimeAndIdBasedArchiveProcess;
Expand Down Expand Up @@ -142,6 +143,10 @@ public void registerDoneCallback(ArchiveTaskDoneCallback archiveTaskDoneCallback

private void archive() {
try {
// 设置归档任务上下文
ArchiveTaskContextHolder.set(new ArchiveTaskContext(archiveTaskInfo));

// 获取分布式锁
if (!acquireLock()) {
archiveTaskSummary.setSkip(!isAcquireLock);
return;
Expand Down Expand Up @@ -179,6 +184,7 @@ private void archive() {
if (checkStopFlag()) {
stopTask();
}
ArchiveTaskContextHolder.unset();
}
}

Expand Down Expand Up @@ -250,7 +256,7 @@ private List<T> readJobInstanceRecords(int readLimit) {
archiveTaskInfo.getToTimestamp(), fromTaskInstanceId, readLimit);
log.info("[{}] Read sorted job instance from hot db, fromJobCreateTime: {}, toJobCreatTime: {}, " +
"fromJobInstanceId: {}, resultSize: {}, cost: {} ms",
taskId, progress.getTimestamp(), archiveTaskInfo.getToTimestamp(), progress.getId(),
taskId, fromTime, archiveTaskInfo.getToTimestamp(), fromTaskInstanceId,
jobInstanceRecords.size(), System.currentTimeMillis() - readStartTime);
return jobInstanceRecords;
}
Expand Down Expand Up @@ -284,14 +290,18 @@ protected boolean isDeleteEnable() {
private boolean acquireLock() {
this.isAcquireLock = archiveTaskLock.lock(taskId);
if (!isAcquireLock) {
log.info("[{}] Acquire lock fail", taskId);
log.info("[{}] Acquire archive task lock fail", taskId);
}
return isAcquireLock;
}

private void updateArchiveProgress(ArchiveTaskStatusEnum taskStatus, TimeAndIdBasedArchiveProcess progress) {
archiveTaskInfo.setStatus(taskStatus);
archiveTaskInfo.setProcess(progress);
if (taskStatus != ArchiveTaskStatusEnum.RUNNING) {
log.info("[{}] Update archive task process, taskStatus: {}, process: {}",
taskId, taskStatus, progress);
}
archiveTaskService.updateTask(archiveTaskInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,28 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.backup.archive.dao;
package com.tencent.bk.job.backup.archive;

import com.tencent.bk.job.backup.model.dto.ArchiveProgressDTO;
import com.tencent.bk.job.backup.archive.model.ArchiveTaskContext;

public interface ArchiveProgressDAO {
ArchiveProgressDTO queryArchiveProgress(String table);
public class ArchiveTaskContextHolder {

void saveArchiveProgress(ArchiveProgressDTO archiveProgress);
private static final ThreadLocal<ArchiveTaskContext> HOLDER = new ThreadLocal<>();

void saveDeleteProgress(ArchiveProgressDTO archiveProgress);
public static void set(ArchiveTaskContext archiveTaskContext) {
HOLDER.set(archiveTaskContext);
}

public static void unset() {
HOLDER.remove();
}

public static ArchiveTaskContext get() {
return HOLDER.get();
}

public static String getArchiveTaskId() {
ArchiveTaskContext context = HOLDER.get();
return context != null ? context.getArchiveTaskInfo().buildTaskUniqueId() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package com.tencent.bk.job.backup.archive;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;

/**
* 归档任务执行线程
Expand All @@ -33,18 +35,25 @@
public class ArchiveTaskWorker extends Thread {

private final JobInstanceArchiveTask archiveTask;
private final Tracer tracer;

public ArchiveTaskWorker(JobInstanceArchiveTask archiveTask) {
this.setName("ArchiveWorker-" + archiveTask.getTaskId());
public ArchiveTaskWorker(JobInstanceArchiveTask archiveTask, Tracer tracer) {
this.tracer = tracer;
this.archiveTask = archiveTask;
this.setName("ArchiveWorker-" + archiveTask.getTaskId());
}

@Override
public void run() {
try {
Span span = tracer.nextSpan().name("archive-task");

try (Tracer.SpanInScope ignored = tracer.withSpan(span.start())) {
archiveTask.execute();
} catch (Throwable e) {
span.error(e);
log.warn("Thread interrupted!");
} finally {
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.tencent.bk.job.common.util.ThreadUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.context.SmartLifecycle;

import java.util.List;
Expand Down Expand Up @@ -65,6 +66,8 @@ public class JobInstanceArchiveTaskScheduler implements SmartLifecycle {
private final ArchiveErrorTaskCounter archiveErrorTaskCounter;
private final ArchiveTablePropsStorage archiveTablePropsStorage;

private final Tracer tracer;

private final Object lifecycleMonitor = new Object();

/**
Expand Down Expand Up @@ -97,7 +100,8 @@ public JobInstanceArchiveTaskScheduler(ArchiveTaskService archiveTaskService,
JobInstanceColdDAO jobInstanceColdDAO,
ArchiveTaskLock archiveTaskLock,
ArchiveErrorTaskCounter archiveErrorTaskCounter,
ArchiveTablePropsStorage archiveTablePropsStorage) {
ArchiveTablePropsStorage archiveTablePropsStorage,
Tracer tracer) {
this.archiveTaskService = archiveTaskService;
this.taskInstanceRecordDAO = taskInstanceRecordDAO;
this.archiveProperties = archiveProperties;
Expand All @@ -107,6 +111,7 @@ public JobInstanceArchiveTaskScheduler(ArchiveTaskService archiveTaskService,
this.archiveTaskLock = archiveTaskLock;
this.archiveErrorTaskCounter = archiveErrorTaskCounter;
this.archiveTablePropsStorage = archiveTablePropsStorage;
this.tracer = tracer;
}

public void schedule() {
Expand Down Expand Up @@ -196,7 +201,7 @@ private void startArchiveTask(JobInstanceArchiveTaskInfo archiveTaskInfo) {
}
scheduledTasks.put(archiveTask.getTaskId(), archiveTask);
}
ArchiveTaskWorker worker = new ArchiveTaskWorker(archiveTask);
ArchiveTaskWorker worker = new ArchiveTaskWorker(archiveTask, tracer);
worker.start();
log.info("Start JobInstanceArchiveTask success, taskId: {}", archiveTaskInfo.buildTaskUniqueId());
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.tencent.bk.job.backup.archive.impl;

import com.tencent.bk.job.backup.archive.ArchiveTablePropsStorage;
import com.tencent.bk.job.backup.archive.ArchiveTaskContextHolder;
import com.tencent.bk.job.backup.archive.JobInstanceSubTableArchiver;
import com.tencent.bk.job.backup.archive.dao.JobInstanceColdDAO;
import com.tencent.bk.job.backup.archive.dao.impl.AbstractJobInstanceHotRecordDAO;
Expand Down Expand Up @@ -66,8 +67,8 @@ public void backupRecords(List<Long> jobInstanceIds) {
while (recordResultSet.next()) {
List<? extends TableRecord<?>> records = recordResultSet.getRecords();
long readEndTime = System.currentTimeMillis();
log.info("Read {}, recordSize: {}, cost: {}ms", tableName,
CollectionUtils.isEmpty(records) ? 0 : records.size(), readEndTime - startTime);
log.info("[{}] Read {}, recordSize: {}, cost: {}ms", ArchiveTaskContextHolder.getArchiveTaskId(),
tableName, CollectionUtils.isEmpty(records) ? 0 : records.size(), readEndTime - startTime);
if (CollectionUtils.isNotEmpty(records)) {
jobInstanceColdDAO.batchInsert(records,
archiveTablePropsStorage.getBatchInsertRowSize(tableName));
Expand All @@ -80,7 +81,8 @@ public void deleteRecords(List<Long> jobInstanceIds) {
long startTime = System.currentTimeMillis();
int deleteRows = jobInstanceHotRecordDAO.deleteRecords(jobInstanceIds,
archiveTablePropsStorage.getDeleteLimitRowCount(tableName));
log.info("Delete {}, taskInstanceIdSize: {}, deletedRows: {}, cost: {}ms", tableName,
log.info("[{}] Delete {}, taskInstanceIdSize: {}, deletedRows: {}, cost: {}ms",
ArchiveTaskContextHolder.getArchiveTaskId(), tableName,
jobInstanceIds.size(), deleteRows, System.currentTimeMillis() - startTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,14 @@
package com.tencent.bk.job.backup.archive.model;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class ArchiveProgressDTO {
private String tableName;
/**
* 最后备份ID
*/
private Long lastBackupId;
public class ArchiveTaskContext {

/**
* 最后删除ID
*/
private Long lastDeletedId;
private Long lastBackupTime;
private Long lastDeleteTime;
private JobInstanceArchiveTaskInfo archiveTaskInfo;


public ArchiveTaskContext(JobInstanceArchiveTaskInfo archiveTaskInfo) {
this.archiveTaskInfo = archiveTaskInfo;
}
}
Loading

0 comments on commit 54fed24

Please sign in to comment.