diff --git a/src/@types/DDO/DDO.ts b/src/@types/DDO/DDO.ts index 5ca15a406..42bf5c0a8 100644 --- a/src/@types/DDO/DDO.ts +++ b/src/@types/DDO/DDO.ts @@ -1,8 +1,7 @@ import { Service } from './Service' import { Metadata } from './Metadata' import { Credentials } from './Credentials' -import { Event } from './Event' -import { Nft } from './Nft' +import { IndexedMetadata } from './IndexedMetadata' /** * DID Descriptor Object. @@ -60,11 +59,5 @@ export interface DDO { */ credentials?: Credentials - /** - * Describes the event of last metadata event - * @type {Event} - */ - event?: Event - - nft?: Nft + indexedMetadata?: IndexedMetadata } diff --git a/src/@types/DDO/IndexedMetadata.ts b/src/@types/DDO/IndexedMetadata.ts new file mode 100644 index 000000000..1fdaa94e0 --- /dev/null +++ b/src/@types/DDO/IndexedMetadata.ts @@ -0,0 +1,31 @@ +import { Event } from './Event' +import { Nft } from './Nft' + +export type PriceType = 'fixedrate' | 'dispenser' + +export interface ServicePrice { + type: PriceType + price: string + contract: string + token?: string + exchangeId?: string +} + +export interface ServiceStats { + datatokenAddress: string + name: string + symbol: string + serviceId: string + orders?: number + prices?: ServicePrice[] +} + +export interface IndexedMetadata { + nft: Nft + stats?: ServiceStats[] + /** + * Describes the event of last metadata event + * @type {Event} + */ + event?: Event +} diff --git a/src/components/Indexer/index.ts b/src/components/Indexer/index.ts index 6e83595e4..9b24f543b 100644 --- a/src/components/Indexer/index.ts +++ b/src/components/Indexer/index.ts @@ -219,7 +219,12 @@ export class OceanIndexer { EVENTS.METADATA_UPDATED, EVENTS.METADATA_STATE, EVENTS.ORDER_STARTED, - EVENTS.ORDER_REUSED + EVENTS.ORDER_REUSED, + EVENTS.DISPENSER_ACTIVATED, + EVENTS.DISPENSER_DEACTIVATED, + EVENTS.EXCHANGE_ACTIVATED, + EVENTS.EXCHANGE_DEACTIVATED, + EVENTS.EXCHANGE_RATE_CHANGED ].includes(event.method) ) { // will emit the metadata created/updated event and advertise it to the other peers (on create only) diff --git a/src/components/Indexer/processor.ts b/src/components/Indexer/processor.ts index cbf2d4b72..455ecdaa9 100644 --- a/src/components/Indexer/processor.ts +++ b/src/components/Indexer/processor.ts @@ -2,6 +2,7 @@ import { Interface, JsonRpcApiProvider, Signer, + ZeroAddress, ethers, getAddress, getBytes, @@ -16,18 +17,34 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' assert { type: 'json' } import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' } +import Dispenser from '@oceanprotocol/contracts/artifacts/contracts/pools/dispenser/Dispenser.sol/Dispenser.json' assert { type: 'json' } +import FixedRateExchange from '@oceanprotocol/contracts/artifacts/contracts/pools/fixedRate/FixedRateExchange.sol/FixedRateExchange.json' assert { type: 'json' } import AccessListContract from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' assert { type: 'json' } import { getDatabase } from '../../utils/database.js' import { PROTOCOL_COMMANDS, EVENTS, MetadataStates, + EVENT_HASHES, ENVIRONMENT_VARIABLES } from '../../utils/constants.js' -import { getDtContract, wasNFTDeployedByOurFactory } from './utils.js' +import { + findServiceIdByDatatoken, + getDtContract, + getPricingStatsForDddo, + wasNFTDeployedByOurFactory, + getPricesByDt, + doesDispenserAlreadyExist, + doesFreAlreadyExist +} from './utils.js' + import { INDEXER_LOGGER } from '../../utils/logging/common.js' import { Purgatory } from './purgatory.js' -import { getConfiguration, timestampToDateTime } from '../../utils/index.js' +import { + deleteIndexedMetadataIfExists, + getConfiguration, + timestampToDateTime +} from '../../utils/index.js' import { OceanNode } from '../../OceanNode.js' import { asyncCallWithTimeout, streamToString } from '../../utils/util.js' import { DecryptDDOCommand } from '../../@types/commands.js' @@ -42,30 +59,92 @@ class BaseEventProcessor { this.networkId = chainId } - protected getTokenInfo(services: any[]): any[] { + protected isValidDtAddressFromServices(services: any[]): boolean { + for (const service of services) { + if ( + service.datatokenAddress === '0x0' || + service.datatokenAddress === ZeroAddress + ) { + return false + } + } + return true + } + + protected async getTokenInfo(services: any[], signer: Signer): Promise { const datatokens: any[] = [] - services.forEach((service) => { + + for (const service of services) { + const datatoken = new ethers.Contract( + service.datatokenAddress, + ERC20Template.abi, + signer + ) + let name: string + let symbol: string + if ( + service.datatokenAddress === '0x0' || + service.datatokenAddress === ZeroAddress + ) { + name = `Datatoken${services.indexOf(service)}` + symbol = `DT${services.indexOf(service)}` + } else { + name = await datatoken.name() + INDEXER_LOGGER.logMessage(`name.datatoken: ${name}`) + symbol = await datatoken.symbol() + INDEXER_LOGGER.logMessage(`symbol.datatoken: ${symbol}`) + } + datatokens.push({ address: service.datatokenAddress, - name: 'Datatoken', - symbol: 'DT1', + name, + symbol, serviceId: service.id }) - }) + } + return datatokens } protected async getEventData( provider: JsonRpcApiProvider, transactionHash: string, - abi: any + abi: any, + eventType: string ): Promise { const iface = new Interface(abi) const receipt = await provider.getTransactionReceipt(transactionHash) - const eventObj = { - topics: receipt.logs[0].topics as string[], - data: receipt.logs[0].data + + let eventHash: string + for (const [key, value] of Object.entries(EVENT_HASHES)) { + if (value.type === eventType) { + eventHash = key + break + } } + if (eventHash === '') { + INDEXER_LOGGER.error(`Event hash couldn't be found!`) + return null + } + + let eventObj: any + for (const log of receipt.logs) { + if (log.topics[0] === eventHash) { + eventObj = { + topics: log.topics, + data: log.data + } + break + } + } + + if (!eventObj) { + INDEXER_LOGGER.error( + `Event object couldn't be retrieved! Event hash not present in logs topics` + ) + return null + } + return iface.parseLog(eventObj) } @@ -98,7 +177,7 @@ class BaseEventProcessor { this.networkId, saveDDO.id, saveDDO.nftAddress, - saveDDO.event?.tx, + saveDDO.indexedMetadata?.event?.tx, true ) INDEXER_LOGGER.logMessage( @@ -111,7 +190,7 @@ class BaseEventProcessor { this.networkId, ddo.id, ddo.nftAddress, - ddo.event?.tx, + ddo.indexedMetadata?.event?.tx, true, err.message ) @@ -331,7 +410,8 @@ export class MetadataEventProcessor extends BaseEventProcessor { const decodedEventData = await this.getEventData( provider, event.transactionHash, - ERC721Template.abi + ERC721Template.abi, + eventName ) const metadata = decodedEventData.args[4] const metadataHash = decodedEventData.args[5] @@ -347,14 +427,17 @@ export class MetadataEventProcessor extends BaseEventProcessor { metadataHash, metadata ) - if (ddo.id !== makeDid(event.address, chainId.toString(10))) { + const clonedDdo = structuredClone(ddo) + INDEXER_LOGGER.logMessage(`clonedDdo: ${JSON.stringify(clonedDdo)}`) + const updatedDdo = deleteIndexedMetadataIfExists(clonedDdo) + if (updatedDdo.id !== makeDid(event.address, chainId.toString(10))) { INDEXER_LOGGER.error( `Decrypted DDO ID is not matching the generated hash for DID.` ) return } // for unencrypted DDOs - if (parseInt(flag) !== 2 && !this.checkDdoHash(ddo, metadataHash)) { + if (parseInt(flag) !== 2 && !this.checkDdoHash(updatedDdo, metadataHash)) { return } @@ -406,13 +489,7 @@ export class MetadataEventProcessor extends BaseEventProcessor { // stuff that we overwrite ddo.chainId = chainId ddo.nftAddress = event.address - ddo.datatokens = this.getTokenInfo(ddo.services) - ddo.nft = await this.getNFTInfo( - ddo.nftAddress, - signer, - owner, - parseInt(decodedEventData.args[6]) - ) + ddo.datatokens = await this.getTokenInfo(ddo.services, signer) INDEXER_LOGGER.logMessage( `Processed new DDO data ${ddo.id} with txHash ${event.transactionHash} from block ${event.blockNumber}`, @@ -471,26 +548,40 @@ export class MetadataEventProcessor extends BaseEventProcessor { return } } - const from = decodedEventData.args[0] + const from = decodedEventData.args[0].toString() + let ddoUpdatedWithPricing = {} // we need to store the event data (either metadata created or update and is updatable) - if ([EVENTS.METADATA_CREATED, EVENTS.METADATA_UPDATED].includes(eventName)) { - if (!ddo.event) { - ddo.event = {} + if ( + [EVENTS.METADATA_CREATED, EVENTS.METADATA_UPDATED].includes(eventName) && + this.isValidDtAddressFromServices(ddo.services) + ) { + const ddoWithPricing = await getPricingStatsForDddo(ddo, signer) + ddoWithPricing.indexedMetadata.nft = await this.getNFTInfo( + ddoWithPricing.nftAddress, + signer, + owner, + parseInt(decodedEventData.args[6]) + ) + if (!ddoWithPricing.indexedMetadata.event) { + ddoWithPricing.indexedMetadata.event = {} } - ddo.event.tx = event.transactionHash - ddo.event.from = from - ddo.event.contract = event.address + + ddoWithPricing.indexedMetadata.event.tx = event.transactionHash + ddoWithPricing.indexedMetadata.event.from = from + ddoWithPricing.indexedMetadata.event.contract = event.address if (event.blockNumber) { - ddo.event.block = event.blockNumber + ddoWithPricing.indexedMetadata.event.block = event.blockNumber // try get block & timestamp from block (only wait 2.5 secs maximum) const promiseFn = provider.getBlock(event.blockNumber) const result = await asyncCallWithTimeout(promiseFn, 2500) if (result.data !== null && !result.timeout) { - ddo.event.datetime = new Date(result.data.timestamp * 1000).toJSON() + ddoWithPricing.indexedMetadata.event.datetime = new Date( + result.data.timestamp * 1000 + ).toJSON() } } else { - ddo.event.block = -1 + ddoWithPricing.indexedMetadata.event.block = -1 } // policyServer check @@ -498,14 +589,14 @@ export class MetadataEventProcessor extends BaseEventProcessor { let policyStatus if (eventName === EVENTS.METADATA_UPDATED) policyStatus = await policyServer.checkUpdateDDO( - ddo, + ddoWithPricing, this.networkId, event.transactionHash, event ) else policyStatus = await policyServer.checknewDDO( - ddo, + ddoWithPricing, this.networkId, event.transactionHash, event @@ -521,14 +612,20 @@ export class MetadataEventProcessor extends BaseEventProcessor { ) return } + ddoUpdatedWithPricing = structuredClone(ddoWithPricing) } // always call, but only create instance once const purgatory = await Purgatory.getInstance() // if purgatory is disabled just return false - const updatedDDO = await this.updatePurgatoryStateDdo(ddo, from, purgatory) - if (updatedDDO.purgatory.state === false) { + const updatedDDO = await this.updatePurgatoryStateDdo( + ddoUpdatedWithPricing, + from, + purgatory + ) + if (updatedDDO.indexedMetadata.purgatory.state === false) { // TODO: insert in a different collection for purgatory DDOs - const saveDDO = this.createOrUpdateDDO(ddo, eventName) + const saveDDO = await this.createOrUpdateDDO(ddoUpdatedWithPricing, eventName) + INDEXER_LOGGER.logMessage(`saved DDO: ${JSON.stringify(saveDDO)}`) return saveDDO } } catch (error) { @@ -558,11 +655,11 @@ export class MetadataEventProcessor extends BaseEventProcessor { const state: boolean = (await purgatory.isBannedAsset(ddo.id)) || (await purgatory.isBannedAccount(owner)) - ddo.purgatory = { + ddo.indexedMetadata.purgatory = { state } } else { - ddo.purgatory = { + ddo.indexedMetadata.purgatory = { state: false } } @@ -571,14 +668,14 @@ export class MetadataEventProcessor extends BaseEventProcessor { isUpdateable(previousDdo: any, txHash: string, block: number): [boolean, string] { let errorMsg: string - const ddoTxId = previousDdo.event.tx + const ddoTxId = previousDdo.indexedMetadata.event.tx // do not update if we have the same txid if (txHash === ddoTxId) { errorMsg = `Previous DDO has the same tx id, no need to update: event-txid=${txHash} <> asset-event-txid=${ddoTxId}` INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_DEBUG, errorMsg, true) return [false, errorMsg] } - const ddoBlock = previousDdo.event.block + const ddoBlock = previousDdo.indexedMetadata.event.block // do not update if we have the same block if (block === ddoBlock) { errorMsg = `Asset was updated later (block: ${ddoBlock}) vs transaction block: ${block}` @@ -600,7 +697,8 @@ export class MetadataStateEventProcessor extends BaseEventProcessor { const decodedEventData = await this.getEventData( provider, event.transactionHash, - ERC721Template.abi + ERC721Template.abi, + EVENTS.METADATA_STATE ) const metadataState = parseInt(decodedEventData.args[1].toString()) INDEXER_LOGGER.logMessage(`Processed new metadata state ${metadataState} `, true) @@ -624,22 +722,27 @@ export class MetadataStateEventProcessor extends BaseEventProcessor { } INDEXER_LOGGER.logMessage(`Found did ${did} on network ${chainId}`) - if ('nft' in ddo && ddo.nft.state !== metadataState) { + if ( + 'nft' in ddo.indexedMetadata && + ddo.indexedMetadata.nft.state !== metadataState + ) { let shortVersion = null if ( - ddo.nft.state === MetadataStates.ACTIVE && + ddo.indexedMetadata.nft.state === MetadataStates.ACTIVE && [MetadataStates.REVOKED, MetadataStates.DEPRECATED].includes(metadataState) ) { INDEXER_LOGGER.logMessage( - `DDO became non-visible from ${ddo.nft.state} to ${metadataState}` + `DDO became non-visible from ${ddo.indexedMetadata.nft.state} to ${metadataState}` ) shortVersion = { id: ddo.id, chainId, nftAddress: ddo.nftAddress, - nft: { - state: metadataState + indexedMetadata: { + nft: { + state: metadataState + } } } } @@ -647,7 +750,7 @@ export class MetadataStateEventProcessor extends BaseEventProcessor { // We should keep it here, because in further development we'll store // the previous structure of the non-visible DDOs (full version) // in case their state changes back to active. - ddo.nft.state = metadataState + ddo.indexedMetadata.nft.state = metadataState if (shortVersion) { ddo = shortVersion } @@ -655,14 +758,14 @@ export class MetadataStateEventProcessor extends BaseEventProcessor { // Still update until we validate and polish schemas for DDO. // But it should update ONLY if the first condition is met. // Check https://github.com/oceanprotocol/aquarius/blob/84a560ea972485e46dd3c2cfc3cdb298b65d18fa/aquarius/events/processors.py#L663 - ddo.nft = { + ddo.indexedMetadata.nft = { state: metadataState } } INDEXER_LOGGER.logMessage( `Found did ${did} for state updating on network ${chainId}` ) - const savedDDO = this.createOrUpdateDDO(ddo, EVENTS.METADATA_STATE) + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.METADATA_STATE) return savedDDO } catch (err) { INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) @@ -680,7 +783,8 @@ export class OrderStartedEventProcessor extends BaseEventProcessor { const decodedEventData = await this.getEventData( provider, event.transactionHash, - ERC20Template.abi + ERC20Template.abi, + EVENTS.ORDER_STARTED ) const serviceIndex = parseInt(decodedEventData.args[3].toString()) const timestamp = parseInt(decodedEventData.args[4].toString()) @@ -707,18 +811,32 @@ export class OrderStartedEventProcessor extends BaseEventProcessor { ) return } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } if ( - 'stats' in ddo && + ddo.indexedMetadata.stats.length !== 0 && ddo.services[serviceIndex].datatokenAddress?.toLowerCase() === event.address?.toLowerCase() ) { - ddo.stats.orders += 1 - } else { - // Still update until we validate and polish schemas for DDO. - // But it should update ONLY if first condition is met. - ddo.stats = { - orders: 1 + for (const stat of ddo.indexedMetadata.stats) { + if (stat.datatokenAddress.toLowerCase() === event.address?.toLowerCase()) { + stat.orders += 1 + break + } } + } else if (ddo.indexedMetadata.stats.length === 0) { + ddo.indexedMetadata.stats.push({ + datatokenAddress: event.address, + name: await datatokenContract.name(), + serviceId: ddo.services[serviceIndex].id, + orders: 1, + prices: await getPricesByDt(datatokenContract, signer) + }) } await orderDatabase.create( event.transactionHash, @@ -733,7 +851,7 @@ export class OrderStartedEventProcessor extends BaseEventProcessor { INDEXER_LOGGER.logMessage( `Found did ${did} for order starting on network ${chainId}` ) - const savedDDO = this.createOrUpdateDDO(ddo, EVENTS.ORDER_STARTED) + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.ORDER_STARTED) return savedDDO } catch (err) { INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) @@ -751,7 +869,8 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { const decodedEventData = await this.getEventData( provider, event.transactionHash, - ERC20Template.abi + ERC20Template.abi, + EVENTS.ORDER_REUSED ) const startOrderId = decodedEventData.args[0].toString() const timestamp = parseInt(decodedEventData.args[2].toString()) @@ -775,7 +894,37 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { ) return } - ddo.stats.orders += 1 + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if (stat.datatokenAddress.toLowerCase() === event.address?.toLowerCase()) { + stat.orders += 1 + break + } + } + } else { + INDEXER_LOGGER.logMessage(`[OrderReused] - No stats were found on the ddo`) + const serviceIdToFind = findServiceIdByDatatoken(ddo, event.address) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[OrderReused] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress: event.address, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 1, + prices: await getPricesByDt(datatokenContract, signer) + }) + } try { const startOrder = await orderDatabase.retrieve(startOrderId) @@ -791,7 +940,7 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { timestamp, startOrder.consumer, payer, - ddo.services[0].datatokenAddress, + event.address, nftAddress, did, startOrderId @@ -803,7 +952,604 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { true ) } - const savedDDO = this.createOrUpdateDDO(ddo, EVENTS.ORDER_REUSED) + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.ORDER_REUSED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class DispenserCreatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + Dispenser.abi, + EVENTS.DISPENSER_CREATED + ) + const datatokenAddress = decodedEventData.args[0].toString() + const datatokenContract = getDtContract(signer, datatokenAddress) + + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected DispenserCreated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesDispenserAlreadyExist(event.address, stat.prices)[0] + ) { + const price = { + type: 'dispenser', + price: '0', + contract: event.address, + token: datatokenAddress + } + stat.prices.push(price) + break + } else if (doesDispenserAlreadyExist(event.address, stat.prices)[0]) { + break + } + } + } else { + INDEXER_LOGGER.logMessage(`[DispenserCreated] - No stats were found on the ddo`) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[DispenserCreated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.DISPENSER_CREATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class DispenserActivatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + Dispenser.abi, + EVENTS.DISPENSER_ACTIVATED + ) + const datatokenAddress = decodedEventData.args[0].toString() + const datatokenContract = getDtContract(signer, datatokenAddress) + + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected DispenserActivated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesDispenserAlreadyExist(event.address, stat.prices)[0] + ) { + stat.prices.push({ + type: 'dispenser', + price: '0', + contract: event.address, + token: datatokenAddress + }) + break + } else if (doesDispenserAlreadyExist(event.address, stat.prices)[0]) { + break + } + } + } else { + INDEXER_LOGGER.logMessage(`[DispenserActivated] - No stats were found on the ddo`) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[DispenserActivated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.DISPENSER_ACTIVATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class DispenserDeactivatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + Dispenser.abi, + EVENTS.DISPENSER_DEACTIVATED + ) + const datatokenAddress = decodedEventData.args[0].toString() + const datatokenContract = getDtContract(signer, datatokenAddress) + + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected DispenserDeactivated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + doesDispenserAlreadyExist(event.address, stat.prices)[0] + ) { + const price = doesDispenserAlreadyExist(event.address, stat.prices)[1] + const index = stat.prices.indexOf(price) + stat.prices.splice(index, 1) + break + } else if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesDispenserAlreadyExist(event.address, stat.prices)[0] + ) { + INDEXER_LOGGER.logMessage( + `Detected DispenserDeactivated changed for ${event.address}, but dispenser does not exist in the DDO pricing.` + ) + break + } + } + } else { + INDEXER_LOGGER.logMessage( + `[DispenserDeactivated] - No stats were found on the ddo` + ) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[DispenserDeactivated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.DISPENSER_DEACTIVATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class ExchangeCreatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_CREATED + ) + const exchangeId = decodedEventData.args[0].toString() + const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) + const exchange = await freContract.getExchange(exchangeId) + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected ExchangeCreated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + stat.prices.push({ + type: 'fixedrate', + price: ethers.formatEther(exchange[5]), + contract: event.address, + token: exchange[3], + exchangeId + }) + break + } else if (doesFreAlreadyExist(event.address, stat.prices)[0]) { + break + } + } + } else { + INDEXER_LOGGER.logMessage(`[ExchangeCreated] - No stats were found on the ddo`) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[ExchangeCreated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.EXCHANGE_ACTIVATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class ExchangeActivatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_ACTIVATED + ) + INDEXER_LOGGER.logMessage(`event: ${JSON.stringify(event)}`) + INDEXER_LOGGER.logMessage( + `decodedEventData in exchange activated: ${JSON.stringify(decodedEventData)}` + ) + const exchangeId = decodedEventData.args[0].toString() + const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) + const exchange = await freContract.getExchange(exchangeId) + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected ExchangeActivated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + stat.prices.push({ + type: 'fixedrate', + price: ethers.formatEther(exchange[5]), + contract: event.address, + token: exchange[3], + exchangeId + }) + break + } else if (doesFreAlreadyExist(event.address, stat.prices)[0]) { + break + } + } + } else { + INDEXER_LOGGER.logMessage(`[ExchangeActivated] - No stats were found on the ddo`) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[ExchangeActivated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.EXCHANGE_ACTIVATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} + +export class ExchangeDeactivatedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_DEACTIVATED + ) + const exchangeId = decodedEventData.args[0].toString() + const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) + const exchange = await freContract.getExchange(exchangeId) + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected ExchangeDeactivated changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + const price = doesFreAlreadyExist(exchangeId, stat.prices)[1] + const index = stat.prices.indexOf(price) + stat.prices.splice(index, 1) + break + } else if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + INDEXER_LOGGER.logMessage( + `Detected ExchangeDeactivated changed for ${event.address}, but exchange ${exchangeId} does not exist in the DDO pricing.` + ) + break + } + } + } else { + INDEXER_LOGGER.logMessage( + `[ExchangeDeactivated] - No stats were found on the ddo` + ) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[ExchangeDeactivated] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: await getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.EXCHANGE_DEACTIVATED) + return savedDDO + } catch (err) { + INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) + } + } +} +export class ExchangeRateChangedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: JsonRpcApiProvider + ): Promise { + const decodedEventData = await this.getEventData( + provider, + event.transactionHash, + FixedRateExchange.abi, + EVENTS.EXCHANGE_RATE_CHANGED + ) + const exchangeId = ethers.toUtf8Bytes(decodedEventData.args[0].toString()) + const newRate = decodedEventData.args[2].toString() + const freContract = new ethers.Contract(event.address, FixedRateExchange.abi, signer) + const exchange = await freContract.getExchange(exchangeId) + const datatokenAddress = exchange[1] + const datatokenContract = getDtContract(signer, datatokenAddress) + const nftAddress = await datatokenContract.getERC721Address() + const did = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + try { + const { ddo: ddoDatabase } = await getDatabase() + const ddo = await ddoDatabase.retrieve(did) + if (!ddo) { + INDEXER_LOGGER.logMessage( + `Detected ExchangeRateChanged changed for ${did}, but it does not exists.` + ) + return + } + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + if (ddo.indexedMetadata.stats.length !== 0) { + for (const stat of ddo.indexedMetadata.stats) { + if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + const price = doesFreAlreadyExist(exchangeId, stat.prices)[1] + price.price = newRate + break + } else if ( + stat.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase() && + !doesFreAlreadyExist(exchangeId, stat.prices)[0] + ) { + INDEXER_LOGGER.logMessage( + `[ExchangeRateChanged] - Could not find the exchange in DDO ${did} prices` + ) + return + } + } + } else { + INDEXER_LOGGER.logMessage( + `[ExchangeRateChanged] - No stats were found on the ddo` + ) + const serviceIdToFind = findServiceIdByDatatoken(ddo, datatokenAddress) + if (!serviceIdToFind) { + INDEXER_LOGGER.logMessage( + `[ExchangeRateChanged] - This datatoken does not contain this service. Invalid service id!` + ) + return + } + ddo.indexedMetadata.stats.push({ + datatokenAddress, + name: await datatokenContract.name(), + serviceId: serviceIdToFind, + orders: 0, + prices: getPricesByDt(datatokenContract, signer) + }) + } + + const savedDDO = await this.createOrUpdateDDO(ddo, EVENTS.EXCHANGE_RATE_CHANGED) return savedDDO } catch (err) { INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true) diff --git a/src/components/Indexer/utils.ts b/src/components/Indexer/utils.ts index 66ab708a6..fb250cf9a 100644 --- a/src/components/Indexer/utils.ts +++ b/src/components/Indexer/utils.ts @@ -11,10 +11,17 @@ import { } from '../../utils/index.js' import { BlocksEvents, NetworkEvent, ProcessingEvents } from '../../@types/blockchain.js' import { + DispenserActivatedEventProcessor, + DispenserDeactivatedEventProcessor, MetadataEventProcessor, MetadataStateEventProcessor, OrderReusedEventProcessor, - OrderStartedEventProcessor + OrderStartedEventProcessor, + ExchangeActivatedEventProcessor, + ExchangeDeactivatedEventProcessor, + ExchangeRateChangedEventProcessor, + ExchangeCreatedEventProcessor, + DispenserCreatedEventProcessor } from './processor.js' import { INDEXER_LOGGER } from '../../utils/logging/common.js' import { fetchEventFromTransaction } from '../../utils/util.js' @@ -23,11 +30,30 @@ import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { getOceanArtifactsAdressesByChainId } from '../../utils/address.js' import { CommandStatus, JobStatus } from '../../@types/commands.js' import { create256Hash } from '../../utils/crypt.js' +import Dispenser from '@oceanprotocol/contracts/artifacts/contracts/pools/dispenser/Dispenser.sol/Dispenser.json' assert { type: 'json' } +import FixedRateExchange from '@oceanprotocol/contracts/artifacts/contracts/pools/fixedRate/FixedRateExchange.sol/FixedRateExchange.json' assert { type: 'json' } +import { ServicePrice } from '../../@types/DDO/IndexedMetadata.js' let metadataEventProccessor: MetadataEventProcessor let metadataStateEventProcessor: MetadataStateEventProcessor let orderReusedEventProcessor: OrderReusedEventProcessor let orderStartedEventProcessor: OrderStartedEventProcessor +let dispenserActivatedEventProcessor: DispenserActivatedEventProcessor +let dispenserDeactivatedEventProcessor: DispenserDeactivatedEventProcessor +let exchangeCreatedEventProcessor: ExchangeCreatedEventProcessor +let exchangeActivatedEventProcessor: ExchangeActivatedEventProcessor +let exchangeDeactivatedEventProcessor: ExchangeDeactivatedEventProcessor +let exchangeNewRateEventProcessor: ExchangeRateChangedEventProcessor +let dispenserCreatedEventProcessor: DispenserCreatedEventProcessor + +function getExchangeCreatedEventProcessor( + chainId: number +): ExchangeCreatedEventProcessor { + if (!exchangeCreatedEventProcessor) { + exchangeCreatedEventProcessor = new ExchangeCreatedEventProcessor(chainId) + } + return exchangeCreatedEventProcessor +} function getMetadataEventProcessor(chainId: number): MetadataEventProcessor { if (!metadataEventProccessor) { @@ -57,6 +83,58 @@ function getOrderStartedEventProcessor(chainId: number): OrderStartedEventProces return orderStartedEventProcessor } +function getDispenserCreatedEventProcessor( + chainId: number +): DispenserCreatedEventProcessor { + if (!dispenserCreatedEventProcessor) { + dispenserCreatedEventProcessor = new DispenserCreatedEventProcessor(chainId) + } + return dispenserCreatedEventProcessor +} + +function getDispenserActivatedEventProcessor( + chainId: number +): DispenserActivatedEventProcessor { + if (!dispenserActivatedEventProcessor) { + dispenserActivatedEventProcessor = new DispenserActivatedEventProcessor(chainId) + } + return dispenserActivatedEventProcessor +} + +function getDispenserDeactivatedEventProcessor( + chainId: number +): DispenserDeactivatedEventProcessor { + if (!dispenserDeactivatedEventProcessor) { + dispenserDeactivatedEventProcessor = new DispenserDeactivatedEventProcessor(chainId) + } + return dispenserDeactivatedEventProcessor +} + +function getExchangeActivatedEventProcessor( + chainId: number +): ExchangeActivatedEventProcessor { + if (!exchangeActivatedEventProcessor) { + exchangeActivatedEventProcessor = new ExchangeActivatedEventProcessor(chainId) + } + return exchangeActivatedEventProcessor +} + +function getExchangeDeactivatedEventProcessor( + chainId: number +): ExchangeDeactivatedEventProcessor { + if (!exchangeDeactivatedEventProcessor) { + exchangeDeactivatedEventProcessor = new ExchangeDeactivatedEventProcessor(chainId) + } + return exchangeDeactivatedEventProcessor +} + +function getExchangeNewRateEventProcessor(chainId: number) { + if (!exchangeNewRateEventProcessor) { + exchangeNewRateEventProcessor = new ExchangeRateChangedEventProcessor(chainId) + } + return exchangeNewRateEventProcessor +} + export const getContractAddress = (chainId: number, contractName: string): string => { const addressFile = getOceanArtifactsAdressesByChainId(chainId) if (addressFile && contractName in addressFile) { @@ -250,9 +328,22 @@ export const processChunkLogs = async ( const processor = getMetadataStateEventProcessor(chainId) storeEvents[event.type] = await processor.processEvent(log, chainId, provider) } else if (event.type === EVENTS.EXCHANGE_CREATED) { - storeEvents[event.type] = procesExchangeCreated() + const processor = getExchangeCreatedEventProcessor(chainId) + INDEXER_LOGGER.logMessage(`log for exchange created: ${JSON.stringify(log)}`) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) } else if (event.type === EVENTS.EXCHANGE_RATE_CHANGED) { - storeEvents[event.type] = processExchangeRateChanged() + const processor = getExchangeNewRateEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) } else if (event.type === EVENTS.ORDER_STARTED) { const processor = getOrderStartedEventProcessor(chainId) storeEvents[event.type] = await processor.processEvent( @@ -271,6 +362,46 @@ export const processChunkLogs = async ( ) } else if (event.type === EVENTS.TOKEN_URI_UPDATE) { storeEvents[event.type] = processTokenUriUpadate() + } else if (event.type === EVENTS.DISPENSER_ACTIVATED) { + const processor = getDispenserActivatedEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) + } else if (event.type === EVENTS.DISPENSER_CREATED) { + const processor = getDispenserCreatedEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) + } else if (event.type === EVENTS.DISPENSER_DEACTIVATED) { + const processor = getDispenserDeactivatedEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) + } else if (event.type === EVENTS.EXCHANGE_ACTIVATED) { + const processor = getExchangeActivatedEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) + } else if (event.type === EVENTS.EXCHANGE_DEACTIVATED) { + const processor = getExchangeDeactivatedEventProcessor(chainId) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) } } } // end for loop @@ -280,14 +411,6 @@ export const processChunkLogs = async ( return {} } -const procesExchangeCreated = (): string => { - return 'EXCHANGE_CREATED' -} - -const processExchangeRateChanged = (): string => { - return 'EXCHANGE_RATE_CHANGED' -} - const processTokenUriUpadate = (): string => { return 'TOKEN_URI_UPDATE' } @@ -375,3 +498,217 @@ export function buildJobIdentifier(command: string, extra: string[]): JobStatus hash: create256Hash(extra.join('')) } } + +export function findServiceIdByDatatoken(ddo: any, datatokenAddress: string): string { + for (const s of ddo.services) { + if (s.datatokenAddress.toLowerCase() === datatokenAddress.toLowerCase()) { + return s.id + } + } + return null +} + +export function doesDispenserAlreadyExist( + dispenserAddress: string, + prices: ServicePrice[] +): [boolean, ServicePrice?] { + for (const price of prices) { + if (dispenserAddress.toLowerCase() === price.contract.toLowerCase()) { + return [true, price] + } + } + return [false, null] +} + +export function doesFreAlreadyExist( + exchangeId: ethers.BytesLike, + prices: ServicePrice[] +): [boolean, ServicePrice?] { + for (const price of prices) { + if (exchangeId === price.exchangeId) { + return [true, price] + } + } + return [false, null] +} + +export async function getPricesByDt( + datatoken: ethers.Contract, + signer: Signer +): Promise { + let dispensers = [] + let fixedRates = [] + let prices: ServicePrice[] = [] + try { + dispensers = await datatoken.getDispensers() + } catch (e) { + INDEXER_LOGGER.error(`[GET PRICES] failure when retrieving dispensers: ${e}`) + } + try { + fixedRates = await datatoken.getFixedRates() + } catch (e) { + INDEXER_LOGGER.error( + `[GET PRICES] failure when retrieving fixed rate exchanges: ${e}` + ) + } + if (dispensers.length === 0 && fixedRates.length === 0) { + prices = [] + } else { + if (dispensers) { + for (const dispenser of dispensers) { + const dispenserContract = new ethers.Contract(dispenser, Dispenser.abi, signer) + try { + const [isActive, ,] = await dispenserContract.status( + await datatoken.getAddress() + ) + if (isActive === true) { + prices.push({ + type: 'dispenser', + price: '0', + contract: dispenser, + token: await datatoken.getAddress() + }) + } + } catch (e) { + INDEXER_LOGGER.error( + `[GET PRICES] failure when retrieving dispenser status from contracts: ${e}` + ) + } + } + } + + if (fixedRates) { + for (const fixedRate of fixedRates) { + const fixedRateContract = new ethers.Contract( + fixedRate[0], + FixedRateExchange.abi, + signer + ) + try { + const [, , , baseTokenAddress, , pricing, isActive, , , , , ,] = + await fixedRateContract.getExchange(fixedRate[1]) + if (isActive === true) { + prices.push({ + type: 'fixedrate', + price: ethers.formatEther(pricing), + token: baseTokenAddress, + contract: fixedRate[0], + exchangeId: fixedRate[1] + }) + } + } catch (e) { + INDEXER_LOGGER.error( + `[GET PRICES] failure when retrieving exchange status from contracts: ${e}` + ) + } + } + } + } + return prices +} + +export async function getPricingStatsForDddo(ddo: any, signer: Signer): Promise { + if (!ddo.indexedMetadata) { + ddo.indexedMetadata = {} + } + + if (!Array.isArray(ddo.indexedMetadata.stats)) { + ddo.indexedMetadata.stats = [] + } + for (const service of ddo.services) { + const datatoken = new ethers.Contract( + service.datatokenAddress, + ERC20Template.abi, + signer + ) + let dispensers = [] + let fixedRates = [] + const prices: ServicePrice[] = [] + try { + dispensers = await datatoken.getDispensers() + } catch (e) { + INDEXER_LOGGER.error(`Contract call fails when retrieving dispensers: ${e}`) + } + try { + fixedRates = await datatoken.getFixedRates() + } catch (e) { + INDEXER_LOGGER.error( + `Contract call fails when retrieving fixed rate exchanges: ${e}` + ) + } + if (dispensers.length === 0 && fixedRates.length === 0) { + ddo.indexedMetadata.stats.push({ + datatokenAddress: service.datatokenAddress, + name: await datatoken.name(), + serviceId: service.id, + orders: 0, + prices: [] + }) + } else { + if (dispensers) { + for (const dispenser of dispensers) { + const dispenserContract = new ethers.Contract(dispenser, Dispenser.abi, signer) + try { + const [isActive, ,] = await dispenserContract.status( + await datatoken.getAddress() + ) + if (isActive === true) { + prices.push({ + type: 'dispenser', + price: '0', + contract: dispenser, + token: service.datatokenAddress + }) + ddo.indexedMetadata.stats.push({ + datatokenAddress: service.datatokenAddress, + name: await datatoken.name(), + serviceId: service.id, + orders: 0, + prices + }) + } + } catch (e) { + INDEXER_LOGGER.error( + `[GET PRICES] failure when retrieving dispenser status from contracts: ${e}` + ) + } + } + } + } + + if (fixedRates) { + for (const fixedRate of fixedRates) { + const fixedRateContract = new ethers.Contract( + fixedRate[0], + FixedRateExchange.abi, + signer + ) + try { + const [, , , baseTokenAddress, , pricing, isActive, , , , , ,] = + await fixedRateContract.getExchange(fixedRate[1]) + if (isActive === true) { + prices.push({ + type: 'fixedrate', + price: ethers.formatEther(pricing), + token: baseTokenAddress, + contract: fixedRate[0], + exchangeId: fixedRate[1] + }) + ddo.indexedMetadata.stats.push({ + datatokenAddress: service.datatokenAddress, + name: await datatoken.name(), + serviceId: service.id, + orders: 0, // just created + prices + }) + } + } catch (e) { + INDEXER_LOGGER.error( + `[GET PRICES] failure when retrieving exchange status from contracts: ${e}` + ) + } + } + } + } + return ddo +} diff --git a/src/components/core/handler/ddoHandler.ts b/src/components/core/handler/ddoHandler.ts index 8f4b5e7f0..819d13a81 100644 --- a/src/components/core/handler/ddoHandler.ts +++ b/src/components/core/handler/ddoHandler.ts @@ -42,7 +42,7 @@ import { getNetworkHeight, wasNFTDeployedByOurFactory } from '../../Indexer/utils.js' -import { validateDDOHash } from '../../../utils/asset.js' +import { deleteIndexedMetadataIfExists, validateDDOHash } from '../../../utils/asset.js' const MAX_NUM_PROVIDERS = 5 // after 60 seconds it returns whatever info we have available @@ -336,7 +336,9 @@ export class DecryptDdoHandler extends Handler { // did matches const ddo = JSON.parse(decryptedDocument.toString()) - if (ddo.id !== makeDid(dataNftAddress, chainId)) { + const clonedDdo = structuredClone(ddo) + const updatedDdo = deleteIndexedMetadataIfExists(clonedDdo) + if (updatedDdo.id !== makeDid(dataNftAddress, chainId)) { CORE_LOGGER.error(`Decrypted DDO ID is not matching the generated hash for DID.`) return { stream: null, @@ -538,7 +540,7 @@ export class FindDdoHandler extends Handler { if (isResponseLegit) { const ddoInfo: FindDDOResponse = { id: ddo.id, - lastUpdateTx: ddo.event.tx, + lastUpdateTx: ddo.indexedMetadata.event.tx, lastUpdateTime: ddo.metadata.updated, provider: peer } @@ -752,7 +754,11 @@ export class FindDdoHandler extends Handler { metadata: ddoData.metadata, services: formattedServices, credentials: ddoData.credentials, - event: ddoData.event + indexedMetadata: { + stats: ddoData.indexedMetadata.stats, + event: ddoData.indexedMetadata.event, + nft: ddoData.indexedMetadata.nft + } } return ddo @@ -843,11 +849,14 @@ export function validateDDOIdentifier(identifier: string): ValidateParams { * @returns validation result */ async function checkIfDDOResponseIsLegit(ddo: any): Promise { - const { nftAddress, chainId, event } = ddo - let isValid = validateDDOHash(ddo.id, nftAddress, chainId) + const clonedDdo = structuredClone(ddo) + const { indexedMetadata } = clonedDdo + const updatedDdo = deleteIndexedMetadataIfExists(ddo) + const { nftAddress, chainId } = updatedDdo + let isValid = validateDDOHash(updatedDdo.id, nftAddress, chainId) // 1) check hash sha256(nftAddress + chainId) if (!isValid) { - CORE_LOGGER.error(`Asset ${ddo.id} does not have a valid hash`) + CORE_LOGGER.error(`Asset ${updatedDdo.id} does not have a valid hash`) return false } @@ -881,19 +890,25 @@ async function checkIfDDOResponseIsLegit(ddo: any): Promise { ) if (!wasDeployedByUs) { - CORE_LOGGER.error(`Asset ${ddo.id} not deployed by the data NFT factory`) + CORE_LOGGER.error(`Asset ${updatedDdo.id} not deployed by the data NFT factory`) return false } // 5) check block & events const networkBlock = await getNetworkHeight(blockchain.getProvider()) - if (!event.block || event.block < 0 || networkBlock < event.block) { - CORE_LOGGER.error(`Event block: ${event.block} is either missing or invalid`) + if ( + !indexedMetadata.event.block || + indexedMetadata.event.block < 0 || + networkBlock < indexedMetadata.event.block + ) { + CORE_LOGGER.error( + `Event block: ${indexedMetadata.event.block} is either missing or invalid` + ) return false } // check events on logs - const txId: string = event.tx // NOTE: DDO is txid, Asset is tx + const txId: string = indexedMetadata.event.tx // NOTE: DDO is txid, Asset is tx if (!txId) { CORE_LOGGER.error(`DDO event missing tx data, cannot confirm transaction`) return false diff --git a/src/components/core/handler/downloadHandler.ts b/src/components/core/handler/downloadHandler.ts index 1f06d6e65..218c6951c 100644 --- a/src/components/core/handler/downloadHandler.ts +++ b/src/components/core/handler/downloadHandler.ts @@ -51,8 +51,8 @@ export function isOrderingAllowedForAsset(asset: DDO): OrdableAssetResponse { reason: `Asset provided is either null, either undefined ${asset}` } } else if ( - asset.nft && - !(asset.nft.state in [MetadataStates.ACTIVE, MetadataStates.UNLISTED]) + asset.indexedMetadata.nft && + !(asset.indexedMetadata.nft.state in [MetadataStates.ACTIVE, MetadataStates.UNLISTED]) ) { return { isOrdable: false, diff --git a/src/components/core/utils/validateDdoHandler.ts b/src/components/core/utils/validateDdoHandler.ts index ca28a4ee3..a16e30699 100644 --- a/src/components/core/utils/validateDdoHandler.ts +++ b/src/components/core/utils/validateDdoHandler.ts @@ -11,6 +11,7 @@ import { CORE_LOGGER } from '../../../utils/logging/common.js' import { create256Hash } from '../../../utils/crypt.js' import { getProviderWallet } from './feesHandler.js' import { Readable } from 'stream' +import { deleteIndexedMetadataIfExists } from '../../../utils/asset.js' const CURRENT_VERSION = '4.7.0' const ALLOWED_VERSIONS = ['4.1.0', '4.3.0', '4.5.0', '4.7.0'] @@ -54,7 +55,8 @@ export async function validateObject( chainId: number, nftAddress: string ): Promise<[boolean, Record]> { - const ddoCopy = JSON.parse(JSON.stringify(obj)) + const updatedDdo = deleteIndexedMetadataIfExists(obj) + const ddoCopy = JSON.parse(JSON.stringify(updatedDdo)) ddoCopy['@type'] = 'DDO' const extraErrors: Record = {} diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index cf332b951..46af3a273 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -471,7 +471,7 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { getDDOSchema(ddo: Record) { let schemaName: string | undefined - if (ddo.nft?.state !== 0) { + if (ddo.indexedMetadata?.nft?.state !== 0) { schemaName = 'op_ddo_short' } else if (ddo.version) { schemaName = `op_ddo_v${ddo.version}` @@ -487,7 +487,12 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase { } async validateDDO(ddo: Record): Promise { - if (ddo.nft?.state !== 0) { + if ('indexedMetadata' in ddo && ddo.indexedMetadata.nft?.state !== 0) { + // Skipping validation for short DDOs as it currently doesn't work + // TODO: DDO validation needs to be updated to consider the fields required by the schema + // See github issue: https://github.com/oceanprotocol/ocean-node/issues/256 + return true + } else if ('nft' in ddo && ddo.nft?.state !== 0) { return true } else { const validation = await validateObject(ddo, ddo.chainId, ddo.nftAddress) diff --git a/src/components/database/TypenseDatabase.ts b/src/components/database/TypenseDatabase.ts index 004826866..1641f8605 100644 --- a/src/components/database/TypenseDatabase.ts +++ b/src/components/database/TypenseDatabase.ts @@ -371,7 +371,7 @@ export class TypesenseDdoDatabase extends AbstractDdoDatabase { getDDOSchema(ddo: Record): TypesenseSchema { // Find the schema based on the DDO version OR use the short DDO schema when state !== 0 let schemaName: string - if (ddo.nft?.state !== 0) { + if (ddo.indexedMetadata?.nft?.state !== 0) { schemaName = 'op_ddo_short' } else if (ddo.version) { schemaName = `op_ddo_v${ddo.version}` @@ -387,11 +387,13 @@ export class TypesenseDdoDatabase extends AbstractDdoDatabase { } async validateDDO(ddo: Record): Promise { - if (ddo.nft?.state !== 0) { + if ('indexedMetadata' in ddo && ddo.indexedMetadata.nft?.state !== 0) { // Skipping validation for short DDOs as it currently doesn't work // TODO: DDO validation needs to be updated to consider the fields required by the schema // See github issue: https://github.com/oceanprotocol/ocean-node/issues/256 return true + } else if ('nft' in ddo && ddo.nft?.state !== 0) { + return true } else { const validation = await validateObject(ddo, ddo.chainId, ddo.nftAddress) if (validation[0] === true) { diff --git a/src/test/data/assets.ts b/src/test/data/assets.ts index 476b78850..3fe408972 100644 --- a/src/test/data/assets.ts +++ b/src/test/data/assets.ts @@ -47,17 +47,16 @@ export const downloadAsset = { owner: '', created: '' }, - purgatory: { - state: false - }, - datatokens: [] as any, stats: { - allocated: 0, orders: 0, price: { value: '0' } - } + }, + purgatory: { + state: false + }, + datatokens: [] as any } const nftLevelCredentials: Credentials = { diff --git a/src/test/data/ddo.ts b/src/test/data/ddo.ts index 0fa6439d1..4044803b0 100644 --- a/src/test/data/ddo.ts +++ b/src/test/data/ddo.ts @@ -1,24 +1,35 @@ export const ddo = { hashType: 'sha256', '@context': ['https://w3id.org/did/v1'], - id: 'did:op:fa0e8fa9550e8eb13392d6eeb9ba9f8111801b332c8d2345b350b3bc66b379d7', - nftAddress: '0xBB1081DbF3227bbB233Db68f7117114baBb43656', + id: 'did:op:b5ef03b7f0d148cde2942c8a330625d4fc71dd32b67a0093da24fcb9a4439887', + nftAddress: '0xca63894B1c911515F1C034BE3509AfC008B42d83', version: '4.1.0', chainId: 137, metadata: { created: '2022-12-30T08:40:06Z', updated: '2022-12-30T08:40:06Z', type: 'dataset', - name: 'DEX volume in details', + name: 'ETH/USDT orderbook', description: - 'Volume traded and locked of Decentralized Exchanges (Uniswap, Sushiswap, Curve, Balancer, ...), daily in details', - tags: ['index', 'defi', 'tvl'], - author: 'DEX', + 'Real time ETH/USDT orderbook\n\nTo take the bid orders, access data.bids array\nTo take the ask orders, access data.asks array\n\nResponse schema:\n\n```json\n{\n "code":"200000",\n "data":\n {\n "time":1665865828392,\n "sequence":"357821345",\n "bids":\n [\n ["1280","0.00078381"],\n ["1279.9","0.02901545"],\n ....\n ],\n "asks":\n [\n ["1280.2","0.0288382"],\n ["1280.3","0.00167897"],\n ...\n ]\n }\n}\n```\n\nAccess is permited for 1 day after buying.', + tags: ['defi', 'orderbook'], + author: '0x4Ab0C24005c410111e21aE16Df5e19180fAD0f6a', license: 'https://market.oceanprotocol.com/terms', additionalInformation: { termsAndConditions: true } - } + }, + services: [ + { + id: '24654b91482a3351050510ff72694d88edae803cf31a5da993da963ba0087648', + type: 'access', + files: + '0x04beba2f90639ff7559618160df5a81729904022578e6bd5f60c3bebfe5cb2aca59d7e062228a98ed88c4582c290045f47cdf3824d1c8bb25b46b8e10eb9dc0763ce82af826fd347517011855ce1396ac94af8cc6f29b78012b679cb78a594d9064b6f6f4a8229889f0bb53262b6ab62b56fa5c608ea126ba228dd0f87290c0628fe07023416280c067beb01a42d0a4df95fdb5a857f1f59b3e6a13b0ae4619080369ba5bede6c7beff6afc7fc31c71ed8100e7817d965d1f8f1abfaace3c01f0bd5d0127df308175941088a1f120a4d9a0290be590d65a7b4de01ae1efe24286d7a06fadeeafba83b5eab25b90961abf1f24796991f06de6c8e1c2357fbfb31f484a94e87e7dba80a489e12fffa1adde89f113b4c8c4c8877914911a008dbed0a86bdd9d14598c35894395fb4a8ea764ed2f9459f6acadac66e695b3715536338f6cdee616b721b0130f726c78ca60ec02fc86c', + datatokenAddress: '0xfF4AE9869Cafb5Ff725f962F3Bbc22Fb303A8aD8', + serviceEndpoint: 'https://v4.provider.polygon.oceanprotocol.com', + timeout: 0 + } + ] } export const genericAlgorithm = { '@context': ['https://w3id.org/did/v1'], diff --git a/src/test/integration/download.test.ts b/src/test/integration/download.test.ts index 05f2d1bc6..bf18547bc 100644 --- a/src/test/integration/download.test.ts +++ b/src/test/integration/download.test.ts @@ -56,6 +56,7 @@ describe('Should run a complete node flow.', () => { let publishedDataset: any let actualDDO: any let indexer: OceanIndexer + let anotherConsumer: ethers.Wallet const mockSupportedNetworks: RPCS = getMockSupportedNetworks() const serviceId = '0' @@ -98,6 +99,10 @@ describe('Should run a complete node flow.', () => { } provider = new JsonRpcProvider('http://127.0.0.1:8545') + anotherConsumer = new ethers.Wallet( + ENVIRONMENT_VARIABLES.NODE2_PRIVATE_KEY.value, + provider + ) publisherAccount = (await provider.getSigner(0)) as Signer consumerAccount = (await provider.getSigner(1)) as Signer @@ -294,7 +299,7 @@ describe('Should run a complete node flow.', () => { serviceId, transferTxId: orderTxId, nonce: Date.now().toString(), - consumerAddress: '0xBE5449a6A97aD46c8558A3356267Ee5D2731ab57', + consumerAddress: await anotherConsumer.getAddress(), signature: '0xBE5449a6', command: PROTOCOL_COMMANDS.DOWNLOAD } diff --git a/src/test/integration/indexer.test.ts b/src/test/integration/indexer.test.ts index 5c8b53c21..294f812d5 100644 --- a/src/test/integration/indexer.test.ts +++ b/src/test/integration/indexer.test.ts @@ -58,6 +58,7 @@ import { getConfiguration } from '../../utils/config.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { encrypt } from '../../utils/crypt.js' import { EncryptMethod } from '../../@types/fileObject.js' +import { deleteIndexedMetadataIfExists } from '../../utils/asset.js' describe('Indexer stores a new metadata events and orders.', () => { let database: Database @@ -176,6 +177,7 @@ describe('Indexer stores a new metadata events and orders.', () => { genericAsset.nftAddress = nftAddress assetDID = genericAsset.id // create proper service.files string + genericAsset.services[0].datatokenAddress = datatokenAddress genericAsset.services[0].files.datatokenAddress = datatokenAddress genericAsset.services[0].files.nftAddress = nftAddress // let's call node to encrypt @@ -208,11 +210,6 @@ describe('Indexer stores a new metadata events and orders.', () => { genericAsset.event.from = setMetaDataTxReceipt.from genericAsset.event.contract = setMetaDataTxReceipt.contractAddress genericAsset.event.datetime = '2023-02-15T16:42:22' - - genericAsset.nft.address = nftAddress - genericAsset.nft.owner = setMetaDataTxReceipt.from - genericAsset.nft.state = 0 - genericAsset.nft.created = '2022-12-30T08:40:43' }) it('should store the ddo in the database and return it ', async function () { @@ -229,24 +226,35 @@ describe('Indexer stores a new metadata events and orders.', () => { }) it('should have nft field stored in ddo', async function () { - assert(resolvedDDO.nft, 'NFT field is not present') + assert(resolvedDDO.indexedMetadata.nft, 'NFT field is not present') assert( - resolvedDDO.nft.address?.toLowerCase() === nftAddress?.toLowerCase(), + resolvedDDO.indexedMetadata.nft.address?.toLowerCase() === + nftAddress?.toLowerCase(), 'NFT address mismatch' ) - assert(resolvedDDO.nft.state === 0, 'NFT state mismatch') // ACTIVE - assert(resolvedDDO.nft.name === (await nftContract.name()), 'NFT name mismatch') - assert(resolvedDDO.nft.symbol === (await nftContract.symbol()), 'NFT symbol mismatch') + assert(resolvedDDO.indexedMetadata.nft.state === 0, 'NFT state mismatch') // ACTIVE + assert( + resolvedDDO.indexedMetadata.nft.name === (await nftContract.name()), + 'NFT name mismatch' + ) assert( - resolvedDDO.nft.tokenURI === + resolvedDDO.indexedMetadata.nft.symbol === (await nftContract.symbol()), + 'NFT symbol mismatch' + ) + assert( + resolvedDDO.indexedMetadata.nft.tokenURI === (await nftContract.tokenURI(await nftContract.getId())), 'NFT tokeURI mismatch' ) assert( - resolvedDDO.nft.owner?.toLowerCase() === setMetaDataTxReceipt.from?.toLowerCase(), + resolvedDDO.indexedMetadata.nft.owner?.toLowerCase() === + setMetaDataTxReceipt.from?.toLowerCase(), 'NFT owner mismatch' ) - assert(resolvedDDO.nft.created, 'NFT created timestamp does not exist') + assert( + resolvedDDO.indexedMetadata.nft.created, + 'NFT created timestamp does not exist' + ) }) it('should store the ddo state in the db with no errors and retrieve it using did', async function () { @@ -288,6 +296,7 @@ describe('Indexer stores a new metadata events and orders.', () => { resolvedDDO.metadata.name = 'dataset-name-updated' resolvedDDO.metadata.description = 'Updated description for the Ocean protocol test dataset' + resolvedDDO = deleteIndexedMetadataIfExists(resolvedDDO) const stringDDO = JSON.stringify(resolvedDDO) const bytes = Buffer.from(stringDDO) const metadata = hexlify(bytes) @@ -338,10 +347,12 @@ describe('Indexer stores a new metadata events and orders.', () => { ) const retrievedDDO: any = ddo if (retrievedDDO) { - expect(retrievedDDO.nft).to.not.equal(undefined) + expect(retrievedDDO.indexedMetadata.nft).to.not.equal(undefined) expect(retrievedDDO).to.have.nested.property('nft.state') // Expect the result from contract - expect(retrievedDDO.nft.state).to.equal(parseInt(result[2].toString())) + expect(retrievedDDO.indexedMetadata.nft.state).to.equal( + parseInt(result[2].toString()) + ) } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) }) @@ -362,7 +373,7 @@ describe('Indexer stores a new metadata events and orders.', () => { if (retrievedDDO != null) { // Expect the result from contract expect(retrievedDDO.id).to.equal(assetDID) - expect(retrievedDDO.nft.state).to.equal(0) + expect(retrievedDDO.indexedMetadata.nft.state).to.equal(0) } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) }) @@ -449,9 +460,15 @@ describe('Indexer stores a new metadata events and orders.', () => { true ) if (ddo) { - const retrievedDDO: any = ddo - expect(retrievedDDO.stats.orders).to.equal(1) - initialOrderCount = retrievedDDO.stats.orders + const retrievedDDO: DDO = ddo + console.log('indexer retrieved ddo: ', JSON.stringify(retrievedDDO)) + for (const stat of retrievedDDO.indexedMetadata.stats) { + if (stat.datatokenAddress === datatokenAddress) { + expect(stat.orders).to.equal(1) + initialOrderCount = stat.orders + break + } + } const resultOrder = await database.order.retrieve(orderTxId) if (resultOrder) { if (resultOrder.id) { @@ -536,10 +553,15 @@ describe('Indexer stores a new metadata events and orders.', () => { true ) - const retrievedDDO: any = ddo + const retrievedDDO: DDO = ddo if (retrievedDDO) { - expect(retrievedDDO.stats.orders).to.be.greaterThan(initialOrderCount) + for (const stat of retrievedDDO.indexedMetadata.stats) { + if (stat.datatokenAddress === datatokenAddress) { + expect(stat.orders).to.be.greaterThan(initialOrderCount) + break + } + } const resultOrder = await database.order.retrieve(reuseOrderTxId) if (resultOrder) { if (resultOrder.id) { @@ -585,7 +607,9 @@ describe('Indexer stores a new metadata events and orders.', () => { // Expect a short version of the DDO expect(Object.keys(resolvedDDO).length).to.equal(4) expect( - 'id' in resolvedDDO && 'nftAddress' in resolvedDDO && 'nft' in resolvedDDO + 'id' in resolvedDDO && + 'nftAddress' in resolvedDDO && + 'nft' in resolvedDDO.indexedMetadata ).to.equal(true) } else { expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) @@ -669,7 +693,6 @@ describe('OceanIndexer - crawler threads', () => { envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) config = await getConfiguration(true) db = await new Database(config.dbConfig) - // oceanNode = OceanNode.getInstance(db) }) it('should start a worker thread and handle RPCS "startBlock"', async () => { diff --git a/src/test/integration/pricing.test.ts b/src/test/integration/pricing.test.ts new file mode 100644 index 000000000..8a7fc15bf --- /dev/null +++ b/src/test/integration/pricing.test.ts @@ -0,0 +1,373 @@ +import { expect, assert } from 'chai' +import { createHash } from 'crypto' +import { + JsonRpcProvider, + Signer, + Contract, + ethers, + getAddress, + hexlify, + ZeroAddress, + parseUnits +} from 'ethers' +import ERC721Factory from '@oceanprotocol/contracts/artifacts/contracts/ERC721Factory.sol/ERC721Factory.json' assert { type: 'json' } +import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' assert { type: 'json' } +import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' } +import Dispenser from '@oceanprotocol/contracts/artifacts/contracts/pools/dispenser/Dispenser.sol/Dispenser.json' assert { type: 'json' } +import { Database } from '../../components/database/index.js' +import { OceanIndexer } from '../../components/Indexer/index.js' +import { RPCS } from '../../@types/blockchain.js' +import { getEventFromTx } from '../../utils/util.js' +import { waitToIndex, expectedTimeoutFailure } from './testUtils.js' +import { genericDDO } from '../data/ddo.js' +import { + DEVELOPMENT_CHAIN_ID, + getOceanArtifactsAdresses, + getOceanArtifactsAdressesByChainId +} from '../../utils/address.js' +import { + DEFAULT_TEST_TIMEOUT, + OverrideEnvConfig, + buildEnvOverrideConfig, + getMockSupportedNetworks, + setupEnvironment, + tearDownEnvironment +} from '../utils/utils.js' +import { ENVIRONMENT_VARIABLES, EVENTS } from '../../utils/constants.js' +import { homedir } from 'os' +import { OceanNode } from '../../OceanNode.js' +import { getConfiguration } from '../../utils/config.js' +import { encrypt } from '../../utils/crypt.js' +import { EncryptMethod } from '../../@types/fileObject.js' + +describe('Publish pricing scehmas and assert ddo stats - FRE & Dispenser', () => { + let database: Database + let oceanNode: OceanNode + let provider: JsonRpcProvider + let factoryContract: Contract + let nftContract: Contract + let datatokenContract: Contract + let publisherAccount: Signer + let nftAddress: string + let datatokenAddress: string + let exchangeId: string + let dispenserContract: ethers.Contract + const chainId = 8996 + let assetDID: string + let resolvedDDO: Record = {} + let genericAsset: any + let setMetaDataTxReceipt: any + let indexer: OceanIndexer + let artifactsAddresses: any + const mockSupportedNetworks: RPCS = getMockSupportedNetworks() + let previousConfiguration: OverrideEnvConfig[] + let genericAssetCloned: any + + before(async () => { + previousConfiguration = await setupEnvironment( + null, + buildEnvOverrideConfig( + [ + ENVIRONMENT_VARIABLES.RPCS, + ENVIRONMENT_VARIABLES.INDEXER_NETWORKS, + ENVIRONMENT_VARIABLES.PRIVATE_KEY, + ENVIRONMENT_VARIABLES.ADDRESS_FILE + ], + [ + JSON.stringify(mockSupportedNetworks), + JSON.stringify([8996]), + '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + ] + ) + ) + + const config = await getConfiguration(true) + database = await new Database(config.dbConfig) + oceanNode = await OceanNode.getInstance() + indexer = new OceanIndexer(database, mockSupportedNetworks) + oceanNode.addIndexer(indexer) + artifactsAddresses = getOceanArtifactsAdressesByChainId(DEVELOPMENT_CHAIN_ID) + if (!artifactsAddresses) { + artifactsAddresses = getOceanArtifactsAdresses().development + } + console.log(artifactsAddresses.FixedPrice) + + provider = new JsonRpcProvider('http://127.0.0.1:8545') + publisherAccount = (await provider.getSigner(0)) as Signer + genericAsset = JSON.parse(JSON.stringify(genericDDO)) + factoryContract = new ethers.Contract( + artifactsAddresses.ERC721Factory, + ERC721Factory.abi, + publisherAccount + ) + genericAssetCloned = structuredClone(genericAsset) + delete genericAssetCloned.event + delete genericAssetCloned.nft + }) + + it('instance Database', () => { + expect(database).to.be.instanceOf(Database) + }) + + it('should publish a dataset w fre', async () => { + const tx = await factoryContract.createNftWithErc20WithFixedRate( + { + name: '72120Bundle', + symbol: '72Bundle', + templateIndex: 1, + tokenURI: 'https://oceanprotocol.com/nft/', + transferable: true, + owner: await publisherAccount.getAddress() + }, + { + strings: ['ERC20B1', 'ERC20DT1Symbol'], + templateIndex: 1, + addresses: [ + await publisherAccount.getAddress(), + ZeroAddress, + ZeroAddress, + ZeroAddress + ], + uints: [1000, 0], + bytess: [] + }, + { + fixedPriceAddress: artifactsAddresses.FixedPrice, + addresses: [ + artifactsAddresses.Ocean, + await publisherAccount.getAddress(), + await publisherAccount.getAddress(), + ZeroAddress + ], + uints: [18, 18, parseUnits('1', 18).toString(), parseUnits('0', 18).toString(), 1] + } + ) + const txReceipt = await tx.wait() + assert(txReceipt, 'transaction failed') + const event = getEventFromTx(txReceipt, 'NFTCreated') + nftAddress = event.args[0] + assert(nftAddress, 'find nft created failed') + const datatokenEvent = getEventFromTx(txReceipt, 'TokenCreated') + datatokenAddress = datatokenEvent.args[0] + assert(datatokenAddress, 'find datatoken created failed') + datatokenContract = new ethers.Contract( + datatokenAddress, + ERC20Template.abi, + publisherAccount + ) + assert(datatokenContract) + const freEvent = getEventFromTx(txReceipt, 'NewFixedRate') + exchangeId = freEvent.args[0] + assert(exchangeId, 'exchangeId not found.') + }) + + it('should set metadata and save ', async () => { + nftContract = new ethers.Contract(nftAddress, ERC721Template.abi, publisherAccount) + genericAssetCloned.id = + 'did:op:' + + createHash('sha256') + .update(getAddress(nftAddress) + chainId.toString(10)) + .digest('hex') + genericAssetCloned.nftAddress = nftAddress + assetDID = genericAssetCloned.id + // create proper service.files string + genericAssetCloned.services[0].datatokenAddress = datatokenAddress + genericAssetCloned.nftAddress = nftAddress + // let's call node to encrypt + + const data = Uint8Array.from( + Buffer.from(JSON.stringify(genericAssetCloned.services[0].files)) + ) + const encryptedData = await encrypt(data, EncryptMethod.ECIES) + const encryptedDataString = encryptedData.toString('hex') + genericAssetCloned.services[0].files = encryptedDataString + const stringDDO = JSON.stringify(genericAssetCloned) + const bytes = Buffer.from(stringDDO) + const metadata = hexlify(bytes) + const hash = createHash('sha256').update(metadata).digest('hex') + + const setMetaDataTx = await nftContract.setMetaData( + 0, + 'http://v4.provider.oceanprotocol.com', + '0x123', + '0x01', + metadata, + '0x' + hash, + [] + ) + setMetaDataTxReceipt = await setMetaDataTx.wait() + assert(setMetaDataTxReceipt, 'set metada failed') + }) + + it('should store the ddo in the database and return it ', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + const { ddo, wasTimeout } = await waitToIndex( + assetDID, + EVENTS.METADATA_CREATED, + DEFAULT_TEST_TIMEOUT * 2 + ) + if (ddo) { + resolvedDDO = ddo + console.log(`resolved ddo: ${JSON.stringify(resolvedDDO)}`) + expect(resolvedDDO.id).to.equal(genericAssetCloned.id) + } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) + }) + + it('should get stats for fre', async function () { + assert(resolvedDDO.indexedMetadata, 'No stats available') + assert(resolvedDDO.indexedMetadata.stats.length === 1) + assert( + resolvedDDO.indexedMetadata.stats[0].datatokenAddress === datatokenAddress, + 'DT is missing.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].name === (await datatokenContract.name()), + 'Name is missing.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].orders === 0, + 'Number of orders are missing.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].serviceId === '0', + 'Service ID is missing.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].prices.length === 1, + 'Incorrect length of prices' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].prices[0].type === 'fixedrate', + 'Type from prices is not present.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].prices[0].token === artifactsAddresses.Ocean, + 'Datatoken from prices is not present.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].prices[0].price === '1.0', + 'Price is not present.' + ) + assert( + resolvedDDO.indexedMetadata.stats[0].prices[0].exchangeId === exchangeId, + 'Exchange ID is not present.' + ) + }) + + it('should attach a dispenser', async () => { + const dtTx = await nftContract.createERC20( + 1, + ['newERC20', 'newERC20s'], + [ + await publisherAccount.getAddress(), + await publisherAccount.getAddress(), + await publisherAccount.getAddress(), + datatokenAddress + ], + [parseUnits('10000', 18), parseUnits('1', 18)], + [] + ) + assert(dtTx, 'Cannot create datatoken') + const dtTxReceipt = await dtTx.wait() + const dtEvent = getEventFromTx(dtTxReceipt, 'TokenCreated') + const newdatatokenAddress = dtEvent.args[0] + const newDtContract = new ethers.Contract( + newdatatokenAddress, + ERC20Template.abi, + publisherAccount + ) + const tx = await newDtContract.createDispenser( + artifactsAddresses.Dispenser, + parseUnits('1', 18), + parseUnits('1', 18), + true, + await publisherAccount.getAddress() + ) + assert(tx, 'Cannot create dispenser') + const txReceipt = await tx.wait() + const dispenserEvent = getEventFromTx(txReceipt, 'NewDispenser') + const dispenserAddress = dispenserEvent.topics[0] + assert(dispenserAddress, 'Dispenser contract not retrieved') + + dispenserContract = new ethers.Contract( + dispenserAddress, + Dispenser.abi, + publisherAccount + ) + assert(dispenserContract) + genericAssetCloned.services.push({ + id: '1', + type: 'access', + description: 'Download service', + datatokenAddress: newdatatokenAddress, + files: [ + { + url: 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', + contentType: 'text/js', + encoding: 'UTF-8' + } + ], + serviceEndpoint: 'http://172.15.0.4:8030', + timeout: 0 + }) + assert(genericAssetCloned.services.length === 2, 'the 2 services are not present') + const data = Uint8Array.from( + Buffer.from(JSON.stringify(genericAssetCloned.services[1].files)) + ) + const encryptedData = await encrypt(data, EncryptMethod.ECIES) + const encryptedDataString = encryptedData.toString('hex') + genericAssetCloned.services[1].files = encryptedDataString + const stringDDO = JSON.stringify(genericAssetCloned) + const bytes = Buffer.from(stringDDO) + const metadata = hexlify(bytes) + const hash = createHash('sha256').update(metadata).digest('hex') + + const setMetaDataTx = await nftContract.setMetaData( + 0, + 'http://v4.provider.oceanprotocol.com', + '0x123', + '0x01', + metadata, + '0x' + hash, + [] + ) + setMetaDataTxReceipt = await setMetaDataTx.wait() + assert(setMetaDataTxReceipt, 'set metada failed') + }) + it('should store the updated ddo in the database and return it ', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + const { ddo, wasTimeout } = await waitToIndex( + genericAssetCloned.id, + EVENTS.METADATA_UPDATED, + DEFAULT_TEST_TIMEOUT * 2, + true + ) + console.log(`updated ddo: ${JSON.stringify(ddo.indexedMetadata.stats)}`) + if (ddo) { + assert( + ddo.indexedMetadata.stats.length === 2, + 'the 2 pricing schemas were not captured in the stats' + ) + assert( + ddo.indexedMetadata.stats[1].prices[0].type === 'dispenser', + 'type is not dispenser' + ) + assert( + ddo.indexedMetadata.stats[1].datatokenAddress === + genericAssetCloned.services[1].datatokenAddress, + 'mismatch datatoken address' + ) + assert(ddo.indexedMetadata.stats[1].prices[0].price === '0', 'price is not 0') + assert( + ddo.indexedMetadata.stats[1].prices[0].token === + genericAssetCloned.services[1].datatokenAddress, + 'mismatch datatoken address' + ) + } else expect(expectedTimeoutFailure(this.test.title)).to.be.equal(wasTimeout) + }) + after(async () => { + await tearDownEnvironment(previousConfiguration) + indexer.stopAllThreads() + }) +}) diff --git a/src/test/integration/typesense.test.ts b/src/test/integration/typesense.test.ts index 1bf59a24e..8742284ca 100644 --- a/src/test/integration/typesense.test.ts +++ b/src/test/integration/typesense.test.ts @@ -6,6 +6,7 @@ import { import { ddoSchema } from '../data/ddoSchema.js' import { ddo } from '../data/ddo.js' import { expect } from 'chai' +import { TypesenseSearchParams } from '../../@types/index.js' describe('Typesense', () => { let typesense: Typesense @@ -161,14 +162,17 @@ describe('Typesense documents', () => { expect(result.metadata).to.not.be.an('undefined') expect(result.metadata.name).to.be.equal(ddo.metadata.name) }) - it('search document in ddo collection', async () => { - const result = await typesense.collections(ddoSchema.name).documents().search({ - q: 'DEX', - query_by: 'metadata.author', - filter_by: 'chainId:<138', + const queryParams: TypesenseSearchParams = { + q: 'new metadata name', + query_by: 'metadata.name', + filter_by: 'chainId:=137', sort_by: 'version:desc' - }) + } + const result = await typesense + .collections(ddoSchema.name) + .documents() + .search(queryParams) expect(result.found).to.equal(1) expect(result.hits[0]).to.not.be.an('undefined') diff --git a/src/test/unit/download.test.ts b/src/test/unit/download.test.ts index 49cfd178f..e433eb287 100644 --- a/src/test/unit/download.test.ts +++ b/src/test/unit/download.test.ts @@ -65,8 +65,7 @@ describe('Should validate files structure for download', () => { serviceEndpoint: 'http://127.0.0.1:8001', timeout: 86400 } - ], - event: {} + ] } const assetURL = { diff --git a/src/test/utils/assets.ts b/src/test/utils/assets.ts index 9db9e8136..f612ff17e 100644 --- a/src/test/utils/assets.ts +++ b/src/test/utils/assets.ts @@ -78,6 +78,8 @@ export async function publishAsset(asset: any, publisherAccount: Signer) { genericAsset.services[0].files.datatokenAddress = datatokenAddress genericAsset.services[0].files.nftAddress = nftAddress + genericAsset.services[0].datatokenAddress = datatokenAddress + genericAsset.nftAddress = nftAddress // let's call node to encrypt const data = Uint8Array.from( diff --git a/src/utils/asset.ts b/src/utils/asset.ts index 44eeddb51..c102e1868 100644 --- a/src/utils/asset.ts +++ b/src/utils/asset.ts @@ -91,6 +91,17 @@ export function validateDDOHash( return ddoID === hashAddressAndChain } +export function deleteIndexedMetadataIfExists( + ddo: Record +): Record { + const ddoCopy: Record = structuredClone(ddo) + if ('indexedMetadata' in ddoCopy) { + delete ddoCopy.indexedMetadata + return ddoCopy + } + return ddo +} + /** * Generates DDO Id given the chain and nft address provided * @param nftAddress the nft address diff --git a/src/utils/constants.ts b/src/utils/constants.ts index ccd4ff9f8..4ab18800d 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -84,7 +84,11 @@ export const EVENTS = { TOKEN_URI_UPDATE: 'TokenURIUpdate', EXCHANGE_CREATED: 'ExchangeCreated', EXCHANGE_RATE_CHANGED: 'ExchangeRateChanged', - DISPENSER_CREATED: 'DispenserCreated' + DISPENSER_CREATED: 'DispenserCreated', + DISPENSER_ACTIVATED: 'DispenserActivated', + DISPENSER_DEACTIVATED: 'DispenserDeactivated', + EXCHANGE_ACTIVATED: 'ExchangeActivated', + EXCHANGE_DEACTIVATED: 'ExchangeDeactivated' } export const INDEXER_CRAWLING_EVENTS = { @@ -138,6 +142,22 @@ export const EVENT_HASHES: Hashes = { '0x7d0aa581e6eb87e15f58588ff20c39ff6622fc796ec9bb664df6ed3eb02442c9': { type: EVENTS.DISPENSER_CREATED, text: 'DispenserCreated(address,address,uint256,uint256,address)' + }, + '0xe9372084cb52c5392afee4b9d79d131e04b1e65676088d50a8f39fffb16a8745': { + type: EVENTS.DISPENSER_ACTIVATED, + text: 'DispenserActivated(address)' + }, + '0x393f01061139648745ea000bb047bbe1785bd3a19d3a9c90f6747e1d2357d2b8': { + type: EVENTS.DISPENSER_DEACTIVATED, + text: 'DispenserDeactivated(address)' + }, + '0xc7344c45124818d1d3a4c24ccb9b86d8b88d3bd05209b2a42b494cb32a503529': { + type: EVENTS.EXCHANGE_ACTIVATED, + text: 'ExchangeActivated(bytes32,address)' + }, + '0x03da9148e1de78fba22de63c573465562ebf6ef878a1d3ea83790a560229984c': { + type: EVENTS.EXCHANGE_DEACTIVATED, + text: 'ExchangeDeactivated(bytes32,address)' } }