Skip to content

Commit

Permalink
common: add history for cost models and set view table for latest mod…
Browse files Browse the repository at this point in the history
…els.

Indexer service and tap agent require a history of cost models for tap
cost model checks and accept new and old one for 2-3 mins
  • Loading branch information
carlosvdr committed Sep 17, 2024
1 parent bb2f3aa commit 153c27f
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 33 deletions.
150 changes: 150 additions & 0 deletions packages/indexer-agent/src/db/migrations/14-cost-models-history.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { Logger } from '@graphprotocol/common-ts'
import { utils } from 'ethers'
import { QueryInterface, DataTypes } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
}

interface Context {
context: MigrationContext
}
export const COST_MODEL_GLOBAL = 'global'
export async function up({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

const tables = await queryInterface.showAllTables()
logger.debug(`Checking if CostModelsHistory table exists`, { tables })

// CostModelsHistory: this table will store the history of cost models
// this is necessary since there could be a mismtach between the old table and the info the gateway has
// causing a failed request. Solution is to have a history and allow the "old" model for a limited time frame 2-3 mins
// For indexer-service is also helpful to have the history of the cost models since we want to obtain the minimum cost per receipt
// this will help since the gateway could send an old model and get blocked so we need the indexer to accept one of the 2 latest models
// in the past 30 seconds since the gateway updates its model every 30 seconds

if (tables.includes('CostModelsHistory')) {
logger.debug(`CostModelsHistory already exist, migration not necessary`)
} else {
logger.info(`Create CostModelsHistory`)
await queryInterface.createTable('CostModelsHistory', {
id: {
type: DataTypes.BIGINT,
primaryKey: true,
autoIncrement: true,
unique: true,
},
deployment: {
type: DataTypes.STRING,
allowNull: false,
validate: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
isDeploymentID: (value: any) => {
if (typeof value !== 'string') {
throw new Error('Deployment ID must be a string')
}
// "0x..." and "global" is ok
if (utils.isHexString(value, 32) || value === COST_MODEL_GLOBAL) {
return
}

throw new Error(
`Deployment ID must be a valid subgraph deployment ID or "global"`,
)
},
},
},
model: {
type: DataTypes.TEXT,
allowNull: true,
},
variables: {
type: DataTypes.JSONB,
allowNull: true,
},
createdAt: {
type: DataTypes.DATE,
allowNull: false,
},
updatedAt: {
type: DataTypes.DATE,
allowNull: false,
},
})
if (tables.includes('CostModels')) {
logger.debug(`Copying data from CostModels into CostModelsHistory`)
const copyTableSQL = `
INSERT INTO "CostModelsHistory"
SELECT * FROM "CostModels";
`
await queryInterface.sequelize.query(copyTableSQL)
logger.info(`Drop table "CostModels"`)
await queryInterface.dropTable('CostModels', { cascade: true })
}
// To avoid creating a breaking change for the indexer-agent or indexer-service we create a view table
// Since now other systems can keep the same query towards "CostModels" and not need to change anything

logger.info(
`Creating a view for CostModelsHistory to substitute "CostModels" table`,
)
const viewSQL = `
CREATE VIEW "CostModels" AS SELECT id,
deployment,
model,
variables,
"createdAt",
"updatedAt"
FROM "CostModelsHistory" t1
JOIN
(
SELECT MAX(id)
FROM "CostModelsHistory"
GROUP BY deployment
) t2
ON t1.id = t2.MAX;
`
// We also need to create a trigger to notify indexer-service when a new cost model is added
// instead of it polling the db
await queryInterface.sequelize.query(viewSQL)

const functionSQL = `
CREATE FUNCTION cost_models_update_notify()
RETURNS trigger AS
$$
BEGIN
IF TG_OP = 'DELETE' THEN
PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "DELETE", "deployment": "%s"}', OLD.deployment));
RETURN OLD;
ELSIF TG_OP = 'INSERT' THEN
PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "INSERT", "deployment": "%s"}', NEW.deployment));
RETURN NEW;
ELSE -- UPDATE OR TRUNCATE, should never happen
PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "%s", "deployment": null}', TG_OP, NEW.deployment));
RETURN NEW;
END IF;
END;
$$ LANGUAGE 'plpgsql';
`
const triggerSQL = `
CREATE TRIGGER cost_models_update AFTER INSERT OR UPDATE OR DELETE
ON "CostModelsHistory"
FOR EACH ROW EXECUTE PROCEDURE cost_models_update_notify();
`
await queryInterface.sequelize.query(functionSQL)
await queryInterface.sequelize.query(triggerSQL)
// Need to update sequence value for table else it will be unsynced with actual data
logger.info(`Update sequence for CostModelsHistory`)
const updateIdSeqSQL = `SELECT setval('"CostModelsHistory_id_seq"', (SELECT MAX(id) FROM "CostModelsHistory"));`
await queryInterface.sequelize.query(updateIdSeqSQL)
}
}

