From 0f36cd96c7c23560587cfd5bbf996a54b0efec30 Mon Sep 17 00:00:00 2001
From: Caesar Mukama
Date: Wed, 11 Feb 2026 04:56:13 +0300
Subject: [PATCH 1/5] feat: add GET /auth/pool-stats/aggregate endpoint

Add pool stats aggregate API endpoint that fetches pool stats history
and transactions, processes daily stats and revenue, and aggregates by
period (daily/weekly/monthly) with optional pool filtering.
---
 tests/unit/handlers/pools.handlers.test.js    | 176 ++++++++++++++++
 tests/unit/routes/pools.routes.test.js        |  39 ++++
 workers/lib/constants.js                      |  44 +++-
 workers/lib/period.utils.js                   | 192 ++++++++++++++++++
 workers/lib/server/handlers/pools.handlers.js | 171 ++++++++++++++++
 workers/lib/server/index.js                   |   4 +-
 workers/lib/server/routes/pools.routes.js     |  36 ++++
 workers/lib/server/schemas/pools.schemas.js   |  19 ++
 workers/lib/utils.js                          |  13 +-
 9 files changed, 690 insertions(+), 4 deletions(-)
 create mode 100644 tests/unit/handlers/pools.handlers.test.js
 create mode 100644 tests/unit/routes/pools.routes.test.js
 create mode 100644 workers/lib/period.utils.js
 create mode 100644 workers/lib/server/handlers/pools.handlers.js
 create mode 100644 workers/lib/server/routes/pools.routes.js
 create mode 100644 workers/lib/server/schemas/pools.schemas.js

