Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29,516 changes: 19,789 additions & 9,727 deletions package-lock.json

Large diffs are not rendered by default.

37 changes: 19 additions & 18 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@
"@libp2p/circuit-relay-v2": "^1.1.1",
"@libp2p/crypto": "^4.1.5",
"@libp2p/dcutr": "^1.1.1",
"@libp2p/floodsub": "^9.1.1",
"@libp2p/identify": "^2.1.1",
"@libp2p/interface": "^1.6.0",
"@libp2p/interface-address-manager": "^3.0.1",
"@libp2p/kad-dht": "^12.1.1",
"@libp2p/mdns": "^10.1.1",
"@libp2p/peer-id": "^4.1.4",
Expand All @@ -75,15 +72,11 @@
"@multiformats/multiaddr": "^10.2.0",
"@oceanprotocol/contracts": "^2.5.0",
"@oceanprotocol/ddo-js": "^0.1.4",
"@types/lodash.clonedeep": "^4.5.7",
"@types/lodash.merge": "^4.6.9",
"@types/lodash.set": "^4.3.9",
"aws-sdk": "^2.1591.0",
"aws-sdk": "^2.1693.0",
"axios": "^1.12.0",
"base58-js": "^2.0.0",
"cors": "^2.8.5",
"delay": "^5.0.0",
"docker-registry-client": "^3.4.0",
"dockerode": "^4.0.5",
"dotenv": "^16.3.1",
"eciesjs": "^0.4.5",
Expand All @@ -92,24 +85,18 @@
"express": "^4.21.1",
"humanhash": "^1.0.4",
"hyperdiff": "^2.0.16",
"ip": "^2.0.1",
"ipaddr.js": "^2.3.0",
"it-pipe": "^3.0.1",
"jsonwebtoken": "^9.0.2",
"libp2p": "^1.8.0",
"lodash.clonedeep": "^4.5.0",
"lodash.merge": "^4.6.2",
"lodash.set": "^4.3.2",
"lodash": "^4.17.21",
"lzma-purejs-requirejs": "^1.0.0",
"node-cron": "^3.0.3",
"private-ip": "^3.0.2",
"sqlite3": "^5.1.7",
"stream-concat": "^1.0.0",
"tar": "^7.4.3",
"ts-node": "^10.9.1",
"tsoa": "^5.1.1",
"uint8arrays": "^4.0.6",
"url-join": "^5.0.0",
"uuid": "^11.1.0",
"winston": "^3.11.0",
"winston-daily-rotate-file": "^4.7.1",
"winston-transport": "^4.6.0",
Expand All @@ -122,7 +109,7 @@
"@types/express": "^4.17.17",
"@types/ip": "^1.1.3",
"@types/jsonwebtoken": "^9.0.9",
"@types/lzma-native": "^4.0.4",
"@types/lodash": "^4.17.21",
"@types/mocha": "^10.0.10",
"@types/node": "^20.14.2",
"@types/node-cron": "^3.0.11",
Expand All @@ -142,10 +129,24 @@
"mocha": "^11.1.0",
"nyc": "^17.1.0",
"prettier": "^3.0.3",
"release-it": "^17.6.0",
"release-it": "^19.0.6",
"sinon": "^19.0.2",
"tsx": "^4.19.3"
},
"overrides": {
"elliptic": "^6.6.1",
"tough-cookie": "^4.1.3",
"xml2js": "^0.5.0",
"semver": "^7.5.2",
"tmp": "^0.2.3",
"base64url": "^3.0.1",
"eth-crypto": {
"secp256k1": "^5.0.0"
},
"eccrypto": {
"secp256k1": "^5.0.0"
}
},
"release-it": {
"hooks": {
"after:bump": "npm run changelog"
Expand Down
2 changes: 1 addition & 1 deletion src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export interface DownloadCommand extends Command {
consumerAddress: string
signature: string
aes_encrypted_key?: string // if not present it means download without encryption
policyServer?: PolicyServerTask // object to pass to policy server
policyServer?: any // object to pass to policy server
userData?: Record<string, any>
}

Expand Down
1 change: 0 additions & 1 deletion src/@types/docker-registry-lient.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/components/Indexer/processors/BaseProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }
import { fetchTransactionReceipt } from '../../core/utils/validateOrders.js'
import { withRetrial } from '../utils.js'
import { createHash } from 'node:crypto'
import { AbstractDdoDatabase } from '../../database/BaseDatabase.js'
import { createHash } from 'crypto'

export abstract class BaseEventProcessor {
protected networkId: number
Expand Down
4 changes: 2 additions & 2 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
if (connectionStatus === 'open') {
if (sendStream == null) {
await pipe(statusStream, otherPeerConnection.stream.sink)
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
} else {
const combinedStream = new StreamConcat([statusStream, sendStream], {
highWaterMark: JSON.stringify(status).length // important for reading chunks correctly on sink!
})
await pipe(combinedStream, otherPeerConnection.stream.sink)
// Don't close for data streams - sender closes when done reading
}
}

await closeStreamConnection(otherPeerConnection.connection, remotePeer)
} catch (err) {
P2P_LOGGER.logMessageWithEmoji(
'handleProtocolCommands Error: ' + err.message,
Expand Down
64 changes: 33 additions & 31 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// import diff from 'hyperdiff'
import { P2PCommandResponse } from '../../@types/index'
import EventEmitter from 'node:events'
import clone from 'lodash.clonedeep'

import lodash from 'lodash'
import { handleProtocolCommands } from './handlers.js'

import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down Expand Up @@ -30,7 +29,6 @@ import {
removePrivateAddressesMapper,
removePublicAddressesMapper
} from '@libp2p/kad-dht'
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'

import { EVENTS, cidFromRawString } from '../../utils/index.js'
import { Transform } from 'stream'
Expand All @@ -41,8 +39,7 @@ import {
dhtFilterMethod
} from '../../@types/OceanNode.js'
// eslint-disable-next-line camelcase
import is_ip_private from 'private-ip'
import ip from 'ip'
import ipaddr from 'ipaddr.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { INDEXER_DDO_EVENT_EMITTER } from '../Indexer/index.js'
import { P2P_LOGGER } from '../../utils/logging/common.js'
Expand Down Expand Up @@ -107,7 +104,6 @@ export class OceanP2P extends EventEmitter {
async start(options: any = null) {
this._topic = 'oceanprotocol'
this._libp2p = await this.createNode(this._config)

this._libp2p.addEventListener('peer:connect', (evt: any) => {
this.handlePeerConnect(evt)
})
Expand All @@ -117,8 +113,11 @@ export class OceanP2P extends EventEmitter {
this._libp2p.addEventListener('peer:discovery', (details: any) => {
this.handlePeerDiscovery(details)
})

this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
this._options = Object.assign(
{},
lodash.cloneDeep(DEFAULT_OPTIONS),
lodash.cloneDeep(options)
)
this._peers = []
this._connections = {}
this._protocol = '/ocean/nodes/1.0.0'
Expand Down Expand Up @@ -189,40 +188,40 @@ export class OceanP2P extends EventEmitter {
try {
const maddr = multiaddr(addr)
// always filter loopback
if (ip.isLoopback(maddr.nodeAddress().address)) {
// disabled logs because of flooding
// P2P_LOGGER.debug('Deny announcement of loopback ' + maddr.nodeAddress().address)
const addressString = maddr.nodeAddress().address

if (!ipaddr.isValid(addressString)) {
return false
}

const parsedAddr = ipaddr.parse(addressString)
const range = parsedAddr.range()

if (range === 'loopback') {
return false
}
// check filters
for (const filter of this._config.p2pConfig.filterAnnouncedAddresses) {
if (ip.cidrSubnet(filter).contains(maddr.nodeAddress().address)) {
// disabled logs because of flooding
// P2P_LOGGER.debug(
// 'Deny announcement of filtered ' +
// maddr.nodeAddress().address +
// '(belongs to ' +
// filter +
// ')'
// )
return false
try {
const parsedCIDR = ipaddr.parseCIDR(filter)
if ((parsedAddr as any).match(parsedCIDR as any)) {
return false
}
} catch (e) {
P2P_LOGGER.error(`Invalid CIDR filter in config: ${filter}`)
}
}
if (
this._config.p2pConfig.announcePrivateIp === false &&
(is_ip_private(maddr.nodeAddress().address) ||
ip.isPrivate(maddr.nodeAddress().address))
(range === 'private' || range === 'uniqueLocal')
) {
// disabled logs because of flooding
// P2P_LOGGER.debug(
// 'Deny announcement of private address ' + maddr.nodeAddress().address
// )
return false
} else {
// disabled logs because of flooding
// P2P_LOGGER.debug('Allow announcement of ' + maddr.nodeAddress().address)
return true
}
return true
} catch (e) {
// we reach this part when having circuit relay. this is fine
return true
Expand Down Expand Up @@ -652,7 +651,7 @@ export class OceanP2P extends EventEmitter {
// dial/connect to the target node
try {
const options = {
signal: AbortSignal.timeout(3000),
signal: AbortSignal.timeout(10000),
priority: 100,
runOnTransientConnection: true
}
Expand Down Expand Up @@ -685,9 +684,12 @@ export class OceanP2P extends EventEmitter {
sink
)
} catch (err) {
P2P_LOGGER.error(`Unable to send P2P message: ${err.message}`)
response.status.httpStatus = 404
response.status.error = err.message
P2P_LOGGER.error(
`Cannot connect to peer - Unable to send P2P message: ${err.message}`
)
response.status.httpStatus = 500
response.status.error = `Cannot connect to peer - Unable to send P2P message: ${err.message}`
response.stream = null
}
} else {
response.status.httpStatus = 404
Expand Down
83 changes: 71 additions & 12 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import { AssetUtils } from '../../utils/asset.js'
import { FindDdoHandler } from '../core/handler/ddoHandler.js'
import { OceanNode } from '../../OceanNode.js'
import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js'
import * as drc from 'docker-registry-client'
import { ValidateParams } from '../httpRoutes/validateCommands.js'
import { Service } from '@oceanprotocol/ddo-js'
import { getOceanTokenAddressForChain } from '../../utils/address.js'
Expand All @@ -59,6 +58,8 @@ export class C2DEngineDocker extends C2DEngine {
private cronTimer: any
private cronTime: number = 2000
private jobImageSizes: Map<string, number> = new Map()
private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io'

public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow) {
super(clusterConfig, db, escrow)

Expand Down Expand Up @@ -329,6 +330,74 @@ export class C2DEngineDocker extends C2DEngine {
return filteredEnvs
}

private static parseImage(image: string) {
let registry = C2DEngineDocker.DEFAULT_DOCKER_REGISTRY
let name = image
let ref = 'latest'

const atIdx = name.indexOf('@')
const colonIdx = name.lastIndexOf(':')

if (atIdx !== -1) {
ref = name.slice(atIdx + 1)
name = name.slice(0, atIdx)
} else if (colonIdx !== -1 && !name.slice(colonIdx).includes('/')) {
ref = name.slice(colonIdx + 1)
name = name.slice(0, colonIdx)
}

const firstSlash = name.indexOf('/')
if (firstSlash !== -1) {
const potential = name.slice(0, firstSlash)
if (potential.includes('.') || potential.includes(':')) {
registry = potential.includes('localhost')
? `http://${potential}`
: `https://${potential}`
name = name.slice(firstSlash + 1)
}
}

if (registry === C2DEngineDocker.DEFAULT_DOCKER_REGISTRY && !name.includes('/')) {
name = `library/${name}`
}

return { registry, name, ref }
}

public static async getDockerManifest(image: string): Promise<any> {
const { registry, name, ref } = C2DEngineDocker.parseImage(image)
const url = `${registry}/v2/${name}/manifests/${ref}`
let headers: Record<string, string> = {
Accept:
'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json'
}
let response = await fetch(url, { headers })

if (response.status === 401) {
const match = (response.headers.get('www-authenticate') || '').match(
/Bearer realm="([^"]+)",service="([^"]+)"/
)
if (match) {
const tokenUrl = new URL(match[1])
tokenUrl.searchParams.set('service', match[2])
tokenUrl.searchParams.set('scope', `repository:${name}:pull`)
const { token } = (await fetch(tokenUrl.toString()).then((r) => r.json())) as {
token: string
}
headers = { ...headers, Authorization: `Bearer ${token}` }
response = await fetch(url, { headers })
}
}

if (!response.ok) {
const body = await response.text()
throw new Error(
`Failed to get manifest: ${response.status} ${response.statusText} - ${body}`
)
}
return await response.json()
}

/**
* Checks the docker image by looking at the manifest
* @param image name or tag
Expand All @@ -339,16 +408,7 @@ export class C2DEngineDocker extends C2DEngine {
platform?: RunningPlatform
): Promise<ValidateParams> {
try {
const info = drc.default.parseRepoAndRef(image)
const client = drc.createClientV2({ name: info.localName })
const ref = info.tag || info.digest

const manifest = await new Promise<any>((resolve, reject) => {
client.getManifest({ ref, maxSchemaVersion: 2 }, (err: any, result: any) => {
client.close()
err ? reject(err) : resolve(result)
})
})
const manifest = await C2DEngineDocker.getDockerManifest(image)

const platforms = Array.isArray(manifest.manifests)
? manifest.manifests.map((entry: any) => entry.platform)
Expand Down Expand Up @@ -1798,7 +1858,6 @@ export class C2DEngineDocker extends C2DEngine {
decryptedFileObject,
asset.userdata
)

storage = Storage.getStorageClass(decryptedFileObject, config)
fileInfo = await storage.getFileInfo({
type: storage.getStorageType(decryptedFileObject)
Expand Down
Loading
Loading