Skip to content

Commit

Permalink
Merge pull request #5000 from mnaamani/colossus-specify-temp-folder
Browse files Browse the repository at this point in the history
temp folder refactor
  • Loading branch information
mnaamani authored Dec 26, 2023
2 parents 10baf39 + 3dd8e0a commit c4789f9
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 93 deletions.
10 changes: 6 additions & 4 deletions docker-compose-no-bind-volumes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ services:
# - OTEL_RESOURCE_ATTRIBUTES=service.name=colossus-1,deployment.environment=production
entrypoint: ['yarn']
command: [
'start', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data',
'start', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-1:
Expand Down Expand Up @@ -103,11 +104,12 @@ services:
- ACCOUNT_URI=${COLOSSUS_2_TRANSACTOR_URI}
entrypoint: ['yarn', 'storage-node']
command: [
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-2:
Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ services:
- OTEL_RESOURCE_ATTRIBUTES=service.name=colossus-1,deployment.environment=production
entrypoint: ['/joystream/entrypoints/storage.sh']
command: [
'server', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data/uploads/',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-1:
Expand Down Expand Up @@ -106,11 +107,12 @@ services:
- ACCOUNT_URI=${COLOSSUS_2_TRANSACTOR_URI}
entrypoint: ['yarn', 'storage-node']
command: [
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data',
'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data/uploads',
'--sync', '--syncInterval=1',
'--queryNodeEndpoint=${COLOSSUS_QUERY_NODE_URL}',
'--apiUrl=${JOYSTREAM_NODE_WS}',
'--logFilePath=/logs'
'--logFilePath=/logs',
'--tempFolder=/data/temp/'
]

distributor-2:
Expand Down
3 changes: 3 additions & 0 deletions storage-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@
},
"operator": {
"description": "Storage provider(operator) commands."
},
"util": {
"description": "Useful utility commands."
}
}
},
Expand Down
79 changes: 42 additions & 37 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { flags } from '@oclif/command'
import { ApiPromise } from '@polkadot/api'
import { KeyringPair } from '@polkadot/keyring/types'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import fs from 'fs'
import sleep from 'sleep-promise'
import _ from 'lodash'
import path from 'path'
import rimraf from 'rimraf'
import sleep from 'sleep-promise'
import { promisify } from 'util'
import ApiCommandBase from '../command-base/ApiCommandBase'
import fs from 'fs'
import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup'
import { KeyringPair } from '@polkadot/keyring/types'
import { customFlags } from '../command-base/CustomFlags'
import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
import logger, { DatePatternByFrequency, Frequency, initNewLogger } from '../services/logger'
Expand All @@ -23,6 +20,7 @@ import { getStorageBucketIdsByWorkerId } from '../services/sync/storageObligatio
import { PendingDirName, performSync, TempDirName } from '../services/sync/synchronizer'
import { createApp } from '../services/webApi/app'
import ExitCodes from './../command-base/ExitCodes'
import ApiCommandBase from '../command-base/ApiCommandBase'
import { v4 as uuidv4 } from 'uuid'
const fsPromises = fs.promises