diff --git a/tests/unit/handlers/pools.handlers.test.js b/tests/unit/handlers/pools.handlers.test.js
new file mode 100644
index 0000000..23f47bb
--- /dev/null
+++ b/tests/unit/handlers/pools.handlers.test.js
@@ -0,0 +1,176 @@
+'use strict'
+
+const test = require('brittle')
+const {
+  getPoolStatsAggregate,
+  processStatsData,
+  processRevenueData,
+  calculateAggregateSummary
+} = require('../../../workers/lib/server/handlers/pools.handlers')
+
+test('getPoolStatsAggregate - happy path', async (t) => {
+  const mockCtx = {
+    conf: {
+      orks: [{ rpcPublicKey: 'key1' }]
+    },
+    net_r0: {
+      jRequest: async (key, method, payload) => {
+        if (method === 'tailLog') {
+          return [{ data: [{ ts: 1700006400000, balance: 50000, hashrate: 100, worker_count: 5, tag: 'pool1' }] }]
+        }
+        if (method === 'getWrkExtData') {
+          return { data: [{ transactions: [{ ts: 1700006400000, changed_balance: 50000000, tag: 'pool1' }] }] }
+        }
+        return {}
+      }
+    }
+  }
+
+  const mockReq = {
+    query: { start: 1700000000000, end: 1700100000000, range: 'daily' }
+  }
+
+  const result = await getPoolStatsAggregate(mockCtx, mockReq, {})
+  t.ok(result.log, 'should return log array')
+  t.ok(result.summary, 'should return summary')
+  t.ok(Array.isArray(result.log), 'log should be array')
+  t.pass()
+})
+
+test('getPoolStatsAggregate - with pool filter', async (t) => {
+  const mockCtx = {
+    conf: { orks: [{ rpcPublicKey: 'key1' }] },
+    net_r0: {
+      jRequest: async (key, method) => {
+        if (method === 'tailLog') {
+          return [{
+            data: [
+              { ts: 1700006400000, balance: 50000, hashrate: 100, tag: 'pool1' },
+              { ts: 1700006400000, balance: 30000, hashrate: 200, tag: 'pool2' }
+            ]
+          }]
+        }
+        return { data: [] }
+      }
+    }
+  }
+
+  const mockReq = {
+    query: { start: 1700000000000, end: 1700100000000, pool: 'pool1' }
+  }
+
+  const result = await getPoolStatsAggregate(mockCtx, mockReq, {})
+  t.ok(result.log, 'should return filtered log')
+  t.pass()
+})
+
+test('getPoolStatsAggregate - missing start throws', async (t) => {
+  const mockCtx = {
+    conf: { orks: [] },
+    net_r0: { jRequest: async () => ({}) }
+  }
+
+  try {
+    await getPoolStatsAggregate(mockCtx, { query: { end: 1700100000000 } }, {})
+    t.fail('should have thrown')
+  } catch (err) {
+    t.is(err.message, 'ERR_MISSING_START_END', 'should throw missing start/end error')
+  }
+  t.pass()
+})
+
+test('getPoolStatsAggregate - invalid range throws', async (t) => {
+  const mockCtx = {
+    conf: { orks: [] },
+    net_r0: { jRequest: async () => ({}) }
+  }
+
+  try {
+    await getPoolStatsAggregate(mockCtx, { query: { start: 1700100000000, end: 1700000000000 } }, {})
+    t.fail('should have thrown')
+  } catch (err) {
+    t.is(err.message, 'ERR_INVALID_DATE_RANGE', 'should throw invalid range error')
+  }
+  t.pass()
+})
+
+test('getPoolStatsAggregate - empty ork results', async (t) => {
+  const mockCtx = {
+    conf: { orks: [{ rpcPublicKey: 'key1' }] },
+    net_r0: { jRequest: async () => ({}) }
+  }
+
+  const result = await getPoolStatsAggregate(mockCtx, { query: { start: 1700000000000, end: 1700100000000 } }, {})
+  t.ok(result.log, 'should return log array')
+  t.is(result.log.length, 0, 'log should be empty')
+  t.pass()
+})
+
+test('processStatsData - processes valid stats', (t) => {
+  const results = [
+    [{ data: [{ ts: 1700006400000, balance: 50000, hashrate: 100, worker_count: 5 }] }]
+  ]
+  const daily = processStatsData(results, null)
+  t.ok(typeof daily === 'object', 'should return object')
+  t.pass()
+})
+
+test('processStatsData - handles error results', (t) => {
+  const results = [{ error: 'timeout' }]
+  const daily = processStatsData(results, null)
+  t.is(Object.keys(daily).length, 0, 'should be empty')
+  t.pass()
+})
+
+test('processStatsData - filters by pool', (t) => {
+  const results = [
+    [{
+      data: [
+        { ts: 1700006400000, balance: 50000, tag: 'pool1' },
+        { ts: 1700006400000, balance: 30000, tag: 'pool2' }
+      ]
+    }]
+  ]
+  const daily = processStatsData(results, 'pool1')
+  t.ok(typeof daily === 'object', 'should return filtered data')
+  t.pass()
+})
+
+test('processRevenueData - processes valid transactions', (t) => {
+  const results = [
+    [{ transactions: [{ ts: 1700006400000, changed_balance: 100000000 }] }]
+  ]
+  const daily = processRevenueData(results, null)
+  t.ok(typeof daily === 'object', 'should return object')
+  t.pass()
+})
+
+test('processRevenueData - handles error results', (t) => {
+  const results = [{ error: 'timeout' }]
+  const daily = processRevenueData(results, null)
+  t.is(Object.keys(daily).length, 0, 'should be empty')
+  t.pass()
+})
+
+test('calculateAggregateSummary - calculates from log', (t) => {
+  const log = [
+    { revenueBTC: 0.5, hashrate: 100, workerCount: 5, balance: 50000 },
+    { revenueBTC: 0.3, hashrate: 200, workerCount: 10, balance: 60000 }
+  ]
+
+  const summary = calculateAggregateSummary(log)
+  t.is(summary.totalRevenueBTC, 0.8, 'should sum revenue')
+  t.is(summary.avgHashrate, 150, 'should avg hashrate')
+  t.is(summary.avgWorkerCount, 7.5, 'should avg workers')
+  t.is(summary.latestBalance, 60000, 'should take latest balance')
+  t.is(summary.periodCount, 2, 'should count periods')
+  t.pass()
+})
+
+test('calculateAggregateSummary - handles empty log', (t) => {
+  const summary = calculateAggregateSummary([])
+  t.is(summary.totalRevenueBTC, 0, 'should be zero')
+  t.is(summary.avgHashrate, 0, 'should be zero')
+  t.is(summary.periodCount, 0, 'should be zero')
+  t.pass()
+})
diff --git a/tests/unit/routes/pools.routes.test.js b/tests/unit/routes/pools.routes.test.js
new file mode 100644
index 0000000..5750248
--- /dev/null
+++ b/tests/unit/routes/pools.routes.test.js
@@ -0,0 +1,39 @@
+'use strict'
+
+const test = require('brittle')
+const { testModuleStructure, testHandlerFunctions, testOnRequestFunctions } = require('../helpers/routeTestHelpers')
+const { createRoutesForTest } = require('../helpers/mockHelpers')
+
+const ROUTES_PATH = '../../../workers/lib/server/routes/pools.routes.js'
+
+test('pools routes - module structure', (t) => {
+  testModuleStructure(t, ROUTES_PATH, 'pools')
+  t.pass()
+})
+
+test('pools routes - route definitions', (t) => {
+  const routes = createRoutesForTest(ROUTES_PATH)
+  const routeUrls = routes.map(route => route.url)
+  t.ok(routeUrls.includes('/auth/pool-stats/aggregate'), 'should have pool-stats aggregate route')
+  t.pass()
+})
+
+test('pools routes - HTTP methods', (t) => {
+  const routes = createRoutesForTest(ROUTES_PATH)
+  routes.forEach(route => {
+    t.is(route.method, 'GET', `route ${route.url} should be GET`)
+  })
+  t.pass()
+})
+
+test('pools routes - handler functions', (t) => {
+  const routes = createRoutesForTest(ROUTES_PATH)
+  testHandlerFunctions(t, routes, 'pools')
+  t.pass()
+})
+
+test('pools routes - onRequest functions', (t) => {
+  const routes = createRoutesForTest(ROUTES_PATH)
+  testOnRequestFunctions(t, routes, 'pools')
+  t.pass()
+})
diff --git a/workers/lib/constants.js b/workers/lib/constants.js
index f1f407e..230303c 100644
--- a/workers/lib/constants.js
+++ b/workers/lib/constants.js
@@ -108,7 +108,10 @@ const ENDPOINTS = {
   THING_CONFIG: '/auth/thing-config',
 
   // WebSocket endpoint
-  WEBSOCKET: '/ws'
+  WEBSOCKET: '/ws',
+
+  // Pool stats endpoints
+  POOL_STATS_AGGREGATE: '/auth/pool-stats/aggregate'
 }
 
 const HTTP_METHODS = {
@@ -183,6 +186,37 @@ const STATUS_CODES = {
   INTERNAL_SERVER_ERROR: 500
 }
 
+const RPC_METHODS = {
+  TAIL_LOG: 'tailLog',
+  GET_WRK_EXT_DATA: 'getWrkExtData'
+}
+
+const WORKER_TYPES = {
+  MINERPOOL: 'minerpool'
+}
+
+const PERIOD_TYPES = {
+  DAILY: 'daily',
+  WEEKLY: 'weekly',
+  MONTHLY: 'monthly'
+}
+
+const MINERPOOL_EXT_DATA_KEYS = {
+  TRANSACTIONS: 'transactions',
+  STATS: 'stats'
+}
+
+const BTC_SATS = 100000000
+
+const NON_METRIC_KEYS = [
+  'ts',
+  'site',
+  'year',
+  'monthName',
+  'month',
+  'period'
+]
+
 const RPC_TIMEOUT = 15000
 const RPC_CONCURRENCY_LIMIT = 2
@@ -202,5 +236,11 @@ module.exports = {
   STATUS_CODES,
   RPC_TIMEOUT,
   RPC_CONCURRENCY_LIMIT,
-  USER_SETTINGS_TYPE
+  USER_SETTINGS_TYPE,
+  RPC_METHODS,
+  WORKER_TYPES,
+  PERIOD_TYPES,
+  MINERPOOL_EXT_DATA_KEYS,
+  BTC_SATS,
+  NON_METRIC_KEYS
 }
diff --git a/workers/lib/period.utils.js b/workers/lib/period.utils.js
new file mode 100644
index 0000000..5f206a4
--- /dev/null
+++ b/workers/lib/period.utils.js
@@ -0,0 +1,192 @@
+'use strict'
+
+const { PERIOD_TYPES, NON_METRIC_KEYS } = require('./constants')
+
+const getStartOfDay = (ts) => Math.floor(ts / 86400000) * 86400000
+
+const convertMsToSeconds = (timestampMs) => {
+  return Math.floor(timestampMs / 1000)
+}
+
+const PERIOD_CALCULATORS = {
+  daily: (timestamp) => getStartOfDay(timestamp),
+  weekly: (timestamp) => {
+    const date = new Date(timestamp)
+    const day = date.getUTCDay()
+    const diff = date.getUTCDate() - day
+    return new Date(Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), diff)).getTime()
+  },
+  monthly: (timestamp) => {
+    const date = new Date(timestamp)
+    return new Date(date.getFullYear(), date.getMonth(), 1).getTime()
+  },
+  yearly: (timestamp) => {
+    const date = new Date(timestamp)
+    return new Date(date.getFullYear(), 0, 1).getTime()
+  }
+}
+
+const aggregateByPeriod = (log, period, nonMetricKeys = []) => {
+  if (period === PERIOD_TYPES.DAILY) {
+    return log
+  }
+
+  const allNonMetricKeys = new Set([...NON_METRIC_KEYS, ...nonMetricKeys])
+
+  const grouped = log.reduce((acc, entry) => {
+    let date
+    try {
+      date = new Date(Number(entry.ts))
+
+      if (isNaN(date.getTime())) {
+        return acc
+      }
+    } catch (error) {
+      return acc
+    }
+
+    let groupKey
+
+    if (period === PERIOD_TYPES.MONTHLY) {
+      groupKey = `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}`
+    } else if (period === PERIOD_TYPES.YEARLY) {
+      groupKey = `${date.getFullYear()}`
+    } else if (period === PERIOD_TYPES.WEEKLY) {
+      const day = date.getUTCDay()
+      const diff = date.getUTCDate() - day
+      const weekStart = new Date(Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), diff))
+      groupKey = `${weekStart.getTime()}`
+    } else {
+      groupKey = `${entry.ts}`
+    }
+
+    if (!acc[groupKey]) {
+      acc[groupKey] = []
+    }
+    acc[groupKey].push(entry)
+    return acc
+  }, {})
+
+  const aggregatedResults = Object.entries(grouped).map(([groupKey, entries]) => {
+    const aggregated = entries.reduce((acc, entry) => {
+      Object.entries(entry).forEach(([key, val]) => {
+        if (allNonMetricKeys.has(key)) {
+          if (!acc[key] || acc[key] === null || acc[key] === undefined) {
+            acc[key] = val
+          }
+        } else {
+          const numVal = Number(val) || 0
+          acc[key] = (acc[key] || 0) + numVal
+        }
+      })
+      return acc
+    }, {})
+
+    try {
+      if (period === PERIOD_TYPES.MONTHLY) {
+        const [year, month] = groupKey.split('-').map(Number)
+
+        const newDate = new Date(year, month - 1, 1)
+        if (isNaN(newDate.getTime())) {
+          throw new Error(`Invalid date for monthly grouping: ${groupKey}`)
+        }
+
+        aggregated.ts = newDate.getTime()
+        aggregated.month = month
+        aggregated.year = year
+        aggregated.monthName = newDate.toLocaleString('en-US', { month: 'long' })
+      } else if (period === PERIOD_TYPES.YEARLY) {
+        const year = parseInt(groupKey)
+
+        const newDate = new Date(year, 0, 1)
+        if (isNaN(newDate.getTime())) {
+          throw new Error(`Invalid date for yearly grouping: ${groupKey}`)
+        }
+
+        aggregated.ts = newDate.getTime()
+        aggregated.year = year
+      } else if (period === PERIOD_TYPES.WEEKLY) {
+        aggregated.ts = Number(groupKey)
+      }
+    } catch (error) {
+      aggregated.ts = entries[0].ts
+
+      try {
+        const fallbackDate = new Date(Number(entries[0].ts))
+        if (!isNaN(fallbackDate.getTime())) {
+          if (period === PERIOD_TYPES.MONTHLY) {
+            aggregated.month = fallbackDate.getMonth() + 1
+            aggregated.year = fallbackDate.getFullYear()
+            aggregated.monthName = fallbackDate.toLocaleString('en-US', { month: 'long' })
+          } else if (period === PERIOD_TYPES.YEARLY) {
+            aggregated.year = fallbackDate.getFullYear()
+          }
+        }
+      } catch (fallbackError) {
+        console.warn('Could not extract date info from fallback timestamp', fallbackError)
+      }
+    }
+
+    return aggregated
+  })
+
+  return aggregatedResults.sort((a, b) => Number(b.ts) - Number(a.ts))
+}
+
+const getPeriodKey = (timestamp, period) => {
+  const calculator = PERIOD_CALCULATORS[period] || PERIOD_CALCULATORS.daily
+  return calculator(timestamp)
+}
+
+const getPeriodEndDate = (periodTs, period) => {
+  const periodEnd = new Date(periodTs)
+
+  switch (period) {
+    case PERIOD_TYPES.WEEKLY:
+      periodEnd.setDate(periodEnd.getDate() + 7)
+      break
+    case PERIOD_TYPES.MONTHLY:
+      periodEnd.setMonth(periodEnd.getMonth() + 1)
+      break
+    case PERIOD_TYPES.YEARLY:
+      periodEnd.setFullYear(periodEnd.getFullYear() + 1)
+      break
+  }
+
+  return periodEnd
+}
+
+const isTimestampInPeriod = (timestamp, periodTs, period) => {
+  if (period === PERIOD_TYPES.DAILY) return timestamp === periodTs
+
+  const periodEnd = getPeriodEndDate(periodTs, period)
+  return timestamp >= periodTs && timestamp < periodEnd.getTime()
+}
+
+const getFilteredPeriodData = (
+  sourceData,
+  periodTs,
+  period,
+  filterFn = (entries) => entries
+) => {
+  if (period === PERIOD_TYPES.DAILY) {
+    return sourceData[periodTs] || (typeof filterFn === 'function' ? {} : 0)
+  }
+
+  const entriesInPeriod = Object.entries(sourceData).filter(([tsStr]) => {
+    const timestamp = Number(tsStr)
+    return isTimestampInPeriod(timestamp, periodTs, period)
+  })
+
+  return filterFn(entriesInPeriod, sourceData)
+}
+
+module.exports = {
+  getStartOfDay,
+  convertMsToSeconds,
+  getPeriodEndDate,
+  aggregateByPeriod,
+  getPeriodKey,
+  isTimestampInPeriod,
+  getFilteredPeriodData
+}
diff --git a/workers/lib/server/handlers/pools.handlers.js b/workers/lib/server/handlers/pools.handlers.js
new file mode 100644
index 0000000..71e7f1f
--- /dev/null
+++ b/workers/lib/server/handlers/pools.handlers.js
@@ -0,0 +1,171 @@
+'use strict'
+
+const {
+  WORKER_TYPES,
+  PERIOD_TYPES,
+  MINERPOOL_EXT_DATA_KEYS,
+  RPC_METHODS,
+  BTC_SATS
+} = require('../../constants')
+const {
+  requestRpcEachLimit,
+  getStartOfDay,
+  runParallel
+} = require('../../utils')
+const { aggregateByPeriod } = require('../../period.utils')
+
+async function getPoolStatsAggregate (ctx, req) {
+  const start = Number(req.query.start)
+  const end = Number(req.query.end)
+  const range = req.query.range || PERIOD_TYPES.DAILY
+  const poolFilter = req.query.pool || null
+
+  if (!start || !end) {
+    throw new Error('ERR_MISSING_START_END')
+  }
+
+  if (start >= end) {
+    throw new Error('ERR_INVALID_DATE_RANGE')
+  }
+
+  const [statsResults, transactionResults] = await runParallel([
+    (cb) => requestRpcEachLimit(ctx, RPC_METHODS.TAIL_LOG, {
+      key: 'stat-3h',
+      type: WORKER_TYPES.MINERPOOL,
+      start,
+      end
+    }).then(r => cb(null, r)).catch(cb),
+
+    (cb) => requestRpcEachLimit(ctx, RPC_METHODS.GET_WRK_EXT_DATA, {
+      type: WORKER_TYPES.MINERPOOL,
+      query: { key: MINERPOOL_EXT_DATA_KEYS.TRANSACTIONS, start, end }
+    }).then(r => cb(null, r)).catch(cb)
+  ])
+
+  const dailyStats = processStatsData(statsResults, poolFilter)
+  const dailyRevenue = processRevenueData(transactionResults, poolFilter)
+
+  const allDays = new Set([
+    ...Object.keys(dailyStats),
+    ...Object.keys(dailyRevenue)
+  ])
+
+  const log = []
+  for (const dayTs of [...allDays].sort()) {
+    const ts = Number(dayTs)
+    const stats = dailyStats[dayTs] || {}
+    const revenue = dailyRevenue[dayTs] || {}
+
+    log.push({
+      ts,
+      balance: stats.balance || 0,
+      hashrate: stats.hashrate || 0,
+      workerCount: stats.workerCount || 0,
+      revenueBTC: revenue.revenueBTC || 0,
+      snapshotCount: stats.count || 0
+    })
+  }
+
+  const aggregated = aggregateByPeriod(log, range)
+  const summary = calculateAggregateSummary(aggregated)
+
+  return { log: aggregated, summary }
+}
+
+function processStatsData (results, poolFilter) {
+  const daily = {}
+  for (const res of results) {
+    if (res.error || !res) continue
+    const data = Array.isArray(res) ? res : (res.data || res.result || [])
+    if (!Array.isArray(data)) continue
+    for (const entry of data) {
+      if (!entry) continue
+      const items = entry.data || entry.items || entry
+      if (Array.isArray(items)) {
+        for (const item of items) {
+          if (poolFilter && item.tag !== poolFilter && item.pool !== poolFilter && item.id !== poolFilter) continue
+          const ts = getStartOfDay(item.ts || item.timestamp)
+          if (!ts) continue
+          if (!daily[ts]) daily[ts] = { balance: 0, hashrate: 0, workerCount: 0, count: 0 }
+          daily[ts].balance = item.balance || daily[ts].balance
+          daily[ts].hashrate += (item.hashrate || 0)
+          daily[ts].workerCount += (item.worker_count || item.workerCount || 0)
+          daily[ts].count += 1
+        }
+      } else if (typeof items === 'object') {
+        for (const [key, val] of Object.entries(items)) {
+          if (!val || typeof val !== 'object') continue
+          if (poolFilter && val.tag !== poolFilter && val.pool !== poolFilter && key !== poolFilter) continue
+          const ts = getStartOfDay(val.ts || val.timestamp || Number(key))
+          if (!ts) continue
+          if (!daily[ts]) daily[ts] = { balance: 0, hashrate: 0, workerCount: 0, count: 0 }
+          daily[ts].balance = val.balance || daily[ts].balance
+          daily[ts].hashrate += (val.hashrate || 0)
+          daily[ts].workerCount += (val.worker_count || val.workerCount || 0)
+          daily[ts].count += 1
+        }
+      }
+    }
+  }
+  return daily
+}
+
+function processRevenueData (results, poolFilter) {
+  const daily = {}
+  for (const res of results) {
+    if (res.error || !res) continue
+    const data = Array.isArray(res) ? res : (res.data || res.result || [])
+    if (!Array.isArray(data)) continue
+    for (const tx of data) {
+      if (!tx) continue
+      const txList = tx.data || tx.transactions || tx
+      if (!Array.isArray(txList)) continue
+      for (const t of txList) {
+        if (!t) continue
+        if (poolFilter && t.tag !== poolFilter && t.pool !== poolFilter && t.id !== poolFilter) continue
+        const ts = getStartOfDay(t.ts || t.timestamp || t.time)
+        if (!ts) continue
+        if (!daily[ts]) daily[ts] = { revenueBTC: 0 }
+        const amount = t.changed_balance || t.amount || t.value || 0
+        daily[ts].revenueBTC += Math.abs(amount) / BTC_SATS
+      }
+    }
+  }
+  return daily
+}
+
+function calculateAggregateSummary (log) {
+  if (!log.length) {
+    return {
+      totalRevenueBTC: 0,
+      avgHashrate: 0,
+      avgWorkerCount: 0,
+      latestBalance: 0,
+      periodCount: 0
+    }
+  }
+
+  const totals = log.reduce((acc, entry) => {
+    acc.revenueBTC += entry.revenueBTC || 0
+    acc.hashrate += entry.hashrate || 0
+    acc.workerCount += entry.workerCount || 0
+    return acc
+  }, { revenueBTC: 0, hashrate: 0, workerCount: 0 })
+
+  const latest = log[log.length - 1]
+
+  return {
+    totalRevenueBTC: totals.revenueBTC,
+    avgHashrate: totals.hashrate / log.length,
+    avgWorkerCount: totals.workerCount / log.length,
+    latestBalance: latest.balance || 0,
+    periodCount: log.length
+  }
+}
+
+module.exports = {
+  getPoolStatsAggregate,
+  processStatsData,
+  processRevenueData,
+  calculateAggregateSummary
+}
diff --git a/workers/lib/server/index.js b/workers/lib/server/index.js
index ac884da..9ce3cab 100644
--- a/workers/lib/server/index.js
+++ b/workers/lib/server/index.js
@@ -8,6 +8,7 @@ const globalRoutes = require('./routes/global.routes')
 const thingsRoutes = require('./routes/things.routes')
 const settingsRoutes = require('./routes/settings.routes')
 const wsRoutes = require('./routes/ws.routes')