export async function down({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.info(`Drop view "CostModels"`)
await queryInterface.sequelize.query('DROP VIEW IF EXISTS "CostModels"')
logger.info(`Drop table CostModelsHistory`)
await queryInterface.dropTable('CostModelsHistory')
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ export const defineCostModelModels = (sequelize: Sequelize): CostModelModels =>
CostModel.init(
{
id: {
type: DataTypes.INTEGER,
type: DataTypes.BIGINT,
autoIncrement: true,
primaryKey: true,
unique: true,
},
deployment: {
type: DataTypes.STRING,
allowNull: false,
primaryKey: true,
validate: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
isDeploymentID: (value: any) => {
Expand Down Expand Up @@ -122,7 +122,8 @@ export const defineCostModelModels = (sequelize: Sequelize): CostModelModels =>
},
},
{
modelName: 'CostModel',
modelName: 'CostModelsHistory',
freezeTableName: true,
sequelize,
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ export default {
): Promise<object | null> => {
const model = await models.CostModel.findOne({
where: { deployment },
order: [['id', 'DESC']],
})
if (model) {
return model.toGraphQL()
}

const globalModel = await models.CostModel.findOne({
where: { deployment: COST_MODEL_GLOBAL },
order: [['id', 'DESC']],
})
if (globalModel) {
globalModel.setDataValue('deployment', deployment)
Expand All @@ -73,14 +75,37 @@ export default {
{ deployments }: { deployments: string[] | null | undefined },
{ models }: IndexerManagementResolverContext,
): Promise<object[]> => {
const costModels = await models.CostModel.findAll({
where: deployments ? { deployment: deployments } : undefined,
order: [['deployment', 'ASC']],
const sequelize = models.CostModel.sequelize
if (!sequelize) {
throw new Error('No sequelize instance available')
}
const query = `
SELECT id,
deployment,
model,
variables,
"createdAt",
"updatedAt"
FROM "CostModelsHistory" t1
JOIN
(
SELECT MAX(id)
FROM "CostModelsHistory"
${deployments ? 'WHERE deployment IN (:deployments)' : ''}
GROUP BY deployment
) t2
ON t1.id = t2.MAX;
`
const costModels = await sequelize.query(query, {
replacements: { deployments: deployments ? deployments : [] },
mapToModel: true,
model: models.CostModel,
})
const definedDeployments = new Set(costModels.map((model) => model.deployment))
const undefinedDeployments = deployments?.filter((d) => !definedDeployments.has(d))
const globalModel = await models.CostModel.findOne({
where: { deployment: COST_MODEL_GLOBAL },
order: [['id', 'DESC']],
})
if (globalModel && undefinedDeployments) {
const mergedCostModels = undefinedDeployments.map((d) => {
Expand Down Expand Up @@ -110,7 +135,6 @@ export default {
`Can't set cost model: DAI injection enabled but not on Ethereum Mainnet`,
)
}

const update = parseGraphQLCostModel(costModel)

// Validate cost model
Expand All @@ -121,37 +145,44 @@ export default {
} catch (err) {
throw new Error(`Invalid cost model or variables: ${err.message}`)
}
const [model] = await models.CostModel.findOrBuild({
const oldModel = await models.CostModel.findOne({
where: { deployment: update.deployment },
order: [['id', 'DESC']],
})
// logger.info('Fetched current model', { current: model, update })
// model.set('deployment', update.deployment || model.deployment)
// // model.set('model', update.model || model.model)
// model.model = update.model || model.model
// logger.info('Merged models', { now: model })
model.deployment = update.deployment || model.deployment
model.model = update.model || model.model

// Update the model variables (fall back to current value if unchanged)
let variables = update.variables || model.variables

if (injectDai) {
const oldDai = getVariable(model.variables, 'DAI')
const newDai = getVariable(update.variables, 'DAI')

// Inject the latest DAI value if available
if (dai.valueReady) {
variables = setVariable(variables, 'DAI', await dai.value())
} else if (newDai === undefined && oldDai !== undefined) {
// Otherwise preserve the old DAI value if there is one;
// this ensures it's never dropped
variables = setVariable(variables, 'DAI', oldDai)
const model = models.CostModel.build({
deployment: update.deployment,
model: update.model || oldModel?.model,
variables: update.variables || oldModel?.variables,
})
if (oldModel) {
let variables = update.variables || oldModel!.variables
if (injectDai) {
const oldDai = getVariable(oldModel!.variables, 'DAI')
const newDai = getVariable(update.variables, 'DAI')

// Inject the latest DAI value if available
if (dai.valueReady) {
variables = setVariable(variables, 'DAI', await dai.value())
} else if (newDai === undefined && oldDai !== undefined) {
// Otherwise preserve the old DAI value if there is one;
// this ensures it's never dropped
variables = setVariable(variables, 'DAI', oldDai)
}
// Apply new variables
model.variables = variables
}
} else {
let variables = update.variables
if (injectDai) {
// Inject the latest DAI value if available
if (dai.valueReady) {
variables = setVariable(variables, 'DAI', await dai.value())
}
// Apply new variables
model.variables = variables
}
}

// Apply new variables
model.variables = variables

return (await model.save()).toGraphQL()
},

Expand Down

0 comments on commit 153c27f

Please sign in to comment.