Skip to content

Commit

Permalink
Merge pull request #3360 from wangyu096/issue_3359
Browse files Browse the repository at this point in the history
fix: 重启 job-backup 导致正在执行的归档任务未无损终止 #3359
  • Loading branch information
wangyu096 authored Dec 26, 2024
2 parents 987dcca + 451ab24 commit 49729bb
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public void schedule() {
continue;
}


// 获取待调度的任务信息(按照 DB 节点计数)
watch.start("countScheduleTasks");
Map<String, Integer> scheduleTasksGroupByDb =
Expand All @@ -162,12 +161,12 @@ public void schedule() {
watch.stop();
int taskConcurrent = archiveProperties.getTasks().getJobInstance().getConcurrent();
if (highestPriorityDbNodeTasksInfo.getRunningTaskCount() >= taskConcurrent) {
// 休眠5分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 300s", taskConcurrent);
// 休眠1分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 60s", taskConcurrent);
// 释放锁
jobInstanceArchiveTaskScheduleLock.unlock();
locked = false;
ThreadUtils.sleep(1000 * 300L);
ThreadUtils.sleep(1000 * 60L);
continue;
}

Expand Down Expand Up @@ -280,8 +279,8 @@ private void stopTasksGraceful() {
}
try {
if (taskCountDownLatch != null) {
// 等待任务结束,最多等待 2min
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(120);
// 等待任务结束,最多等待 10s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(10);
if (!isAllTaskStopped) {
for (JobInstanceArchiveTask task : scheduledTasks.values()) {
task.forceStopAtOnce();
Expand All @@ -307,10 +306,13 @@ private static final class StopTask implements Runnable {
@Override
public void run() {
try {
log.info("[{}] Run stop task begin", task.getTaskId());
task.stop(() -> taskCountDownLatch.decrement(task.getTaskId()));
} catch (Throwable e) {
String errorMsg = "Stop archive task caught exception, task: " + task;
String errorMsg = "Stop archive task caught exception, task: " + task.getTaskId();
log.warn(errorMsg, e);
} finally {
log.info("[{}] Run stop task end", task.getTaskId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,7 @@
import com.tencent.bk.job.backup.archive.JobInstanceArchiveTaskScheduler;
import com.tencent.bk.job.backup.archive.JobInstanceSubTableArchivers;
import com.tencent.bk.job.backup.archive.dao.JobInstanceColdDAO;
import com.tencent.bk.job.backup.archive.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.JobInstanceHotRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.OperationLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.RollingConfigRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceConfirmRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceFileRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRollingTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceScriptRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceHostRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.OperationLogArchiver;
import com.tencent.bk.job.backup.archive.impl.RollingConfigArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceConfirmArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceFileArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceRollingTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceScriptArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceHostArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.metrics.ArchiveTasksGauge;
import com.tencent.bk.job.backup.archive.service.ArchiveTaskService;
import com.tencent.bk.job.backup.archive.util.lock.ArchiveTaskExecuteLock;
Expand All @@ -73,7 +41,6 @@
import com.tencent.bk.job.backup.archive.util.lock.JobInstanceArchiveTaskScheduleLock;
import com.tencent.bk.job.backup.metrics.ArchiveErrorTaskCounter;
import com.tencent.bk.job.common.WatchableThreadPoolExecutor;
import com.tencent.bk.job.common.mysql.dynamic.ds.DSLContextProvider;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
Expand Down Expand Up @@ -144,7 +111,7 @@ public ThreadPoolExecutor archiveTaskStopExecutor(MeterRegistry meterRegistry) {
5,
20,
120L,
TimeUnit.MILLISECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("archive-task-stop-thread-pool-%d").build()
);
Expand Down
6 changes: 6 additions & 0 deletions support-files/kubernetes/charts/bk-job/VALUES_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ analysisConfig:
model: hunyuan
```
2. 调整pod删除时等待优雅关闭的最大时间,从 40s -> 60s
```yaml
# pod删除时等待优雅关闭的最大时间,单位为秒(超出后强制删除)
podTerminationGracePeriodSeconds: 60
```
## 0.8.0
1. 增加按主机拓扑路径鉴权相关配置
```yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ spec:
annotations:
{{ include "annotations.sha256sum.configmap" ( dict "service" "job-execute" "context" . ) | nindent 8 }}
spec:
{{- include "job.podTerminationGracePeriodSeconds" . | nindent 6 }}
{{- include "job.imagePullSecrets" . | nindent 6 }}
hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.hostAliases "context" $) | nindent 8 }}
{{- if .Values.executeConfig.affinity }}
Expand Down
2 changes: 1 addition & 1 deletion support-files/kubernetes/charts/bk-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1807,4 +1807,4 @@ assembleConfig:
readTimeout: 300000

# pod删除时等待优雅关闭的最大时间,单位为秒(超出后强制删除)
podTerminationGracePeriodSeconds: 40
podTerminationGracePeriodSeconds: 60

0 comments on commit 49729bb

Please sign in to comment.