+const poolsRoutes = require('./routes/pools.routes')
 
 /**
  * Collect all routes into a flat array for server injection.
@@ -22,7 +23,8 @@ function routes (ctx) {
     ...thingsRoutes(ctx),
     ...usersRoutes(ctx),
     ...settingsRoutes(ctx),
-    ...wsRoutes(ctx)
+    ...wsRoutes(ctx),
+    ...poolsRoutes(ctx)
   ]
 }
diff --git a/workers/lib/server/routes/pools.routes.js b/workers/lib/server/routes/pools.routes.js
new file mode 100644
index 0000000..e637c95
--- /dev/null
+++ b/workers/lib/server/routes/pools.routes.js
@@ -0,0 +1,36 @@
+'use strict'
+
+const {
+  ENDPOINTS,
+  HTTP_METHODS
+} = require('../../constants')
+const {
+  getPoolStatsAggregate
+} = require('../handlers/pools.handlers')
+const { createCachedAuthRoute } = require('../lib/routeHelpers')
+
+module.exports = (ctx) => {
+  const schemas = require('../schemas/pools.schemas.js')
+
+  return [
+    {
+      method: HTTP_METHODS.GET,
+      url: ENDPOINTS.POOL_STATS_AGGREGATE,
+      schema: {
+        querystring: schemas.query.poolStatsAggregate
+      },
+      ...createCachedAuthRoute(
+        ctx,
+        (req) => [
+          'pool-stats/aggregate',
+          req.query.start,
+          req.query.end,
+          req.query.range,
+          req.query.pool
+        ],
+        ENDPOINTS.POOL_STATS_AGGREGATE,
+        getPoolStatsAggregate
+      )
+    }
+  ]
+}
diff --git a/workers/lib/server/schemas/pools.schemas.js b/workers/lib/server/schemas/pools.schemas.js
new file mode 100644
index 0000000..33f394e
--- /dev/null
+++ b/workers/lib/server/schemas/pools.schemas.js
@@ -0,0 +1,19 @@
+'use strict'
+
+const schemas = {
+  query: {
+    poolStatsAggregate: {
+      type: 'object',
+      properties: {
+        start: { type: 'integer' },
+        end: { type: 'integer' },
+        range: { type: 'string', enum: ['daily', 'weekly', 'monthly'] },
+        pool: { type: 'string' },
+        overwriteCache: { type: 'boolean' }
+      },
+      required: ['start', 'end']
+    }
+  }
+}
+
+module.exports = schemas
diff --git a/workers/lib/utils.js b/workers/lib/utils.js
index 886ae5f..6a554c9 100644
--- a/workers/lib/utils.js
+++ b/workers/lib/utils.js
@@ -2,6 +2,7 @@
 
 const async = require('async')
 const { RPC_TIMEOUT, RPC_CONCURRENCY_LIMIT } = require('./constants')
+const { getStartOfDay } = require('./period.utils')
 
 const dateNowSec = () => Math.floor(Date.now() / 1000)
 
@@ -128,6 +129,14 @@ const requestRpcMapLimit = async (ctx, method, payload) => {
   })
 }
 
+const runParallel = (tasks) =>
+  new Promise((resolve, reject) => {
+    async.parallel(tasks, (err, results) => {
+      if (err) reject(err)
+      else resolve(results)
+    })
+  })
+
 module.exports = {
   dateNowSec,
   extractIps,
@@ -137,5 +146,7 @@ module.exports = {
   getAuthTokenFromHeaders,
   parseJsonQueryParam,
   requestRpcEachLimit,
-  requestRpcMapLimit
+  requestRpcMapLimit,
+  getStartOfDay,
+  runParallel
 }

From 6e3050d2a5f2945728409b94ee53f23c96cde2b0 Mon Sep 17 00:00:00 2001
From: Caesar Mukama
Date: Thu, 12 Feb 2026 18:14:40 +0300
Subject: [PATCH 2/5] feat: remove snapshotCount from pool-stats-aggregate log
 entries

Align pool stats aggregate endpoint with API v2 spec which does not
include snapshotCount in the response.
---
 tests/unit/handlers/pools.handlers.test.js    | 4 ++++
 workers/lib/server/handlers/pools.handlers.js | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/tests/unit/handlers/pools.handlers.test.js b/tests/unit/handlers/pools.handlers.test.js
index 23f47bb..951beed 100644
--- a/tests/unit/handlers/pools.handlers.test.js
+++ b/tests/unit/handlers/pools.handlers.test.js
@@ -34,6 +34,10 @@ test('getPoolStatsAggregate - happy path', async (t) => {
   t.ok(result.log, 'should return log array')
   t.ok(result.summary, 'should return summary')
   t.ok(Array.isArray(result.log), 'log should be array')
+  if (result.log.length > 0) {
+    const entry = result.log[0]
+    t.ok(entry.snapshotCount === undefined, 'should not include snapshotCount')
+  }
   t.pass()
 })
 
diff --git a/workers/lib/server/handlers/pools.handlers.js b/workers/lib/server/handlers/pools.handlers.js
index 71e7f1f..1ec0449 100644
--- a/workers/lib/server/handlers/pools.handlers.js
+++ b/workers/lib/server/handlers/pools.handlers.js
@@ -61,8 +61,7 @@ async function getPoolStatsAggregate (ctx, req) {
       balance: stats.balance || 0,
       hashrate: stats.hashrate || 0,
       workerCount: stats.workerCount || 0,
-      revenueBTC: revenue.revenueBTC || 0,
-      snapshotCount: stats.count || 0
+      revenueBTC: revenue.revenueBTC || 0
     })
   }
 

From 8581bef675a228debc1d6782a7a9be22194d551c Mon Sep 17 00:00:00 2001
From: Caesar Mukama
Date: Thu, 12 Feb 2026 21:11:02 +0300
Subject: [PATCH 3/5] fix: use ext-data transactions instead of tail-log for
 pool stats aggregate

tail-log returns empty for type=minerpool. Switch to ext-data with
transactions key which has daily revenue and hashrate data. Also fix
changed_balance handling (values are BTC, not satoshis).
---
 tests/unit/handlers/pools.handlers.test.js    |  99 +++++++------
 workers/lib/server/handlers/pools.handlers.js | 132 ++++++------------
 2 files changed, 89 insertions(+), 142 deletions(-)

diff --git a/tests/unit/handlers/pools.handlers.test.js b/tests/unit/handlers/pools.handlers.test.js
index 951beed..d5c30e4 100644
--- a/tests/unit/handlers/pools.handlers.test.js
+++ b/tests/unit/handlers/pools.handlers.test.js
@@ -3,8 +3,7 @@
 const test = require('brittle')
 const {
   getPoolStatsAggregate,
-  processStatsData,
-  processRevenueData,
+  processTransactionData,
   calculateAggregateSummary
 } = require('../../../workers/lib/server/handlers/pools.handlers')
 
@@ -14,14 +13,15 @@ test('getPoolStatsAggregate - happy path', async (t) => {
       orks: [{ rpcPublicKey: 'key1' }]
     },
     net_r0: {
-      jRequest: async (key, method, payload) => {
-        if (method === 'tailLog') {
-          return [{ data: [{ ts: 1700006400000, balance: 50000, hashrate: 100, worker_count: 5, tag: 'pool1' }] }]
-        }
-        if (method === 'getWrkExtData') {
-          return { data: [{ transactions: [{ ts: 1700006400000, changed_balance: 50000000, tag: 'pool1' }] }] }
-        }
-        return {}
+      jRequest: async () => {
+        return [{
+          ts: '1700006400000',
+          transactions: [{
+            username: 'user1',
+            changed_balance: 0.001,
+            mining_extra: { hash_rate: 611000000000000 }
+          }]
+        }]
       }
     }
   }
@@ -34,10 +34,8 @@ test('getPoolStatsAggregate - happy path', async (t) => {
   t.ok(result.log, 'should return log array')
   t.ok(result.summary, 'should return summary')
   t.ok(Array.isArray(result.log), 'log should be array')
-  if (result.log.length > 0) {
-    const entry = result.log[0]
-    t.ok(entry.snapshotCount === undefined, 'should not include snapshotCount')
-  }
+  t.ok(result.log.length > 0, 'should have entries')
+  t.ok(result.log[0].revenueBTC > 0, 'should have revenue')
   t.pass()
 })
 
@@ -45,26 +43,25 @@ test('getPoolStatsAggregate - with pool filter', async (t) => {
   const mockCtx = {
     conf: { orks: [{ rpcPublicKey: 'key1' }] },
     net_r0: {
-      jRequest: async (key, method) => {
-        if (method === 'tailLog') {
-          return [{
-            data: [
-              { ts: 1700006400000, balance: 50000, hashrate: 100, tag: 'pool1' },
-              { ts: 1700006400000, balance: 30000, hashrate: 200, tag: 'pool2' }
-            ]
-          }]
-        }
-        return { data: [] }
+      jRequest: async () => {
+        return [{
+          ts: '1700006400000',
+          transactions: [
+            { username: 'user1', changed_balance: 0.001 },
+            { username: 'user2', changed_balance: 0.002 }
+          ]
+        }]
       }
     }
   }
 
   const mockReq = {
-    query: { start: 1700000000000, end: 1700100000000, pool: 'pool1' }
+    query: { start: 1700000000000, end: 1700100000000, pool: 'user1' }
   }
 
   const result = await getPoolStatsAggregate(mockCtx, mockReq, {})
   t.ok(result.log, 'should return filtered log')
+  t.ok(result.log.length > 0, 'should have entries')
   t.pass()
 })
 
@@ -110,49 +107,47 @@ test('getPoolStatsAggregate - empty ork results', async (t) => {
   t.pass()
 })
 
-test('processStatsData - processes valid stats', (t) => {
+test('processTransactionData - processes valid transactions', (t) => {
   const results = [
-    [{ data: [{ ts: 1700006400000, balance: 50000, hashrate: 100, worker_count: 5 }] }]
+    [{
+      ts: '1700006400000',
+      transactions: [
+        { username: 'user1', changed_balance: 0.001, mining_extra: { hash_rate: 500000 } },
+        { username: 'user2', changed_balance: 0.002, mining_extra: { hash_rate: 600000 } }
+      ]
+    }]
   ]
-  const daily = processStatsData(results, null)
+  const daily = processTransactionData(results, null)
   t.ok(typeof daily === 'object', 'should return object')
+  const keys = Object.keys(daily)
+  t.ok(keys.length > 0, 'should have entries')
+  const entry = daily[keys[0]]
+  t.ok(entry.revenueBTC > 0, 'should have revenue')
+  t.ok(entry.hashrate > 0, 'should have hashrate')
   t.pass()
 })
 
-test('processStatsData - handles error results', (t) => {
+test('processTransactionData - handles error results', (t) => {
   const results = [{ error: 'timeout' }]
-  const daily = processStatsData(results, null)
+  const daily = processTransactionData(results, null)
   t.is(Object.keys(daily).length, 0, 'should be empty')
   t.pass()
 })
 
-test('processStatsData - filters by pool', (t) => {
+test('processTransactionData - filters by pool', (t) => {
   const results = [
     [{
-      data: [
-        { ts: 1700006400000, balance: 50000, tag: 'pool1' },
-        { ts: 1700006400000, balance: 30000, tag: 'pool2' }
+      ts: '1700006400000',
+      transactions: [
+        { username: 'user1', changed_balance: 0.001 },
+        { username: 'user2', changed_balance: 0.002 }
       ]
     }]
   ]
-  const daily = processStatsData(results, 'pool1')
-  t.ok(typeof daily === 'object', 'should return filtered data')
-  t.pass()
-})
-
-test('processRevenueData - processes valid transactions', (t) => {
-  const results = [
-    [{ transactions: [{ ts: 1700006400000, changed_balance: 100000000 }] }]
-  ]
-  const daily = processRevenueData(results, null)
-  t.ok(typeof daily === 'object', 'should return object')
-  t.pass()
-})
-
-test('processRevenueData - handles error results', (t) => {
-  const results = [{ error: 'timeout' }]
-  const daily = processRevenueData(results, null)
-  t.is(Object.keys(daily).length, 0, 'should be empty')
+  const daily = processTransactionData(results, 'user1')
+  const keys = Object.keys(daily)
+  t.ok(keys.length > 0, 'should have entries')
+  t.is(daily[keys[0]].revenueBTC, 0.001, 'should only include user1 revenue')
   t.pass()
 })
 
diff --git a/workers/lib/server/handlers/pools.handlers.js b/workers/lib/server/handlers/pools.handlers.js
index 1ec0449..d608d5b 100644
--- a/workers/lib/server/handlers/pools.handlers.js
+++ b/workers/lib/server/handlers/pools.handlers.js
@@ -4,13 +4,11 @@ const {
   WORKER_TYPES,
   PERIOD_TYPES,
   MINERPOOL_EXT_DATA_KEYS,
-  RPC_METHODS,
-  BTC_SATS
+  RPC_METHODS
 } = require('../../constants')
 const {
   requestRpcEachLimit,
-  getStartOfDay,
-  runParallel
+  getStartOfDay
 } = require('../../utils')
 const { aggregateByPeriod } = require('../../period.utils')
 
@@ -28,42 +26,22 @@ async function getPoolStatsAggregate (ctx, req) {
     throw new Error('ERR_INVALID_DATE_RANGE')
   }
 
-  const [statsResults, transactionResults] = await runParallel([
-    (cb) => requestRpcEachLimit(ctx, RPC_METHODS.TAIL_LOG, {
-      key: 'stat-3h',
-      type: WORKER_TYPES.MINERPOOL,
-      start,
-      end
-    }).then(r => cb(null, r)).catch(cb),
-
-    (cb) => requestRpcEachLimit(ctx, RPC_METHODS.GET_WRK_EXT_DATA, {
-      type: WORKER_TYPES.MINERPOOL,
-      query: { key: MINERPOOL_EXT_DATA_KEYS.TRANSACTIONS, start, end }
-    }).then(r => cb(null, r)).catch(cb)
-  ])
-
-  const dailyStats = processStatsData(statsResults, poolFilter)
-  const dailyRevenue = processRevenueData(transactionResults, poolFilter)
-
-  const allDays = new Set([
-    ...Object.keys(dailyStats),
-    ...Object.keys(dailyRevenue)
-  ])
-
-  const log = []
-  for (const dayTs of [...allDays].sort()) {
-    const ts = Number(dayTs)
-    const stats = dailyStats[dayTs] || {}
-    const revenue = dailyRevenue[dayTs] || {}
-
-    log.push({
-      ts,
-      balance: stats.balance || 0,
-      hashrate: stats.hashrate || 0,
-      workerCount: stats.workerCount || 0,
-      revenueBTC: revenue.revenueBTC || 0
-    })
-  }
+  const transactionResults = await requestRpcEachLimit(ctx, RPC_METHODS.GET_WRK_EXT_DATA, {
+    type: WORKER_TYPES.MINERPOOL,
+    query: { key: MINERPOOL_EXT_DATA_KEYS.TRANSACTIONS, start, end }
+  })
+
+  const dailyData = processTransactionData(transactionResults, poolFilter)
+
+  const log = Object.entries(dailyData)
+    .sort(([a], [b]) => Number(a) - Number(b))
+    .map(([ts, data]) => ({
+      ts: Number(ts),
+      balance: data.revenueBTC,
+      hashrate: data.hashCount > 0 ? data.hashrate / data.hashCount : 0,
+      workerCount: 0,
+      revenueBTC: data.revenueBTC
+    }))
 
   const aggregated = aggregateByPeriod(log, range)
   const summary = calculateAggregateSummary(aggregated)
@@ -71,62 +49,37 @@ async function getPoolStatsAggregate (ctx, req) {
   return { log: aggregated, summary }
 }
 
-function processStatsData (results, poolFilter) {
+/**
+ * Processes ext-data transaction results into daily entries.
+ * The ext-data response for transactions with start/end:
+ * [ { ts, transactions: [{username, changed_balance, mining_extra: {hash_rate, ...}}, ...] }, ... ]
+ * changed_balance is already in BTC (not satoshis).
+ */
+function processTransactionData (results, poolFilter) {
   const daily = {}
   for (const res of results) {
     if (res.error || !res) continue
     const data = Array.isArray(res) ? res : (res.data || res.result || [])
     if (!Array.isArray(data)) continue
+
     for (const entry of data) {
       if (!entry) continue
-      const items = entry.data || entry.items || entry
-      if (Array.isArray(items)) {
-        for (const item of items) {
-          if (poolFilter && item.tag !== poolFilter && item.pool !== poolFilter && item.id !== poolFilter) continue
-          const ts = getStartOfDay(item.ts || item.timestamp)
-          if (!ts) continue
-          if (!daily[ts]) daily[ts] = { balance: 0, hashrate: 0, workerCount: 0, count: 0 }
-          daily[ts].balance = item.balance || daily[ts].balance
-          daily[ts].hashrate += (item.hashrate || 0)
-          daily[ts].workerCount += (item.worker_count || item.workerCount || 0)
-          daily[ts].count += 1
-        }
-      } else if (typeof items === 'object') {
-        for (const [key, val] of Object.entries(items)) {
-          if (!val || typeof val !== 'object') continue
-          if (poolFilter && val.tag !== poolFilter && val.pool !== poolFilter && key !== poolFilter) continue
-          const ts = getStartOfDay(val.ts || val.timestamp || Number(key))
-          if (!ts) continue
-          if (!daily[ts]) daily[ts] = { balance: 0, hashrate: 0, workerCount: 0, count: 0 }
-          daily[ts].balance = val.balance || daily[ts].balance
-          daily[ts].hashrate += (val.hashrate || 0)
-          daily[ts].workerCount += (val.worker_count || val.workerCount || 0)
-          daily[ts].count += 1
-        }
-      }
-    }
-  }
-  return daily
-}
+      const ts = getStartOfDay(Number(entry.ts))
+      if (!ts) continue
 
