Skip to content

Commit

Permalink
Merge pull request helyOSFramework#74 from helyOSFramework/refactorin…
Browse files Browse the repository at this point in the history
…g/new_reponse_parameter

Refactoring/alternative microservice response parameter for assignment order
  • Loading branch information
cviolbarbosa authored Dec 1, 2024
2 parents 4fcc33d + 5771331 commit 30504a9
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,24 @@ CREATE OR REPLACE FUNCTION public.notify_work_processes_update()
RETURNS trigger AS
$BODY$
BEGIN
PERFORM pg_notify('work_processes_update',
(SELECT row_to_json(r.*)::varchar FROM (
SELECT id, yard_id, work_process_type_id
FROM public.work_processes
WHERE id = NEW.id) r)
);

INSERT INTO public.events_queue (event_name, payload)
VALUES ('work_processes_update',
(SELECT row_to_json(r.*)::text FROM (
SELECT id, yard_id, work_process_type_id, status, work_process_type_name,
agent_ids, on_assignment_failure, tools_uuids, agent_uuids, sched_start_at, fallback_mission
FROM public.work_processes
WHERE id = NEW.id) r)
);

IF NEW.status IS DISTINCT FROM OLD.status THEN
PERFORM pg_notify('work_processes_update',
(SELECT row_to_json(r.*)::varchar FROM (
SELECT id, yard_id, work_process_type_id
FROM public.work_processes
WHERE id = NEW.id) r)
);

INSERT INTO public.events_queue (event_name, payload)
VALUES ('work_processes_update',
(SELECT row_to_json(r.*)::text FROM (
SELECT id, yard_id, work_process_type_id, status, work_process_type_name,
agent_ids, on_assignment_failure, tools_uuids, agent_uuids, sched_start_at, fallback_mission
FROM public.work_processes
WHERE id = NEW.id) r)
);
END IF;

