Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 重启 job-backup 导致正在执行的归档任务未无损终止 #3359 #3360

Merged
merged 5 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public void schedule() {
continue;
}


// 获取待调度的任务信息(按照 DB 节点计数)
watch.start("countScheduleTasks");
Map<String, Integer> scheduleTasksGroupByDb =
Expand All @@ -162,12 +161,12 @@ public void schedule() {
watch.stop();
int taskConcurrent = archiveProperties.getTasks().getJobInstance().getConcurrent();
if (highestPriorityDbNodeTasksInfo.getRunningTaskCount() >= taskConcurrent) {
// 休眠5分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 300s", taskConcurrent);
// 休眠1分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 60s", taskConcurrent);
// 释放锁
jobInstanceArchiveTaskScheduleLock.unlock();
locked = false;
ThreadUtils.sleep(1000 * 300L);
ThreadUtils.sleep(1000 * 60L);
continue;
}

Expand Down Expand Up @@ -280,8 +279,8 @@ private void stopTasksGraceful() {
}
try {
if (taskCountDownLatch != null) {
// 等待任务结束,最多等待 2min
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(120);
// 等待任务结束,最多等待 10s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(10);
if (!isAllTaskStopped) {
for (JobInstanceArchiveTask task : scheduledTasks.values()) {
task.forceStopAtOnce();
Expand All @@ -307,10 +306,13 @@ private static final class StopTask implements Runnable {
@Override
public void run() {
try {
log.info("[{}] Run stop task begin", task.getTaskId());
task.stop(() -> taskCountDownLatch.decrement(task.getTaskId()));
} catch (Throwable e) {
String errorMsg = "Stop archive task caught exception, task: " + task;
String errorMsg = "Stop archive task caught exception, task: " + task.getTaskId();
log.warn(errorMsg, e);
} finally {
log.info("[{}] Run stop task end", task.getTaskId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,7 @@
import com.tencent.bk.job.backup.archive.JobInstanceArchiveTaskScheduler;
import com.tencent.bk.job.backup.archive.JobInstanceSubTableArchivers;
import com.tencent.bk.job.backup.archive.dao.JobInstanceColdDAO;
import com.tencent.bk.job.backup.archive.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.JobInstanceHotRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.OperationLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.RollingConfigRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceConfirmRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceFileRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRollingTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceScriptRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceHostRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.OperationLogArchiver;
import com.tencent.bk.job.backup.archive.impl.RollingConfigArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceConfirmArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceFileArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceRollingTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceScriptArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceHostArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.metrics.ArchiveTasksGauge;
import com.tencent.bk.job.backup.archive.service.ArchiveTaskService;
import com.tencent.bk.job.backup.archive.util.lock.ArchiveTaskExecuteLock;
Expand All @@ -73,7 +41,6 @@
import com.tencent.bk.job.backup.archive.util.lock.JobInstanceArchiveTaskScheduleLock;
import com.tencent.bk.job.backup.metrics.ArchiveErrorTaskCounter;
import com.tencent.bk.job.common.WatchableThreadPoolExecutor;
import com.tencent.bk.job.common.mysql.dynamic.ds.DSLContextProvider;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
Expand Down Expand Up @@ -144,7 +111,7 @@ public ThreadPoolExecutor archiveTaskStopExecutor(MeterRegistry meterRegistry) {
5,
20,
120L,
TimeUnit.MILLISECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("archive-task-stop-thread-pool-%d").build()
);
Expand Down
6 changes: 6 additions & 0 deletions support-files/kubernetes/charts/bk-job/VALUES_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ analysisConfig:
model: hunyuan
```

2. 将 pod 删除时等待优雅关闭的最大时间从 40s 调整为 60s
```yaml
# pod删除时等待优雅关闭的最大时间,单位为秒(超出后强制删除)
podTerminationGracePeriodSeconds: 60
```

## 0.8.0
1. 增加按主机拓扑路径鉴权相关配置
```yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ spec:
annotations:
{{ include "annotations.sha256sum.configmap" ( dict "service" "job-execute" "context" . ) | nindent 8 }}
spec:
{{- include "job.podTerminationGracePeriodSeconds" . | nindent 6 }}
{{- include "job.imagePullSecrets" . | nindent 6 }}
hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.hostAliases "context" $) | nindent 8 }}
{{- if .Values.executeConfig.affinity }}
Expand Down
2 changes: 1 addition & 1 deletion support-files/kubernetes/charts/bk-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1807,4 +1807,4 @@ assembleConfig:
readTimeout: 300000

# pod删除时等待优雅关闭的最大时间,单位为秒(超出后强制删除)
podTerminationGracePeriodSeconds: 40
podTerminationGracePeriodSeconds: 60
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VALUES_LOG.md文件中需要同步修改。

Loading