Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 执行历史归档任务重调度逻辑不正确 #3364 #3365

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ private void reScheduleTimeoutTasks() {
}
long currentTime = System.currentTimeMillis();
runningTasks.forEach(runningTask -> {
// 如果归档任务没有正常结束,通过当前时间减去任务创建(修改)时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getCreateTime() > TIMEOUT_MILLS ||
currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
// 如果归档任务没有正常结束,通过当前时间减去任务最后修改时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
log.info("Found timeout archive task, and set archive task status to pending. taskId: {}",
runningTask.buildTaskUniqueId());
// 设置为 pending 状态,会被重新调度
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class JobInstanceArchiveTaskScheduler implements SmartLifecycle {
*/
private final Map<String, JobInstanceMainDataArchiveTask> scheduledTasks = new ConcurrentHashMap<>();

/**
* 调度器线程挂起 object monitor
*/
private final Object schedulerHangMonitor = new Object();


public JobInstanceArchiveTaskScheduler(ArchiveTaskService archiveTaskService,
JobInstanceHotRecordDAO taskInstanceRecordDAO,
Expand Down Expand Up @@ -118,12 +123,12 @@ public void schedule() {
return;
}
this.scheduling = true;
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}

while (true) {
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}
StopWatch watch = new StopWatch("archive-task-schedule");
boolean locked = false;
try {
Expand Down Expand Up @@ -166,7 +171,9 @@ public void schedule() {
// 释放锁
jobInstanceArchiveTaskScheduleLock.unlock();
locked = false;
ThreadUtils.sleep(1000 * 60L);
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.wait(1000 * 60L);
}
continue;
}

Expand Down Expand Up @@ -259,6 +266,10 @@ public void stop() {
}
this.active = false;
}
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.notify();
log.info("Try notify scheduler when stopping");
}
stopTasksGraceful();
log.info("JobInstanceArchiveTaskScheduler stop successfully!");
}
Expand All @@ -279,8 +290,8 @@ private void stopTasksGraceful() {
}
try {
if (taskCountDownLatch != null) {
// 等待任务结束,最多等待 10s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(10);
// 等待任务结束,最多等待 30s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(30);
if (!isAllTaskStopped) {
for (JobInstanceArchiveTask task : scheduledTasks.values()) {
task.forceStopAtOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ spec:
annotations:
{{ include "annotations.sha256sum.configmap" ( dict "service" "job-backup" "context" . ) | nindent 8 }}
spec:
{{- include "job.podTerminationGracePeriodSeconds" . | nindent 6 }}
{{- include "job.imagePullSecrets" . | nindent 6 }}
hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.hostAliases "context" $) | nindent 8 }}
{{- if .Values.backupConfig.affinity }}
Expand Down Expand Up @@ -104,6 +103,7 @@ spec:
- name: redis
mountPath: /etc/secrets/redis
readOnly: true
terminationGracePeriodSeconds: 80
volumes:
- name: job-storage
persistentVolumeClaim:
Expand Down
Loading