Skip to content

Commit

Permalink
Make the list RSEs pipeline element discard results with errors and c…
Browse files Browse the repository at this point in the history
…over this by a test
  • Loading branch information
MytsV committed Aug 29, 2024
1 parent 4357a1d commit 7eb1203
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import { BaseStreamingPostProcessingPipelineElement } from "@/lib/sdk/postprocessing-pipeline-elements";
import { AuthenticatedRequestModel } from "@/lib/sdk/usecase-models";
import {BaseStreamingPostProcessingPipelineElement} from "@/lib/sdk/postprocessing-pipeline-elements";
import {AuthenticatedRequestModel} from "@/lib/sdk/usecase-models";

import { RSEDTO, getEmptyRSEDTO } from "@/lib/core/dto/rse-dto";
import {RSEDTO, getEmptyRSEDTO} from "@/lib/core/dto/rse-dto";
import RSEGatewayOutputPort from "@/lib/core/port/secondary/rse-gateway-output-port";
import { ListRSEsError, ListRSEsRequest, ListRSEsResponse } from "@/lib/core/usecase-models/list-rses-usecase-models";
import {ListRSEsError, ListRSEsRequest, ListRSEsResponse} from "@/lib/core/usecase-models/list-rses-usecase-models";


export default class GetRSEPipelineElement extends BaseStreamingPostProcessingPipelineElement<ListRSEsRequest, ListRSEsResponse, ListRSEsError, RSEDTO>{
export default class GetRSEPipelineElement extends BaseStreamingPostProcessingPipelineElement<ListRSEsRequest, ListRSEsResponse, ListRSEsError, RSEDTO> {
constructor(private gateway: RSEGatewayOutputPort) {
super();
}

async makeGatewayRequest(requestModel: AuthenticatedRequestModel<ListRSEsRequest>, responseModel: ListRSEsResponse): Promise<RSEDTO> {
try {
const { rucioAuthToken } = requestModel;
const {rucioAuthToken} = requestModel;
const rseName = responseModel.name;
if(!rseName) {
if (!rseName) {
const errorDTO: RSEDTO = getEmptyRSEDTO();
errorDTO.status = 'error';
errorDTO.errorCode = 400;
Expand All @@ -38,13 +39,19 @@ export default class GetRSEPipelineElement extends BaseStreamingPostProcessingPi
const errorModel: ListRSEsError = {
status: 'error',
name: dto.name,
code: dto.errorCode? dto.errorCode : 400,
message: dto.errorName + ': ' + dto.errorMessage
code: dto.errorCode ? dto.errorCode : 400,
message: dto.errorName + ': ' + dto.errorMessage
}
return errorModel;
}

transformResponseModel(responseModel: ListRSEsResponse, dto: RSEDTO): ListRSEsResponse {

transformResponseModel(responseModel: ListRSEsResponse, dto: RSEDTO): ListRSEsResponse | ListRSEsError {
if (dto.status === 'error') return {
status: 'error',
message: '',
name: '',
code: 500,
};
responseModel.id = dto.id;
responseModel.deterministic = dto.deterministic;
responseModel.rse_type = dto.rse_type;
Expand Down
202 changes: 134 additions & 68 deletions test/api/rse/list-rses.test.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,91 @@
import { RSEType } from '@/lib/core/entity/rucio'
import { ListRSEsRequest } from '@/lib/core/usecase-models/list-rses-usecase-models'
import { ListRSEsControllerParameters } from '@/lib/infrastructure/controller/list-rses-controller'
import {RSEType} from '@/lib/core/entity/rucio'
import {ListRSEsRequest} from '@/lib/core/usecase-models/list-rses-usecase-models'
import {ListRSEsControllerParameters} from '@/lib/infrastructure/controller/list-rses-controller'
import appContainer from '@/lib/infrastructure/ioc/container-config'
import CONTROLLERS from '@/lib/infrastructure/ioc/ioc-symbols-controllers'
import { BaseController } from '@/lib/sdk/controller'
import { NextApiResponse } from 'next'
import { Readable } from 'stream'
import { MockHttpStreamableResponseFactory } from 'test/fixtures/http-fixtures'
import {BaseController} from '@/lib/sdk/controller'
import {NextApiResponse} from 'next'
import {Readable} from 'stream'
import {MockHttpStreamableResponseFactory} from 'test/fixtures/http-fixtures'
import MockRucioServerFactory, {
MockEndpoint,
} from 'test/fixtures/rucio-server'

describe('List RSEs Feature tests', () => {
beforeEach(() => {
fetchMock.doMock()
const rseStream = Readable.from([JSON.stringify({ rse: 'XRD1' })].join('\n'))

const xrd1 = JSON.stringify({
availability_delete: true,
availability_read: true,
availability_write: true,
deterministic: true,
domain: ['lan', 'wan'],
id: '464cd7656db842c78261bcc2a087bbcb',
lfn2pfn_algorithm: 'hash',
protocols: [
{
hostname: 'xrd1',
scheme: 'root',
port: 1094,
prefix: '//rucio',
impl: 'rucio.rse.protocols.xrootd.Default',
domains: {
lan: {
read: 1,
write: 1,
delete: 1,
},
wan: {
read: 1,
write: 1,
delete: 1,
third_party_copy_read: 1,
third_party_copy_write: 1,
},
const rseStream = Readable.from([JSON.stringify({rse: 'XRD1'})].join('\n'))

const xrd1 = JSON.stringify({
availability_delete: true,
availability_read: true,
availability_write: true,
deterministic: true,
domain: ['lan', 'wan'],
id: '464cd7656db842c78261bcc2a087bbcb',
lfn2pfn_algorithm: 'hash',
protocols: [
{
hostname: 'xrd1',
scheme: 'root',
port: 1094,
prefix: '//rucio',
impl: 'rucio.rse.protocols.xrootd.Default',
domains: {
lan: {
read: 1,
write: 1,
delete: 1,
},
wan: {
read: 1,
write: 1,
delete: 1,
third_party_copy_read: 1,
third_party_copy_write: 1,
},
extended_attributes: null,
},
],
qos_class: null,
rse: 'XRD1',
rse_type: 'DISK',
sign_url: null,
staging_area: false,
verify_checksum: true,
volatile: false,
read_protocol: 1,
write_protocol: 1,
delete_protocol: 1,
third_party_copy_read_protocol: 1,
third_party_copy_write_protocol: 1,
availability: 7,
})
extended_attributes: null,
},
],
qos_class: null,
rse: 'XRD1',
rse_type: 'DISK',
sign_url: null,
staging_area: false,
verify_checksum: true,
volatile: false,
read_protocol: 1,
write_protocol: 1,
delete_protocol: 1,
third_party_copy_read_protocol: 1,
third_party_copy_write_protocol: 1,
availability: 7,
})

beforeEach(() => {
fetchMock.doMock()
})

afterEach(() => {
fetchMock.dontMock()
})

it('should return with INVALID RSE EXPRESSION error if rseExpression is undefined', async () => {
const res = MockHttpStreamableResponseFactory.getMockResponse()
const controller = appContainer.get<
BaseController<ListRSEsControllerParameters, ListRSEsRequest>
>(CONTROLLERS.LIST_RSES)
const controllerParams: ListRSEsControllerParameters = {
rucioAuthToken: MockRucioServerFactory.VALID_RUCIO_TOKEN,
response: res as unknown as NextApiResponse,
rseExpression: '',
}

await controller.execute(controllerParams)
const data = await res._getJSONData()
expect(data.status).toBe('error')
expect(data.message).toBe('RSE Expression is undefined or an empty string')
})
it('should list rses as a stream', async () => {
const listRSEsEndpoint: MockEndpoint = {
url: `${MockRucioServerFactory.RUCIO_HOST}/rses/?expression=XRD1`,
method: 'GET',
Expand All @@ -87,29 +111,70 @@ describe('List RSEs Feature tests', () => {
},
}
MockRucioServerFactory.createMockRucioServer(true, [listRSEsEndpoint, getXRD1Endpoint])
})

afterEach(() => {
fetchMock.dontMock()
})

it('should return with INVALID RSE EXPRESSION error if rseExpression is undefined', async () => {
const res = MockHttpStreamableResponseFactory.getMockResponse()

const controller = appContainer.get<
BaseController<ListRSEsControllerParameters, ListRSEsRequest>
>(CONTROLLERS.LIST_RSES)
const controllerParams: ListRSEsControllerParameters = {
rucioAuthToken: MockRucioServerFactory.VALID_RUCIO_TOKEN,
response: res as unknown as NextApiResponse,
rseExpression: '',
rseExpression: 'XRD1',
}

await controller.execute(controllerParams)
const data = await res._getJSONData()
expect(data.status).toBe('error')
expect(data.message).toBe('RSE Expression is undefined or an empty string')

const receivedData: any[] = []
const onData = (data: any) => {
receivedData.push(JSON.parse(data))
}

const done = new Promise<void>((resolve, reject) => {
res.on('data', onData)
res.on('end', () => {
res.off('data', onData)
resolve()
})
res.on('error', err => {
res.off('data', onData)
reject(err)
})
})

await done

expect(receivedData.length).toEqual(1)
expect(receivedData[0].id).toEqual('464cd7656db842c78261bcc2a087bbcb')
expect(receivedData[0].rse_type).toEqual(RSEType.DISK)
})
it('should list rses as a stream', async () => {
it('should return an error for RSEs which status cannot be fetched', async () => {
const listRSEsEndpoint: MockEndpoint = {
url: `${MockRucioServerFactory.RUCIO_HOST}/rses/?expression=*`,
method: 'GET',
includes: 'rses?expression=*',
response: {
status: 200,
headers: {
'Content-Type': 'application/x-json-stream',
},
body: Readable.from([JSON.stringify({rse: 'XRD1'}), JSON.stringify({rse: 'XRD2'})].join('\n')),
},
}
const getXRD1Endpoint: MockEndpoint = {
url: `${MockRucioServerFactory.RUCIO_HOST}/rses/XRD1`,
method: 'GET',
includes: 'rses/XRD1',
response: {
status: 200,
headers: {
'Content-Type': 'application/json',
},
body: xrd1,
},
}
MockRucioServerFactory.createMockRucioServer(true, [listRSEsEndpoint, getXRD1Endpoint])

const res = MockHttpStreamableResponseFactory.getMockResponse()

const controller = appContainer.get<
Expand All @@ -118,7 +183,7 @@ describe('List RSEs Feature tests', () => {
const controllerParams: ListRSEsControllerParameters = {
rucioAuthToken: MockRucioServerFactory.VALID_RUCIO_TOKEN,
response: res as unknown as NextApiResponse,
rseExpression: 'XRD1',
rseExpression: '*',
}

await controller.execute(controllerParams)
Expand All @@ -142,8 +207,9 @@ describe('List RSEs Feature tests', () => {

await done

expect(receivedData.length).toEqual(1)
expect(receivedData.length).toEqual(2)
expect(receivedData[0].id).toEqual('464cd7656db842c78261bcc2a087bbcb')
expect(receivedData[0].rse_type).toEqual(RSEType.DISK)
expect(receivedData[1].status).toEqual('error')
})
})

0 comments on commit 7eb1203

Please sign in to comment.