RETURN NULL;
END;
$BODY$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ function processWorkProcessEvents(channel, payload) {
switch (workProcessStatus) {

case MISSION_STATUS.DISPATCHED:
databaseServices.work_processes.update_byId(payload['id'], {status: MISSION_STATUS.PREPARING})
databaseServices.work_processes.updateByConditions({id:payload['id'], status:MISSION_STATUS.DISPATCHED},
{status: MISSION_STATUS.DISPATCHED}
)
.then(() => blMicroservice.prepareServicesPipelineForWorkProcess(payload))
.catch(err => {logData.addLog('helyos_core', {wproc_id: payload['id']},'error', `p ${err.message}`)});
break;
Expand All @@ -97,7 +99,9 @@ function processWorkProcessEvents(channel, payload) {
break;

case MISSION_STATUS.CANCELING:
databaseServices.work_processes.update_byId(payload['id'], {status:MISSION_STATUS.CANCELED})
databaseServices.work_processes.updateByConditions({id:payload['id'], status:MISSION_STATUS.CANCELING},
{status:MISSION_STATUS.CANCELED}
)
.then(() => blAssignm.cancelRequestsToMicroservicesByWPId(payload['id']))
.then(() => blAssignm.cancelWorkProcessAssignments(payload['id']))
.then(() => blAssignm.onWorkProcessEnd(payload['id'], workProcessStatus))
Expand All @@ -106,7 +110,9 @@ function processWorkProcessEvents(channel, payload) {
break;

case MISSION_STATUS.ASSIGNMENT_FAILED:
databaseServices.work_processes.update_byId(payload['id'], {status:MISSION_STATUS.FAILED})
databaseServices.work_processes.updateByConditions({id:payload['id'], status:MISSION_STATUS.ASSIGNMENT_FAILED},
{status:MISSION_STATUS.FAILED}
)
.then(() => blAssignm.cancelRequestsToMicroservicesByWPId(payload['id']))
.then(() => blAssignm.cancelWorkProcessAssignments(payload['id']))
.then(() => blAssignm.onWorkProcessEnd(payload['id'], workProcessStatus))
Expand All @@ -115,15 +121,19 @@ function processWorkProcessEvents(channel, payload) {
break;

case MISSION_STATUS.PLANNING_FAILED:
databaseServices.work_processes.update_byId(payload['id'], {status:MISSION_STATUS.FAILED})
logData.addLog('helyos_core', {wproc_id: payload['id']},'success', `Work process ${payload['id']}: ${workProcessStatus}`);
databaseServices.work_processes.updateByConditions({id:payload['id'], status:MISSION_STATUS.PLANNING_FAILED},
{status:MISSION_STATUS.FAILED}
)
.then(() => blAssignm.cancelRequestsToMicroservicesByWPId(payload['id']))
.then(() => blAssignm.cancelWorkProcessAssignments(payload['id']))
.then(() => blAssignm.onWorkProcessEnd(payload['id'], workProcessStatus))
.catch(err => {logData.addLog('helyos_core', {wproc_id: payload['id']},'error', `work_processes_update ${err.message}`)});
.catch(err => {logData.addLog('helyos_core', {wproc_id: payload['id']},'error', `work_processes_update ${err.message}`)});

break;

case MISSION_STATUS.ASSIGNMENTS_COMPLETED:
logData.addLog('helyos_core', {wproc_id: payload['id']},'success', `Work process ${payload['id']}: ${workProcessStatus}`);
databaseServices.assignments.select({work_process_id:payload['id'], status: ASSIGNMENT_STATUS.CANCELED})
.then( r => {
let w_process_status = r.length? MISSION_STATUS.CANCELED : MISSION_STATUS.SUCCEEDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async function createAssignment(workProcess, servResponse, serviceRequest){
workProcess.sched_start_at = new Date();
}
const start_stamp = workProcess.sched_start_at.toISOString().replace(/T/, ' ').replace(/\..+/, '');
let assigmentInputs = [], instantActionsInput = [];
let assigmentInputs = [], instantActionsInput = []; assignmentPlan = [];
let dispatch_order = servResponse.dispatch_order;

if (servResponse.results) {
Expand Down Expand Up @@ -68,11 +68,15 @@ async function createAssignment(workProcess, servResponse, serviceRequest){
work_process_id: workProcess.id,
agent_id: agent_id,
service_request_id: serviceRequestId,
status: 'not_ready_to_dispatch',
status: ASSIGNMENT_STATUS.NOT_READY_TO_DISPATCH,
start_time_stamp: start_stamp,
on_assignment_failure: result.on_assignment_failure || result.onAssignmentFailure,
fallback_mission: result.fallback_mission || result.fallbackMission,
data: JSON.stringify(result.result || result.assignment)});
}

if (result.dispatch_order || result.assignment_order) {
assignmentPlan.push(result.dispatch_order || result.assignment_order);

}

Expand All @@ -90,7 +94,7 @@ async function createAssignment(workProcess, servResponse, serviceRequest){
work_process_id: workProcess.id,
agent_id: agentId,
service_request_id: serviceRequestId,
status: 'not_ready_to_dispatch',
status: ASSIGNMENT_STATUS.NOT_READY_TO_DISPATCH,
start_time_stamp: start_stamp,
data: data}
));
Expand All @@ -102,42 +106,90 @@ async function createAssignment(workProcess, servResponse, serviceRequest){

// create assignments
const insertPromises = assigmentInputs.map( input => databaseServices.assignments.insert(input).then( newId => newId));

// update assginments; sorting them.
let updatePromises = [];
let assignment_orchestration = '';
if (dispatch_order && dispatch_order.length) assignment_orchestration = 'dispatch_order_array';
else if (assignmentPlan && assignmentPlan.length) assignment_orchestration = 'assignment_order_array';
else assignment_orchestration = 'dispatch_all_at_once';

// validating assigment plans



return Promise.all(insertPromises)
.then((insertedPromiseIds)=>{
// ordering assignment dispatches
if (dispatch_order && dispatch_order.length) {
if (dispatch_order.length === 1 ) dispatch_order.push([]); // special case: there is no dependent assignments.
for (let order = dispatch_order.length-1; order > 0; order--) {
const assgmtArrayIdxs = dispatch_order[order];
const previousArrayIdxs = dispatch_order[order-1];
const assgmtDBIdxs = assgmtArrayIdxs.map(i => parseInt(insertedPromiseIds[i]) );
const previousDBIdxs = previousArrayIdxs.map(i => parseInt(insertedPromiseIds[i]));
const statusPrecedent = (order-1) === 0 ? ASSIGNMENT_STATUS.TO_DISPATCH : 'not_ready_to_dispatch';
const updatePrecedentAssignments = previousDBIdxs.map(id => databaseServices.assignments.update_byId(id,{'next_assignments': assgmtDBIdxs,
'status': statusPrecedent }
));
const updateDependentAssignments = assgmtDBIdxs.map(id => databaseServices.assignments.update_byId(id,{'depend_on_assignments':previousDBIdxs}));
updatePromises = updatePromises.concat([...updateDependentAssignments, ...updatePrecedentAssignments]);
}
} else {
updatePromises = insertedPromiseIds.map(id => databaseServices.assignments.update_byId(id,{status: ASSIGNMENT_STATUS.TO_DISPATCH}));

if (assignment_orchestration === 'dispatch_all_at_once') {
updatePromises = insertedPromiseIds.map(id => databaseServices.assignments.update_byId(id,{status: ASSIGNMENT_STATUS.TO_DISPATCH}));
}

if (assignment_orchestration === 'dispatch_order_array') {
updatePromises = insertedPromiseIds.map(id => databaseServices.assignments.update_byId(id,{status: ASSIGNMENT_STATUS.TO_DISPATCH}));
if (dispatch_order.length === 1 ) dispatch_order.push([]); // special case: there is no dependent assignments.
for (let order = dispatch_order.length-1; order > 0; order--) {
const assgmtArrayIdxs = dispatch_order[order];
const previousArrayIdxs = dispatch_order[order-1];
const assgmtDBIdxs = assgmtArrayIdxs.map(i => parseInt(insertedPromiseIds[i]) );
const previousDBIdxs = previousArrayIdxs.map(i => parseInt(insertedPromiseIds[i]));
const statusPrecedent = (order-1) === 0 ? ASSIGNMENT_STATUS.TO_DISPATCH : ASSIGNMENT_STATUS.NOT_READY_TO_DISPATCH;
const updatePrecedentAssignments = previousDBIdxs.map(id => databaseServices.assignments.update_byId(id,{'next_assignments': assgmtDBIdxs,
'status': statusPrecedent }
));
const updateDependentAssignments = assgmtDBIdxs.map(id => databaseServices.assignments.update_byId(id,{'depend_on_assignments':previousDBIdxs}));
updatePromises = updatePromises.concat([...updateDependentAssignments, ...updatePrecedentAssignments]);
}
}

if (assignment_orchestration === 'assignment_order_array') {
const dispatchGroup = {};
// Group assignments
assignmentPlan.forEach((order, index) => {
if (!dispatchGroup[order]) dispatchGroup[order] = [insertedPromiseIds[index]];
else dispatchGroup[order].push(insertedPromiseIds[index]);
});

assignmentPlan.forEach(order => {
const ids = dispatchGroup[order];
const nextIds = dispatchGroup[order + 1];
const prevIds = dispatchGroup[order - 1];

const nextAssignments = nextIds ? nextIds : [];
const dependOnAssignments = prevIds ? prevIds : [];

const updateDependencies = ids.map( id => databaseServices.assignments.update_byId(id, {
'next_assignments': nextAssignments,
'depend_on_assignments': dependOnAssignments
}));
updatePromises = updatePromises.concat([...updateDependencies]);
});

let dispatchTriggerPromises = [];
if (dispatchGroup[1]) {
dispatchTriggerPromises = dispatchGroup[1].map(id =>
databaseServices.assignments.update_byId(id, { 'status': ASSIGNMENT_STATUS.TO_DISPATCH })
);
}
updatePromises = [Promise.all(updatePromises).then(()=> Promise.all(dispatchTriggerPromises))];

}



return Promise.all(updatePromises)
.then(()=>{
const statusUpdatePromises = assigmentInputs.map( input =>
databaseServices.service_requests.update_byId(serviceRequest.id, {assignment_dispatched: true})
.then(() => databaseServices.agents.update('id', input.agentId, {status:AGENT_STATUS.BUSY}))
.then(() => databaseServices.work_processes.updateByConditions(
{'id': workProcess.id, 'status__in': [ MISSION_STATUS.DISPATCHED,
MISSION_STATUS.CALCULATING,
MISSION_STATUS.PREPARING]},
{status: MISSION_STATUS.EXECUTING}))
);
return Promise.all(statusUpdatePromises);
})
return Promise.all(updatePromises)
.then(()=>{
const statusUpdatePromises = assigmentInputs.map( input =>
databaseServices.service_requests.update_byId(serviceRequest.id, {assignment_dispatched: true})
.then(() => databaseServices.agents.update('id', input.agentId, {status:AGENT_STATUS.BUSY})) // COMMENT: This should not be necessary here.
.then(() => databaseServices.work_processes.updateByConditions(
{'id': workProcess.id, 'status__in': [ MISSION_STATUS.DISPATCHED,
MISSION_STATUS.CALCULATING,
MISSION_STATUS.PREPARING]},
{status: MISSION_STATUS.EXECUTING}))
);
return Promise.all(statusUpdatePromises);
})

})
.catch(err => logData.addLog('helyos_core', null, 'error', `createAssignment ${err.message}`));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
const extServCommunication = require('../modules/communication/microservice_communication');
const databaseServices = require('../services/database/database_services.js');
const { logData} = require('../modules/systemlog');
const { SERVICE_STATUS } = require('../modules/data_models');
const { SERVICE_STATUS, ASSIGNMENT_STATUS } = require('../modules/data_models');
const { determineServiceRequestStatus } = require('../modules/microservice_orchestration');


Expand Down Expand Up @@ -128,7 +128,7 @@ const waitForAssigmentsDependencies =() => {
// when first assighment get ready it changes the status of dependents to "wait_dependencies"
// here we just wait to switch wait_dependencies => to_dispatch

return databaseServices.assignments.select({status:'wait_dependencies'})
return databaseServices.assignments.select({status: ASSIGNMENT_STATUS.WAIT_DEPENDENCIES})
.then((allAwaitingAssignments) => {
const promises = allAwaitingAssignments.map(
async (assignment) => {
Expand Down
3 changes: 2 additions & 1 deletion helyos_server/src/modules/assignment_orchestration.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ async function assignmentUpdatesMissionStatus(id, wprocId) {
await databaseServices.work_processes.get_byId(wprocId, ['status'])
.then( wproc => {
if (UNCOMPLETE_MISSION_STATUS.includes(wproc.status)) {
return databaseServices.work_processes.update_byId(wprocId, { status: MISSION_STATUS.ASSIGNMENTS_COMPLETED});
return databaseServices.work_processes.updateByConditions({id:wprocId, status__in: UNCOMPLETE_MISSION_STATUS},
{ status: MISSION_STATUS.ASSIGNMENTS_COMPLETED});
}
});
}
Expand Down
69 changes: 69 additions & 0 deletions tests/18-two-consecutive-assignments-mission.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// What is being tested:
// Same than test 01 but the mission will failed because microservice produces wrong
// assignment format.


process.env.TEST_NUMBER = 18;
console.log("starting test 18");


let helyosApplication;

describe('18 Two Consecutive Assignments', () => {

it('Agent is reserved', async () => {
helyosApplication = await getHelyOSClientInstance();
await helyosApplication.createNewMission('driving_release', ['Ab34069fc5-fdgs-434b-b87e-f19c5435113',
'Bb34069fc5-fdgs-434b-b87e-f19c5435113']);
const result = await helyosApplication.waitAgentStatus(1, 'ready');
expect(result).toEqual('ready');
});

it('Microservice is ready', async () => {
const result = await helyosApplication.waitMicroserviceStatus(1, 'ready');
expect(result).toEqual('ready');
});

it('One Assignment starts and other waits', async () => {
const result1 = await helyosApplication.waitAssignmentStatuses(1, ['executing', 'not_ready_to_dispatch']);
const result2 = await helyosApplication.waitAssignmentStatuses(2, ['executing', 'not_ready_to_dispatch']);
const result = result1 === 'executing' && result2 === 'not_ready_to_dispatch' || result2 === 'executing' && result1 === 'not_ready_to_dispatch';
expect(result).toEqual(true);
});

it('One assignment is completed and the other runs', async () => {
const result1 = await helyosApplication.waitAssignmentStatuses(1, ['completed', 'executing']);
const result2 = await helyosApplication.waitAssignmentStatuses(2, ['completed', 'executing']);
const result = result1 === 'executing' && result2 === 'completed' || result2 === 'executing' && result1 === 'completed';
expect(result).toEqual(true);

});


it(' 2 Assignments are completed', async () => {
const result1 = await helyosApplication.waitAssignmentStatus(1, 'completed');
const result2 = await helyosApplication.waitAssignmentStatus(2, 'completed');
expect(result1).toEqual('completed');
expect(result2).toEqual('completed');
});


it('Agent is free', async () => {
const result1 = await helyosApplication.waitAgentStatus(1, 'free');
const result2 = await helyosApplication.waitAgentStatus(2, 'free');
expect(result1).toEqual('free');
expect(result2).toEqual('free');
});

it('Check 1 x Reserves/ 1 x Releases sent to the agent 1', async () => {
const logs = await helyosApplication.getAgentRelatedLogs('Ab34069fc5-fdgs-434b-b87e-f19c5435113');
const reservationsSent = logs.filter(log => log.msg.includes('Sending reserve')).length;
const cancelationsSent = logs.filter(log => log.msg.includes('Sending cancel')).length;
const releasesSent = logs.filter(log => log.msg.includes('Sending release')).length;

expect(reservationsSent).toEqual(1);
expect(cancelationsSent).toEqual(0);
expect(releasesSent).toEqual(1);
});

});
Loading

0 comments on commit 30504a9

Please sign in to comment.