-function processRevenueData (results, poolFilter) {
-  const daily = {}
-  for (const res of results) {
-    if (res.error || !res) continue
-    const data = Array.isArray(res) ? res : (res.data || res.result || [])
-    if (!Array.isArray(data)) continue
-    for (const tx of data) {
-      if (!tx) continue
-      const txList = tx.data || tx.transactions || tx
-      if (!Array.isArray(txList)) continue
-      for (const t of txList) {
-        if (!t) continue
-        if (poolFilter && t.tag !== poolFilter && t.pool !== poolFilter && t.id !== poolFilter) continue
-        const ts = getStartOfDay(t.ts || t.timestamp || t.time)
-        if (!ts) continue
-        if (!daily[ts]) daily[ts] = { revenueBTC: 0 }
-        const amount = t.changed_balance || t.amount || t.value || 0
-        daily[ts].revenueBTC += Math.abs(amount) / BTC_SATS
+      const txs = entry.transactions || []
+      if (!Array.isArray(txs) || txs.length === 0) continue
+
+      for (const tx of txs) {
+        if (!tx) continue
+        if (poolFilter && tx.username !== poolFilter) continue
+
+        if (!daily[ts]) daily[ts] = { revenueBTC: 0, hashrate: 0, hashCount: 0 }
+        daily[ts].revenueBTC += Math.abs(tx.changed_balance || 0)
+        if (tx.mining_extra?.hash_rate) {
+          daily[ts].hashrate += tx.mining_extra.hash_rate
+          daily[ts].hashCount++
+        }
       }
     }
   }
@@ -164,7 +117,6 @@ function calculateAggregateSummary (log) {
 
 module.exports = {
   getPoolStatsAggregate,
-  processStatsData,
-  processRevenueData,
+  processTransactionData,
   calculateAggregateSummary
 }

From 20a7efa54491a248793d6a9a375db4356edf8276 Mon Sep 17 00:00:00 2001
From: Caesar Mukama
Date: Thu, 12 Feb 2026 22:44:31 +0300
Subject: [PATCH 4/5] code cleanup

---
 workers/lib/server/handlers/pools.handlers.js | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/workers/lib/server/handlers/pools.handlers.js b/workers/lib/server/handlers/pools.handlers.js
index d608d5b..670d865 100644
--- a/workers/lib/server/handlers/pools.handlers.js
+++ b/workers/lib/server/handlers/pools.handlers.js
@@ -49,12 +49,6 @@ async function getPoolStatsAggregate (ctx, req) {
   return { log: aggregated, summary }
 }
 
-/**
- * Processes ext-data transaction results into daily entries.
- * The ext-data response for transactions with start/end:
- * [ { ts, transactions: [{username, changed_balance, mining_extra: {hash_rate, ...}}, ...] }, ... ]
- * changed_balance is already in BTC (not satoshis).
- */
 function processTransactionData (results, poolFilter) {
   const daily = {}
   for (const res of results) {

From 4b6aa67dd1d3b71a5a4387aa0ef9b557e6b27d1e Mon Sep 17 00:00:00 2001
From: Caesar Mukama
Date: Sun, 15 Feb 2026 16:49:11 +0300
Subject: [PATCH 5/5] fix: move pool filter to RPC payload for
 pool-stats-aggregate endpoint

Pool name filter should be handled by the RPC worker, not filtered
client-side. The tx username is the account id.
---
 tests/unit/handlers/pools.handlers.test.js    | 29 +++++--------------
 workers/lib/server/handlers/pools.handlers.js |  7 ++---
 2 files changed, 10 insertions(+), 26 deletions(-)

diff --git a/tests/unit/handlers/pools.handlers.test.js b/tests/unit/handlers/pools.handlers.test.js
index d5c30e4..b511938 100644
--- a/tests/unit/handlers/pools.handlers.test.js
+++ b/tests/unit/handlers/pools.handlers.test.js
@@ -40,10 +40,12 @@ test('getPoolStatsAggregate - happy path', async (t) => {
 })
 
 test('getPoolStatsAggregate - with pool filter', async (t) => {
+  let capturedPayload = null
   const mockCtx = {
     conf: { orks: [{ rpcPublicKey: 'key1' }] },
     net_r0: {
-      jRequest: async () => {
+      jRequest: async (key, method, payload) => {
+        capturedPayload = payload
         return [{
           ts: '1700006400000',
           transactions: [
@@ -60,8 +62,8 @@ test('getPoolStatsAggregate - with pool filter', async (t) => {
   }
 
   const result = await getPoolStatsAggregate(mockCtx, mockReq, {})
-  t.ok(result.log, 'should return filtered log')
-  t.ok(result.log.length > 0, 'should have entries')
+  t.ok(result.log, 'should return log')
+  t.is(capturedPayload.query.pool, 'user1', 'should pass pool filter in RPC payload')
   t.pass()
 })
 
@@ -117,7 +119,7 @@ test('processTransactionData - processes valid transactions', (t) => {
       ]
     }]
   ]
-  const daily = processTransactionData(results, null)
+  const daily = processTransactionData(results)
   t.ok(typeof daily === 'object', 'should return object')
   const keys = Object.keys(daily)
   t.ok(keys.length > 0, 'should have entries')
@@ -129,28 +131,11 @@ test('processTransactionData - processes valid transactions', (t) => {
 
 test('processTransactionData - handles error results', (t) => {
   const results = [{ error: 'timeout' }]
-  const daily = processTransactionData(results, null)
+  const daily = processTransactionData(results)
   t.is(Object.keys(daily).length, 0, 'should be empty')
   t.pass()
 })
 
-test('processTransactionData - filters by pool', (t) => {
-  const results = [
-    [{
-      ts: '1700006400000',
-      transactions: [
-        { username: 'user1', changed_balance: 0.001 },
-        { username: 'user2', changed_balance: 0.002 }
-      ]
-    }]
-  ]
-  const daily = processTransactionData(results, 'user1')
-  const keys = Object.keys(daily)
-  t.ok(keys.length > 0, 'should have entries')
-  t.is(daily[keys[0]].revenueBTC, 0.001, 'should only include user1 revenue')
-  t.pass()
-})
-
 test('calculateAggregateSummary - calculates from log', (t) => {
   const log = [
     { revenueBTC: 0.5, hashrate: 100, workerCount: 5, balance: 50000 },
diff --git a/workers/lib/server/handlers/pools.handlers.js b/workers/lib/server/handlers/pools.handlers.js
index 670d865..5232afe 100644
--- a/workers/lib/server/handlers/pools.handlers.js
+++ b/workers/lib/server/handlers/pools.handlers.js
@@ -28,10 +28,10 @@ async function getPoolStatsAggregate (ctx, req) {
 
   const transactionResults = await requestRpcEachLimit(ctx, RPC_METHODS.GET_WRK_EXT_DATA, {
     type: WORKER_TYPES.MINERPOOL,
-    query: { key: MINERPOOL_EXT_DATA_KEYS.TRANSACTIONS, start, end }
+    query: { key: MINERPOOL_EXT_DATA_KEYS.TRANSACTIONS, start, end, pool: poolFilter }
   })
 
-  const dailyData = processTransactionData(transactionResults, poolFilter)
+  const dailyData = processTransactionData(transactionResults)
 
   const log = Object.entries(dailyData)
     .sort(([a], [b]) => Number(a) - Number(b))
@@ -49,7 +49,7 @@ async function getPoolStatsAggregate (ctx, req) {
   return { log: aggregated, summary }
 }
 
-function processTransactionData (results, poolFilter) {
+function processTransactionData (results) {
   const daily = {}
   for (const res of results) {
     if (res.error || !res) continue
@@ -66,7 +66,6 @@ function processTransactionData (results, poolFilter) {
 
       for (const tx of txs) {
         if (!tx) continue
-        if (poolFilter && tx.username !== poolFilter) continue
 
         if (!daily[ts]) daily[ts] = { revenueBTC: 0, hashrate: 0, hashCount: 0 }
         daily[ts].revenueBTC += Math.abs(tx.changed_balance || 0)