Skip to content

Commit

Permalink
commonalize datastore logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Tsuyoshi-Ishikawa committed Aug 30, 2023
1 parent f6d10f5 commit cee16b7
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 294 deletions.
207 changes: 170 additions & 37 deletions src/datastore/services/datastore.service.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { RateLimit } from 'src/config/transactionAllowList';
import { RequestContext } from 'src/datastore/entities';
import { RdbService } from './rdb.service';
import { RedisService } from './redis.service';
import {
HeartbeatMillisecondInterval,
workerCountMillisecondInterval,
} from 'src/constants';
import { CacheService } from './cache.service';
import { TransactionLimitStockService } from './transactionLimitStock.service';
import { blockNumberCacheExpireSecLimit } from 'src/datastore/consts';

@Injectable()
export class DatastoreService {
private datastore: string;
private blockNumberCacheExpireSec: number;
private intervalTimesToCheckWorkerCount: number;

constructor(
private configService: ConfigService,
private redisService: RedisService,
private rdbService: RdbService,
private cacheService: CacheService,
private txLimitStockService: TransactionLimitStockService,
) {
const blockNumberCacheExpireSec =
this.configService.get<number>('blockNumberCacheExpireSec') || 0;
this.intervalTimesToCheckWorkerCount = 3;
const redisUri = process.env.REDIS_URI;
const rdbUri = process.env.RDB_URI;

Expand All @@ -20,21 +36,23 @@ export class DatastoreService {
} else if (rdbUri) {
this.datastore = 'rdb';
}

if (blockNumberCacheExpireSec > blockNumberCacheExpireSecLimit) {
console.warn(
`block_number_cache_expire limit is ${blockNumberCacheExpireSecLimit}. block_number_cache_expire is set to ${blockNumberCacheExpireSecLimit}`,
);
this.blockNumberCacheExpireSec = blockNumberCacheExpireSecLimit;
} else {
this.blockNumberCacheExpireSec = blockNumberCacheExpireSec;
}
}

async getWorkerCount() {
let workerCount = 0;

try {
switch (this.datastore) {
case 'redis':
workerCount = await this.redisService.getWorkerCount();
break;
case 'rdb':
workerCount = await this.rdbService.getWorkerCount();
break;
}
return workerCount;
const workerCount = await this.cacheService.getWorkerCount();
if (workerCount) return workerCount;

return await this.setWorkerCount();
} catch (err) {
if (err instanceof Error) {
console.error(err.message);
Expand All @@ -54,24 +72,73 @@ export class DatastoreService {
let count = 0;

try {
const key = this.txLimitStockService.getTxLimitStockKey(
from,
to,
methodId,
rateLimit,
);
let txLimitStock = this.txLimitStockService.getTxLimitStock(key);
const getStandardTxLimitStockAmount = async (limit: number) => {
return await this.getStandardTxLimitStockAmount(limit);
};
switch (this.datastore) {
case 'redis':
count = await this.redisService.getAllowedTxCount(
from,
to,
methodId,
rateLimit,
);
if (
this.txLimitStockService.isNeedTxLimitStockUpdate(
txLimitStock,
rateLimit,
)
) {
await this.redisService.resetTxLimitStock(
key,
rateLimit,
getStandardTxLimitStockAmount,
);
} else if (
this.txLimitStockService.isSurplusStock(
txLimitStock,
rateLimit,
await this.getStandardTxLimitStockAmount(rateLimit.limit),
)
) {
await this.redisService.returnSurplusTxLimitStock(
key,
rateLimit,
getStandardTxLimitStockAmount,
);
}
break;
case 'rdb':
count = await this.rdbService.getAllowedTxCount(
from,
to,
methodId,
rateLimit,
);
if (
this.txLimitStockService.isNeedTxLimitStockUpdate(
txLimitStock,
rateLimit,
)
) {
await this.rdbService.resetTxLimitStock(
key,
rateLimit,
getStandardTxLimitStockAmount,
);
} else if (
this.txLimitStockService.isSurplusStock(
txLimitStock,
rateLimit,
await this.getStandardTxLimitStockAmount(rateLimit.limit),
)
) {
await this.rdbService.returnSurplusTxLimitStock(
key,
rateLimit,
getStandardTxLimitStockAmount,
);
}
break;
}
this.txLimitStockService.consumeStock(key);
txLimitStock = this.txLimitStockService.getTxLimitStock(key);
count = txLimitStock ? txLimitStock.stock : 0;
return count;
} catch (err) {
if (err instanceof Error) {
Expand All @@ -84,22 +151,36 @@ export class DatastoreService {
}

async getBlockNumber(requestContext: RequestContext) {
let blockNumberCache = '';
let blockNumber = '';
const key = this.cacheService.getBlockNumberCacheKey(requestContext);
const cache = await this.cacheService.getBlockNumber(key);
if (cache) return cache;

try {
switch (this.datastore) {
case 'redis':
blockNumberCache = await this.redisService.getBlockNumber(
requestContext,
);
blockNumber = (await this.redisService.getBlockNumber(key)) ?? '';
break;
case 'rdb':
blockNumberCache = await this.rdbService.getBlockNumber(
requestContext,
);
const blockNumberData = await this.rdbService.getBlockNumber(key);
if (
!blockNumberData ||
Date.now() >=
blockNumberData.updated_at + this.blockNumberCacheExpireSec * 1000
) {
blockNumber = '';
break;
}
blockNumber = blockNumberData.value;
break;
}
return blockNumberCache;
if (blockNumber)
await this.cacheService.setBlockNumber(
key,
blockNumber,
this.blockNumberCacheExpireSec * 1000,
);
return blockNumber;
} catch (err) {
if (err instanceof Error) {
console.error(err.message);
Expand All @@ -110,16 +191,35 @@ export class DatastoreService {
}
}

// Calculate the standard stock amount of transaction count inventory
// based on the number of workers in the standing proxy
async getStandardTxLimitStockAmount(limit: number) {
const workerCount = await this.getWorkerCount();
const txCountStockStandardAmount = Math.floor(limit / (5 * workerCount));
if (txCountStockStandardAmount < 1) return 1;
return txCountStockStandardAmount;
}

async setBlockNumber(requestContext: RequestContext, blockNumber: string) {
try {
const key = this.cacheService.getBlockNumberCacheKey(requestContext);
switch (this.datastore) {
case 'redis':
await this.redisService.setBlockNumber(requestContext, blockNumber);
await this.redisService.setBlockNumber(
key,
blockNumber,
this.blockNumberCacheExpireSec,
);
break;
case 'rdb':
await this.rdbService.setBlockNumber(requestContext, blockNumber);
await this.rdbService.setBlockNumber(key, blockNumber);
break;
}
await this.cacheService.setBlockNumber(
key,
blockNumber,
this.blockNumberCacheExpireSec * 1000,
);
} catch (err) {
if (err instanceof Error) {
console.error(err.message);
Expand All @@ -132,12 +232,25 @@ export class DatastoreService {

async setHeartBeat() {
try {
const refreshMultiple = 10;
const now = Date.now();
const refreshTimestamp =
now -
this.intervalTimesToCheckWorkerCount * HeartbeatMillisecondInterval;
switch (this.datastore) {
case 'redis':
await this.redisService.setHeartBeat();
await this.redisService.setHeartBeat(
refreshMultiple,
now,
refreshTimestamp,
);
break;
case 'rdb':
await this.rdbService.setHeartBeat();
await this.rdbService.setHeartBeat(
refreshMultiple,
now,
refreshTimestamp,
);
break;
}
} catch (err) {
Expand All @@ -151,15 +264,35 @@ export class DatastoreService {
}

async setWorkerCount() {
let workerCountAverage = 0;
try {
const now = Date.now();
const intervalAgo =
now -
this.intervalTimesToCheckWorkerCount * HeartbeatMillisecondInterval;
// counts the number of heartbeats during `intervalTimesToCheckWorkerCount` Heartbeat cronjob runs and calculates the average number of heartbeats in one interval
switch (this.datastore) {
case 'redis':
await this.redisService.setWorkerCountToCache();
workerCountAverage = Math.floor(
(await this.redisService.getWorkerCountInInterval(
intervalAgo,
now,
)) / this.intervalTimesToCheckWorkerCount,
);
break;
case 'rdb':
await this.rdbService.setWorkerCountToCache();
workerCountAverage = Math.floor(
(await this.rdbService.getWorkerCountInInterval(intervalAgo, now)) /
this.intervalTimesToCheckWorkerCount,
);
break;
}
const workerCount = Math.max(workerCountAverage, 1);
await this.cacheService.setWorkerCount(
workerCount,
workerCountMillisecondInterval,
);
return workerCount;
} catch (err) {
if (err instanceof Error) {
console.error(err.message);
Expand Down
Loading

0 comments on commit cee16b7

Please sign in to comment.