The Orca Grading Queue is ephemeral in nature: all jobs are created with the promise of eventual removal. Redis was the chosen tool to implement the Grading Queue, as an in-memory storage system with quick access to its elements.
A full implementation of the grading queue requires three pieces of functionality:
- Knowing when a job is ready to be extracted and graded.
- Retrieving the next job to be graded for a given team or user.
- Obtaining the correct GradingJob object.
The data definition for the first requirement must support a priority-based ordering of jobs and contain the information needed for the second.
Redis' ZSet data structure is an ordered set with keys sorted by a given score value (in practice, this is more usefully typed as an OrderedMap<string, number>). This can be utilized to create a priority queue of IDs to be popped off when a grading VM is ready to extract a new job.
ZSet Reservations => {
<"team" | "user">.<id>.nonce: release_time
<"immediate">.<GradingJobKey>.nonce: release_time
}
The set is stored under the key Reservations. Each ZSet member is a unique identifier needed for retrieving the associated grading job, either directly or through an ordered list of jobs for team/user submissions. Members receive a score equal to the timestamp at which they should be released for grading.
Each ZSet member also carries a nonce, which ensures that a student spamming the queue with multiple jobs reserves a separate spot for each one.
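The priority-queue behavior of Reservations can be illustrated with a minimal in-memory sketch. The SortedSet class and the sample members and timestamps below are hypothetical stand-ins for the Redis ZSet, not part of Orca; they only show how the lowest release time is always popped first.

```typescript
// Minimal in-memory stand-in for a Redis sorted set (illustrative only).
class SortedSet {
  private scores = new Map<string, number>();

  // Like ZADD: insert the member or overwrite its score.
  zadd(member: string, score: number): void {
    this.scores.set(member, score);
  }

  // Like ZPOPMIN: remove and return the member with the lowest score.
  // Equal scores are ordered lexicographically by member, as in Redis.
  zpopmin(): [string, number] | undefined {
    let best: [string, number] | undefined;
    for (const [member, score] of Array.from(this.scores.entries())) {
      if (
        best === undefined ||
        score < best[1] ||
        (score === best[1] && member < best[0])
      ) {
        best = [member, score];
      }
    }
    if (best !== undefined) this.scores.delete(best[0]);
    return best;
  }
}

// Hypothetical reservations: the score is the release timestamp.
const reservations = new SortedSet();
reservations.zadd("team.12.1700000120", 1700000120);
reservations.zadd("user.7.1700000060", 1700000060);
reservations.zadd("immediate.abc123", 1700000000);

const first = reservations.zpopmin(); // earliest release time
const second = reservations.zpopmin();
```

Because the score is a release timestamp, a reservation created with a smaller delay is popped ahead of one created earlier with a larger delay.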
GradingJobs are enqueued to be processed based on a given priority OR to be processed immediately. In the latter case, the ZSet member string is composed of the grading job's key, which is a string unique to that job generated by the Orca web server.
Orca stores the keys for a user/team's jobs in a Stack. Orca implements "Last In First Out" behavior so that the most recent job, which is expected to yield the most useful feedback on a submission, will be graded first.
This can be implemented with a Redis List, where the list is mapped to a key containing the given user/team ID.
List SubmitterInfo.<"team" | "user">.<team_id | user_id> => [ job_key_0, job_key_1, ..., job_key_n ]
The stack order is maintained by only ever prepending job keys and popping the first one off the list.
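As a rough illustration of the stack discipline, the sketch below models the Redis list with a plain array. The lpush and lpop helpers are hypothetical in-memory equivalents of LPUSH and LPOP, and the job keys are made up.

```typescript
// In-memory stand-in for the Redis list stored under SubmitterInfo.<type>.<id>.
const submitterInfo: string[] = [];

// Like LPUSH: prepend a job key to the head of the list.
function lpush(list: string[], jobKey: string): void {
  list.unshift(jobKey);
}

// Like LPOP: remove and return the job key at the head of the list.
function lpop(list: string[]): string | undefined {
  return list.shift();
}

lpush(submitterInfo, "job_key_a"); // oldest submission
lpush(submitterInfo, "job_key_b");
lpush(submitterInfo, "job_key_c"); // newest submission

// The most recent submission is graded first.
const nextKey = lpop(submitterInfo);
```

Prepending and popping from the same end is what yields "Last In First Out"; pushing to the opposite end would turn the list into a FIFO queue.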
GradingJob objects are individually mapped to a unique key utilizing the STRING data structure in Redis.
String (Key) => GradingJob
Only one GradingJob exists per GradingJobKey so that the most recent grading script specifications provided by a professor are used for grading. For instance, if a student submits and a professor updates an assignment's test files and clicks "regrade" for the student, this will update the grading job under that GradingJobKey such that when the job gets graded, it will run with the updated specs.
For optimization purposes, users may need to know what nonces are in use by a collation type/id in the Reservations ZSet.
These nonces are stored under the key Nonces.<collation_type>.<collation_id>.
Given the definitions above, the following invariants define a valid state of the queue:
- All grading jobs correspond to a single key, and vice versa.
- Job keys are unique within and across SubmitterInfo lists.
- Each reservation belongs to a single, unique job key.
- Each reservation has a single, unique nonce.
- The number of nonces in the Nonces set is equal to the number of reservations for that user or team.
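Two of these invariants lend themselves to a mechanical check. The following is a sketch only: QueueSnapshot and checkInvariants are hypothetical names, and the snapshot is assumed to have been read out of Redis beforehand into plain objects.

```typescript
// Hypothetical snapshot of queue state, keyed by "<collation_type>.<collation_id>".
interface QueueSnapshot {
  reservations: string[]; // members of the Reservations ZSet
  submitterInfo: Record<string, string[]>; // job keys per collation
  nonces: Record<string, string[]>; // nonces per collation
}

// Returns a description of every violated invariant (empty when the state is valid).
function checkInvariants(s: QueueSnapshot): string[] {
  const violations: string[] = [];

  // Job keys must be unique within and across SubmitterInfo lists.
  const seen = new Set<string>();
  for (const collation of Object.keys(s.submitterInfo)) {
    for (const key of s.submitterInfo[collation] || []) {
      if (seen.has(key)) violations.push(`duplicate job key: ${key}`);
      seen.add(key);
    }
  }

  // Count the non-immediate reservations held by each collation.
  const counts: Record<string, number> = {};
  for (const member of s.reservations) {
    const [type, id] = member.split(".");
    if (type === "immediate") continue;
    const collation = `${type}.${id}`;
    counts[collation] = (counts[collation] || 0) + 1;
  }

  // The number of nonces must equal the number of reservations.
  const collations = Object.keys(counts).concat(
    Object.keys(s.nonces).filter((c) => !(c in counts))
  );
  for (const c of collations) {
    const reserved = counts[c] || 0;
    const nonceCount = (s.nonces[c] || []).length;
    if (reserved !== nonceCount) {
      violations.push(`${c}: ${reserved} reservations but ${nonceCount} nonces`);
    }
  }
  return violations;
}

const validState: QueueSnapshot = {
  reservations: ["team.12.100", "team.12.101", "immediate.abc"],
  submitterInfo: { "team.12": ["k1", "k2"] },
  nonces: { "team.12": ["100", "101"] },
};
const validResult = checkInvariants(validState);
```

A check like this could run in tests after each queue operation to catch a corrupted state early.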
All operations are performed by either the web server or a worker.
A couple of rules must be followed in order to ensure that the state of the queue is never corrupted:
- Thou shall not run any queue operations without a lock. While not ACID compliant, Redis offers up the Redlock algorithm as a method to implement locking resources to ensure that two clients do not interfere with each other. Failure to use this could result in undefined behavior -- such as two workers trying to pop off the same job.
- Thou shall not update, delete or create anything in the queue without a transaction. Redis transactions do not support rollbacks, but equivalent behavior can be built by (a) using the Redis MULTI command, which queues a series of operations and executes them atomically, and (b) preparing a semblance of rollback steps, likewise grouped under a MULTI command. Failure to roll back faulty operations could break the previously defined invariants.
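The rollback idea in the second rule can be sketched in memory. The Step interface and runWithRollback function below are hypothetical; in Orca the forward operations and the compensating operations would each be issued to Redis as a grouped set of commands, whereas here plain arrays and sets stand in for Redis state.

```typescript
// Hypothetical pairing of a forward operation with its compensating rollback.
interface Step {
  apply: () => void;
  rollback: () => void;
}

// Applies the steps in order; on failure, undoes the completed steps in reverse.
function runWithRollback(steps: Step[]): boolean {
  const completed: Step[] = [];
  try {
    for (const step of steps) {
      step.apply();
      completed.push(step);
    }
    return true;
  } catch (_err) {
    for (const step of completed.reverse()) {
      step.rollback();
    }
    return false;
  }
}

// In-memory stand-ins for a SubmitterInfo list and a Nonces set.
const jobKeys: string[] = [];
const nonceSet = new Set<string>();

const succeeded = runWithRollback([
  {
    apply: () => { jobKeys.unshift("job_key_a"); },
    rollback: () => { jobKeys.shift(); },
  },
  {
    apply: () => { nonceSet.add("1700000120"); },
    rollback: () => { nonceSet.delete("1700000120"); },
  },
  {
    // Simulated failure partway through the enqueue.
    apply: () => { throw new Error("simulated failure"); },
    rollback: () => {},
  },
]);
```

After the simulated failure, both the list and the set are back to their initial empty state, so the invariant tying nonces to reservations still holds.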
As mentioned in the Data Definitions document, in order to create a GradingJob Orca must create an identifier unique to each given job prior to enqueuing.
import { createHash } from 'crypto';
function generateOrcaKey({ key, response_url }: GradingJobConfig): string {
  // Placeholder: substitute a concrete algorithm name supported by Node's crypto module.
  const hash = createHash("someHashAlgorithm");
  hash.update(key + response_url);
  return hash.digest('base64');
}
This is accomplished by hashing the concatenation of a GradingJobConfig's key property with its response_url property.
The hash is then encoded using base64 to ensure that no '.' character becomes part of the key. This is necessary to preclude undefined behavior when splitting keys using '.' as a delimiter.
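The delimiter property can be checked with a short sketch. The choice of "sha256" here is purely an assumption for illustration (the document leaves the algorithm unspecified), as are the sketchOrcaKey name and the sample inputs.

```typescript
import { createHash } from "node:crypto";

// Assumption: sha256 stands in for the unspecified hash algorithm.
function sketchOrcaKey(key: string, responseUrl: string): string {
  return createHash("sha256").update(key + responseUrl).digest("base64");
}

// The input may contain '.', but the base64 alphabet (A-Z, a-z, 0-9, '+', '/', '=') does not.
const orcaKey = sketchOrcaKey(
  "assignment-1.submission-42",
  "https://example.com/response"
);

// Splitting an immediate reservation member on '.' therefore yields exactly two parts.
const parts = `immediate.${orcaKey}`.split(".");
```

Note that standard base64 output can still contain '+', '/' and '=', which is harmless for '.'-delimited members but worth remembering if the key is ever placed in a URL.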
Adding a job to the queue is a create-or-update function (or a PUT in HTTP terms). If a job is already seen to be in the queue, then its contents will be updated without adding new data (e.g., reservation, nonce, etc) to the queue.
In the create case, Orca will queue up grading jobs based on their user ID or team ID. With this value, the generated orca key will be pushed onto the SubmitterInfo stack to ensure it will be the first job to get graded.
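function createOrUpdateGradingJob(jobConfig: GradingJobConfig) {
  const orcaKey = generateOrcaKey(jobConfig);
  const arrivalTime = time.now();
  // Only a job that is not yet in the queue receives a reservation and a nonce.
  if (!EXISTS(orcaKey)) enqueueJob(jobConfig, orcaKey, arrivalTime);
  // Create or overwrite the stored job; serialized because getNextJob parses it as JSON.
  SET(orcaKey, JSON.stringify(jobConfig));
}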
function enqueueJob(
  { collation, priority }: GradingJobConfig,
  orcaKey: string,
  arrivalTime: number
) {
  const nextTask = `${collation.type}.${collation.id}`;
  const releaseTime = priority + arrivalTime;
  const nonce = generateNonce(releaseTime, collation);
  // Prepend so the newest job is the first one popped (LIFO).
  LPUSH(`SubmitterInfo.${nextTask}`, orcaKey);
  ZADD("Reservations", `${nextTask}.${nonce}`, releaseTime);
  SADD(`Nonces.${nextTask}`, nonce);
}
function generateNonce(releaseTime: number, collation: Collation): number {
  const noncesKey = `Nonces.${collation.type}.${collation.id}`;
  let nonce = releaseTime;
  // Increment until the nonce is unused for this collation.
  while (SISMEMBER(noncesKey, nonce.toString())) {
    nonce++;
  }
  return nonce;
}
GradingJobConfigs sent to Orca will contain a priority, which is a delay to be placed on a job. For now, assume priority = (# of subs in last 15 mins) * 1 min.
The release time of a job is defined by the addition of the job's arrival time and its priority; this value is used as the score of the reservation in the Reservations ZSET.
When generating the member string for this reservation, the release time is used to create a unique nonce: starting with this value, the nonce is incremented until it is not already in use by a reservation for that collation (checked against the cached Nonces set).
The nonce of the reservation is then cached for other update functionality.
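A small worked example may help here. The sketch below assumes timestamps and priorities are expressed in seconds (the document does not fix a unit), and priorityFor and nextFreeNonce are hypothetical helper names. Two submissions from the same collation end up with the same release time, so the second one receives an incremented nonce.

```typescript
// Assumption: all times are in seconds.
const SECONDS_PER_MINUTE = 60;

// priority = (# of submissions in the last 15 minutes) * 1 minute
function priorityFor(recentSubmissions: number): number {
  return recentSubmissions * SECONDS_PER_MINUTE;
}

// Start at the release time and increment until the nonce is unused.
function nextFreeNonce(releaseTime: number, used: Set<string>): number {
  let nonce = releaseTime;
  while (used.has(nonce.toString())) nonce++;
  return nonce;
}

// In-memory stand-in for the Nonces set of one team or user.
const used = new Set<string>();

// Job A arrives at t=1700000000 with 2 recent submissions.
const releaseA = 1700000000 + priorityFor(2);
const nonceA = nextFreeNonce(releaseA, used);
used.add(nonceA.toString());

// Job B arrives 60 seconds later with 1 recent submission: same release time as A.
const releaseB = 1700000060 + priorityFor(1);
const nonceB = nextFreeNonce(releaseB, used);
used.add(nonceB.toString());
```

Both reservations keep the same score (the release time), while the differing nonces keep their member strings distinct in the ZSet.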
A professor may want to submit a job for immediate grading in the event they change the original test criteria.
While it's possible to place a job at the front of the line for a student/team by giving it a priority of 0, jobs may be added afterwards and thus a regrade attempt may be waiting for an arbitrarily long time. For this reason, if a non-immediate job already exists, it is replaced with a new immediate job.
Jobs added to the queue for immediate grading are added to the Reservations ZSet using its server-generated key instead of the team or user ID. This allows the job to bypass the SubmitterInfo list, ensuring that it cannot be cut in line.
function createOrUpdateImmediateJob(jobConfig: GradingJobConfig) {
  const orcaKey = generateOrcaKey(jobConfig);
  if (!EXISTS(orcaKey)) {
    // Same existence check as createOrUpdateGradingJob.
    createImmediateJob(jobConfig, orcaKey);
  } else if (nonImmediateJobExists(jobConfig, orcaKey)) {
    upgradeJob(jobConfig, orcaKey);
  }
  SET(orcaKey, JSON.stringify(jobConfig));
}
function nonImmediateJobExists({ collation }: GradingJob, orcaKey: string) {
  // Scan the submitter's whole list for this job's key.
  const jobKeys = LRANGE(`SubmitterInfo.${collation.type}.${collation.id}`, 0, -1);
  for (const jobKey of jobKeys) {
    if (jobKey === orcaKey) return true;
  }
  return false;
}
function upgradeJob(job: GradingJob, orcaKey: string) {
  removeNonImmediateJob(job, orcaKey);
  createImmediateJob(job, orcaKey);
}
function removeNonImmediateJob(
  { collation }: GradingJobConfig,
  orcaKey: string
) {
  LREM(`SubmitterInfo.${collation.type}.${collation.id}`, 0, orcaKey);
  // SPOP releases an arbitrary nonce for this collation, freeing one reservation.
  const nonce = SPOP(`Nonces.${collation.type}.${collation.id}`);
  ZREM("Reservations", `${collation.type}.${collation.id}.${nonce}`);
}
function createImmediateJob(job: GradingJob, orcaKey: string) {
  const arrivalTime = time.now();
  ZADD("Reservations", `immediate.${orcaKey}`, arrivalTime);
}
Its reservation score/release time is set directly to the arrival time to ensure there is no delay.
Jobs in the queue may either be:
- Moved to the back of the queue.
- Moved to the front of the queue.
enum MoveJobAction {
RELEASE = "release",
DELAY = "delay",
}
interface MoveJobRequest {
nonce: number;
jobKey: string;
moveAction: MoveJobAction;
collation: Collation;
}
Moving a job requires its unique key, the nonce used in the job's corresponding Reservations ZSet key, the type of action to be taken, and collation data. Jobs submitted for immediate grading cannot be moved.
function moveJob(req: MoveJobRequest) {
  const { jobKey, moveAction } = req;
  switch (moveAction) {
    case MoveJobAction.RELEASE: {
      // Stored jobs are JSON strings, so parse before upgrading.
      const job = JSON.parse(GET(jobKey));
      upgradeJob(job, jobKey);
      break;
    }
    case MoveJobAction.DELAY:
      delayJob(req);
      break;
    default:
      throw new Error(`Unknown move action: ${moveAction}`);
  }
}
const MOVE_TO_BACK_BUFFER = 10; // seconds
function delayJob(req: MoveJobRequest) {
  // Score of the reservation currently at the back of the queue.
  const [, lastPriority] = ZRANGE("Reservations", -1, -1, WITHSCORES);
  const newPriority = lastPriority + MOVE_TO_BACK_BUFFER;
  ZADD(
    "Reservations",
    `${req.collation.type}.${req.collation.id}.${req.nonce}`,
    newPriority
  );
  const subInfoKey = `SubmitterInfo.${req.collation.type}.${req.collation.id}`;
  // Move the job key to the tail of the submitter's list.
  LREM(subInfoKey, 0, req.jobKey);
  RPUSH(subInfoKey, req.jobKey);
  return newPriority;
}
A job is moved to the back of the queue by assigning it a new priority of priorityOf(lastJobInQueue) + buffer as its score in Reservations. The buffer is used to guarantee that the given job will be placed at the back. The job is also placed at the back of SubmitterInfo.
Jobs are moved to the front of the queue by being replaced with an immediate job, ensuring they cannot be skipped.
- Release a job -> job at front of queue
- Delay a job -> job at back of queue
- Release job_1, release job_2 -> job_2 in front of job_1, job_1 in front of job_3...job_n
- Delay job_1, delay job_2 -> job_2 is behind job_1, job_1 is behind job_3...job_n
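The delay behavior listed above can be traced with a simplified in-memory model. In this sketch the map keys are plain job names rather than real reservation members, the scores are made-up values, and delay and queueOrder are hypothetical helpers; it only demonstrates the "last score plus buffer" rule.

```typescript
const BUFFER = 10; // same unit as the scores (assumed)

// Simplified Reservations: job name -> score.
const scores = new Map<string, number>([
  ["job_1", 100],
  ["job_2", 200],
  ["job_3", 300],
]);

// Move a job to the back: new score = highest current score + buffer.
function delay(member: string): void {
  const last = Math.max(...Array.from(scores.values()));
  scores.set(member, last + BUFFER);
}

// Jobs in the order they would be popped (ascending score).
function queueOrder(): string[] {
  return Array.from(scores.entries())
    .sort((a, b) => a[1] - b[1])
    .map(([member]) => member);
}

delay("job_1"); // job_1 now scores 310
delay("job_2"); // job_2 now scores 320
const finalOrder = queueOrder();
```

After both delays, job_3 is at the front, job_1 follows it, and job_2 sits at the very back, matching the expectation that job_2 ends up behind job_1.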
A job can be deleted from the queue given its key. Non-immediate jobs additionally require the collation and the nonce used in their Reservations ZSet member.
function deleteJob(
  jobKey: string,
  nonce?: number,
  collation?: Collation
) {
  if (collation) {
    // Non-immediate job: clear the list entry, the reservation and the nonce.
    LREM(`SubmitterInfo.${collation.type}.${collation.id}`, 1, jobKey);
    ZREM("Reservations", [collation.type, collation.id, nonce].join("."));
    SREM(`Nonces.${collation.type}.${collation.id}`, nonce);
  } else {
    // Immediate reservations are keyed by the job key, not by a nonce.
    ZREM("Reservations", `immediate.${jobKey}`);
  }
}
All jobs have their data removed from Reservations. Non-immediate jobs also have data removed from the Nonces set and the SubmitterInfo list.
Grading jobs are popped from the queue using their associated GradingJobKey.
function getNextJob() {
  const nextTask = ZPOPMIN("Reservations");
  const nextTaskInfo = nextTask.split(".");
  let jobKey, collationType, collationID, nonce;
  if (nextTaskInfo[0] === "immediate") {
    [, jobKey] = nextTaskInfo;
  } else {
    [collationType, collationID, nonce] = nextTaskInfo;
    SREM(`Nonces.${collationType}.${collationID}`, nonce);
    jobKey = LPOP(`SubmitterInfo.${collationType}.${collationID}`);
  }
  return JSON.parse(GETDEL(jobKey));
}
This is either obtained directly from the first item popped off Reservations, or by using the collation data from that reservation to pop the first job key off of SubmitterInfo.
All data pointing to the job is deleted, either through a "pop" or deletion function.
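The member-string handling in this step can be made more defensive with a small parser. The Reservation type and parseReservation function are hypothetical, and the sketch assumes the two member shapes used by the queue operations in this document: immediate.<jobKey> and <team|user>.<id>.<nonce>.

```typescript
// Hypothetical parsed form of a Reservations member string.
type Reservation =
  | { kind: "immediate"; jobKey: string }
  | { kind: "collated"; collationType: string; collationId: string; nonce: string };

function parseReservation(member: string): Reservation {
  const parts = member.split(".");
  const [head = "", second = "", third = ""] = parts;
  if (head === "immediate" && parts.length === 2) {
    // The job key is carried directly in the member.
    return { kind: "immediate", jobKey: second };
  }
  if ((head === "team" || head === "user") && parts.length === 3) {
    // The job key must be looked up through the SubmitterInfo list.
    return { kind: "collated", collationType: head, collationId: second, nonce: third };
  }
  throw new Error(`Malformed reservation member: ${member}`);
}

const immediate = parseReservation("immediate.abc123");
const collated = parseReservation("team.12.1700000120");
```

Rejecting malformed members up front avoids issuing SREM or LPOP against keys built from undefined parts.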
If a worker is to be shut down prior to finishing a job, that job should be re-enqueued to ensure it is not completely lost.
function reenqueueJob(job: GradingJob) {
  // Score the reservation with the original arrival time so it is picked up next.
  ZADD("Reservations", `immediate.${job.orca_key}`, job.created_at);
  SET(job.orca_key, JSON.stringify(job));
}
To ensure the job will be picked up next by whatever worker is ready, it is enqueued with an immediate reservation with a timestamp equal to its original arrival time.