Expand Down Expand Up @@ -53,6 +51,10 @@ export default class Server extends ApiCommandBase {
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
      'Directory to store temporary files during sync and upload (absolute path).\nIf not specified a subfolder under the uploads directory will be used.',
}),
port: flags.integer({
char: 'o',
required: true,
Expand Down Expand Up @@ -158,11 +160,19 @@ Supported values: warn, error, debug, info. Default:debug`,
async run(): Promise<void> {
const { flags } = this.parse(Server)

const logSource = `StorageProvider_${flags.worker}`
const api = await this.getApi()

if (flags.dev) {
await this.ensureDevelopmentChain()
}

if (flags.logFilePath && path.relative(flags.logFilePath, flags.uploads) === '') {
this.error('Paths for logs and uploads must be unique.')
}

if (!_.isEmpty(flags.elasticSearchEndpoint) || !_.isEmpty(flags.logFilePath)) {
initNewLogger({
elasticSearchlogSource: logSource,
elasticSearchlogSource: `StorageProvider_${flags.worker}`,
elasticSearchEndpoint: flags.elasticSearchEndpoint,
elasticSearchIndexPrefix: flags.elasticSearchIndexPrefix,
elasticSearchUser: flags.elasticSearchUser,
Expand All @@ -176,8 +186,6 @@ Supported values: warn, error, debug, info. Default:debug`,

logger.info(`Query node endpoint set: ${flags.queryNodeEndpoint}`)

const api = await this.getApi()

const workerId = flags.worker

if (!(await verifyWorkerId(api, workerId))) {
Expand Down Expand Up @@ -220,16 +228,26 @@ Supported values: warn, error, debug, info. Default:debug`,
const enableUploadingAuth = false
const operatorRoleKey = undefined

await recreateTempDirectory(flags.uploads, TempDirName)

if (fs.existsSync(flags.uploads)) {
await loadDataObjectIdCache(flags.uploads, TempDirName, PendingDirName)
if (!flags.tempFolder) {
logger.warn(
'You did not specify a path to the temporary directory. ' +
          'A temp folder under the uploads folder will be used. ' +
'In a future release passing an absolute path to a temporary directory with the ' +
'"tempFolder" argument will be required.'
)
}

if (flags.dev) {
await this.ensureDevelopmentChain()
const tempFolder = flags.tempFolder || path.join(flags.uploads, TempDirName)

if (path.relative(tempFolder, flags.uploads) === '') {
this.error('Paths for temporary and uploads folders must be unique.')
}

await createDirectory(flags.uploads)
await loadDataObjectIdCache(flags.uploads)

await createDirectory(tempFolder)

const X_HOST_ID = uuidv4()

const pendingDataObjectsDir = path.join(flags.uploads, PendingDirName)
Expand Down Expand Up @@ -259,7 +277,7 @@ Supported values: warn, error, debug, info. Default:debug`,
selectedBuckets,
qnApi,
flags.uploads,
TempDirName,
tempFolder,
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
flags.syncInterval,
Expand Down Expand Up @@ -297,7 +315,6 @@ Supported values: warn, error, debug, info. Default:debug`,
try {
const port = flags.port
const maxFileSize = await api.consts.storage.maxDataObjectSize.toNumber()
const tempFileUploadingDir = path.join(flags.uploads, TempDirName)
logger.debug(`Max file size runtime parameter: ${maxFileSize}`)

const app = await createApp({
Expand All @@ -308,7 +325,7 @@ Supported values: warn, error, debug, info. Default:debug`,
workerId,
maxFileSize,
uploadsDir: flags.uploads,
tempFileUploadingDir,
tempFileUploadingDir: tempFolder,
pendingDataObjectsDir,
acceptPendingObjectsService,
process: this.config,
Expand Down Expand Up @@ -426,26 +443,14 @@ async function runCleanupWithInterval(
}

/**
* Removes and recreates the temporary directory from the uploading directory.
* All files in the temp directory are deleted.
 * Creates a directory recursively. Like `mkdir -p`
*
* @param uploadsDirectory - data uploading directory
* @param tempDirName - temporary directory name within the uploading directory
 * @param dirName - full path to the directory to create
* @returns void promise.
*/
async function recreateTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
try {
const tempFileUploadingDir = path.join(uploadsDirectory, tempDirName)

logger.info(`Removing temp directory ...`)
const rimrafAsync = promisify(rimraf)
await rimrafAsync(tempFileUploadingDir)

logger.info(`Creating temp directory ...`)
await fsPromises.mkdir(tempFileUploadingDir)
} catch (err) {
logger.error(`Temp directory IO error: ${err}`)
}
async function createDirectory(dirName: string): Promise<void> {
logger.info(`Creating directory ${dirName}`)
await fsPromises.mkdir(dirName, { recursive: true })
}

async function verifyWorkerId(api: ApiPromise, workerId: number): Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import { performCleanup } from '../../services/sync/cleanupService'
/**
* CLI command:
* Prunes outdated data objects: removes all the local stored data objects that the operator is no longer obliged to store.
* storage.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:cleanup"
* Shell command: "util:cleanup"
*/
export default class DevCleanup extends Command {
export default class Cleanup extends Command {
static description = `Runs the data objects cleanup/pruning workflow. It removes all the local stored data objects that the operator is no longer obliged to store`

static flags = {
Expand Down Expand Up @@ -48,7 +46,7 @@ export default class DevCleanup extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevCleanup)
const { flags } = this.parse(Cleanup)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Cleanup...')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { Command, flags } from '@oclif/command'
import stringify from 'fast-safe-stringify'
import logger from '../../services/logger'
import { QueryNodeApi } from '../../services/queryNode/api'
import { performSync } from '../../services/sync/synchronizer'
import { QueryNodeApi } from '../..//services/queryNode/api'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import path from 'path'

/**
* CLI command:
* Synchronizes data: fixes the difference between node obligations and local
* storage.
* Fetch all data objects from a bucket into local store.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:sync"
* Should not be executed while server is running.
* Shell command: "util:fetch-bucket"
*/
export default class DevSync extends Command {
static description =
'Synchronizes the data - it fixes the differences between local data folder and worker ID obligations from the runtime.'
export default class FetchBucket extends Command {
static description = 'Downloads all data objects of specified bucket, that matches worker ID obligations.'

static flags = {
help: flags.help({ char: 'h' }),
Expand All @@ -27,10 +26,10 @@ export default class DevSync extends Command {
bucketId: flags.integer({
char: 'b',
required: true,
description: 'The buckerId to sync',
      description: 'The bucketId to fetch',
}),
syncWorkersNumber: flags.integer({
char: 'p',
char: 'n',
required: false,
description: 'Sync workers number (max async operations in progress).',
default: 20,
Expand All @@ -44,27 +43,30 @@ export default class DevSync extends Command {
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
default: 'http://localhost:8081/graphql',
description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
default: 'https://query.joystream.org/graphql',
description: 'Query node endpoint (e.g.: https://query.joystream.org/graphql)',
}),
dataSourceOperatorUrl: flags.string({
char: 'o',
required: false,
description: 'Storage node url base (e.g.: http://some.com:3333) to get data from.',
default: 'http://localhost:3333',
}),
uploads: flags.string({
char: 'd',
required: true,
description: 'Data uploading directory (absolute path).',
}),
tempFolder: flags.string({
description:
      'Directory to store temporary files during sync and upload (absolute path).\nIf not specified a subfolder under the uploads directory will be used.',
}),
}

async run(): Promise<void> {
const { flags } = this.parse(DevSync)
const { flags } = this.parse(FetchBucket)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Syncing...')
logger.info('Fetching bucket...')

try {
await performSync(
Expand All @@ -74,6 +76,7 @@ export default class DevSync extends Command {
flags.syncWorkersTimeout,
qnApi,
flags.uploads,
flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
'',
flags.dataSourceOperatorUrl
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import { print } from '../../services/helpers/stdout'
* format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:multihash"
* Shell command: "util:multihash"
*/
export default class DevMultihash extends Command {
export default class Multihash extends Command {
static description = 'Creates a multihash (blake3) for a file.'

static flags = {
Expand All @@ -25,7 +24,7 @@ export default class DevMultihash extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevMultihash)
const { flags } = this.parse(Multihash)

logger.info(`Hashing ${flags.file} ....`)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import { customFlags } from '../../command-base/CustomFlags'
* Verifies supported bag ID types in the string format.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:verify-bag-id"
*/
export default class DevVerifyBagId extends Command {
export default class VerifyBagId extends Command {
static description = 'The command verifies bag id supported by the storage node. Requires chain connection.'

static flags = {
Expand All @@ -21,7 +20,7 @@ export default class DevVerifyBagId extends Command {
}

async run(): Promise<void> {
const { flags } = this.parse(DevVerifyBagId)
const { flags } = this.parse(VerifyBagId)

logger.info(`Parsed: ${flags.bagId}`)
}
Expand Down
Loading

0 comments on commit c4789f9

Please sign in to comment.