-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathC2DDatabase.ts
More file actions
executable file
·191 lines (170 loc) · 6.02 KB
/
C2DDatabase.ts
File metadata and controls
executable file
·191 lines (170 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import path from 'path'
import fs from 'fs'
import {
ComputeEnvironment,
DBComputeJob,
C2DStatusNumber
} from '../../@types/C2D/C2D.js'
import { SQLiteCompute } from './sqliteCompute.js'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { OceanNodeDBConfig } from '../../@types/OceanNode.js'
import { TypesenseSchema } from './TypesenseSchemas.js'
import { AbstractDatabase } from './BaseDatabase.js'
import { OceanNode } from '../../OceanNode.js'
import { getDatabase } from '../../utils/database.js'
import { getConfiguration } from '../../utils/index.js'
import { generateUniqueID } from '../core/compute/utils.js'
export class C2DDatabase extends AbstractDatabase {
private provider: SQLiteCompute
constructor(config: OceanNodeDBConfig, schema: TypesenseSchema) {
super(config, schema)
return (async (): Promise<C2DDatabase> => {
// Fall back to SQLite
DATABASE_LOGGER.info('Creating C2DDatabase with SQLite')
// Ensure the directory exists before instantiating SQLiteProvider
const dbDir = path.dirname('databases/c2dDatabase.sqlite')
if (!fs.existsSync(dbDir)) {
fs.mkdirSync(dbDir, { recursive: true })
}
this.provider = new SQLiteCompute('databases/c2dDatabase.sqlite')
await this.provider.createTable()
await this.provider.createImageTable()
return this
})() as unknown as C2DDatabase
}
async newJob(job: DBComputeJob): Promise<string> {
if (!job.jobId) job.jobId = generateUniqueID(job)
const jobId = await this.provider.newJob(job)
return jobId
}
async getJob(
jobId?: string,
agreementId?: string,
owner?: string
): Promise<DBComputeJob[]> {
const jobs = await this.provider.getJob(jobId, agreementId, owner)
return jobs
}
async updateJob(job: DBComputeJob): Promise<number> {
let updated = 0
const previouslySaved: DBComputeJob[] = await this.getJob(job.jobId)
if (previouslySaved.length === 1) {
previouslySaved[0] = job
updated = await this.provider.updateJob(previouslySaved[0])
if (!updated) {
DATABASE_LOGGER.error(`Unable to update job: ${job.jobId}. No rows affected!`)
}
} else {
DATABASE_LOGGER.error(
`Unable to update job: ${job.jobId}. It seems this jobID does not exist!`
)
}
return updated
}
async getRunningJobs(engine?: string, environment?: string): Promise<DBComputeJob[]> {
return await this.provider.getRunningJobs(engine, environment)
}
async deleteJob(jobId: string): Promise<boolean> {
return await this.provider.deleteJob(jobId)
}
async getFinishedJobs(environments?: string[]): Promise<DBComputeJob[]> {
return await this.provider.getFinishedJobs(environments)
}
async getJobs(
environments?: string[],
fromTimestamp?: string,
consumerAddrs?: string[],
status?: C2DStatusNumber,
runningJobs?: boolean
): Promise<DBComputeJob[]> {
return await this.provider.getJobs(
environments,
fromTimestamp,
consumerAddrs,
status,
runningJobs
)
}
async getJobsByStatus(
environments: string[],
status: C2DStatusNumber[]
): Promise<DBComputeJob[]> {
return await this.provider.getJobsByStatus(environments, status)
}
async updateImage(image: string): Promise<void> {
return await this.provider.updateImage(image)
}
async deleteImage(image: string): Promise<void> {
return await this.provider.deleteImage(image)
}
async getOldImages(retentionDays: number): Promise<string[]> {
return await this.provider.getOldImages(retentionDays)
}
/**
*
* @param environment compute environment to check for
*
* All compute engines have compute environments,
* and each compute environment specifies how long the output produced by
* a job is held by the node, before being deleted.
* When a job expiry is overdue, the node will delete all storage used by that job,
* and also delete the job record from the database
* @returns array of eexpired jobs
*/
async cleanStorageExpiredJobs(): Promise<number> {
const allEnvironments: ComputeEnvironment[] = []
const currentTimestamp = Date.now() / 1000
const config = await getConfiguration(true)
const allEngines = await OceanNode.getInstance(
config,
await getDatabase()
).getC2DEngines().engines
let cleaned = 0
for (const engine of allEngines) {
const engineEnvironments = await engine.getComputeEnvironments()
for (const computeEnvironment of engineEnvironments) {
allEnvironments.push(computeEnvironment)
const finishedOrExpired: DBComputeJob[] = await this.provider.getFinishedJobs([
computeEnvironment.id
])
for (const job of finishedOrExpired) {
if (
computeEnvironment &&
computeEnvironment.storageExpiry <
currentTimestamp - parseInt(job.dateFinished)
) {
if (await engine.cleanupExpiredStorage(job)) {
cleaned++
}
}
}
}
}
// now let's clean jobs that have an unknown envs (not in our envs)
cleaned += await this.cleanOrphanJobs(allEnvironments)
return cleaned
}
/**
* Clean orphan jobs. Stuff left on DB without existing environments associated
* @param existingEnvironments
* @returns number of orphans
*/
async cleanOrphanJobs(existingEnvironments: ComputeEnvironment[]) {
const c2dDatabase = await (await getDatabase()).c2d
let cleaned = 0
const envIds: string[] = existingEnvironments
.filter((env: any) => env && typeof env.id === 'string')
.map((env: any) => env.id)
// Get all finished jobs from DB, not just from known environments
const allJobs: DBComputeJob[] = await c2dDatabase.getFinishedJobs()
for (const job of allJobs) {
if (!job.environment || !envIds.includes(job.environment)) {
if (await c2dDatabase.deleteJob(job.jobId)) {
cleaned++
}
}
}
DATABASE_LOGGER.info('Cleaned ' + cleaned + ' orphan C2D jobs')
return cleaned
}
}