Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions example/LockFreeQueue.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Created by Garvit Dewan -
* https://github.com/dgarvit/epoch-based-manager/blob/master/src/LockFreeQueue.chpl
*
* Lock-Free Queue that uses ABA feature of Distributed Data Structures
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably say that it uses the ABA feature of LocalAtomicObject. Sure the LocalAtomicObject was created back in GSoC 2017, but you don't need to state that here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it.

* Based on Michael Scott Queue
*/
module LockFreeQueue {

use LocalAtomics;

class node {
type eltType;
var val : eltType;
var next : LocalAtomicObject(unmanaged node(eltType));

proc init(val : ?eltType) {
this.eltType = eltType;
this.val = val;
}

proc init(type eltType) {
this.eltType = eltType;
val = nil;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not initialize val with a value, leave it as is (it will be initialized to a default value automatically). If eltType is an integral (int,uint, etc.), a tuple or a record, this will result in a compiler error.

Copy link
Copy Markdown
Collaborator Author

@dgarvit dgarvit Jun 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think that would further require changes to peek() and dequeue() in terms that if queue is empty then cannot return nil for eltType int

}
}

class LockFreeQueue {
type objType;
var _head : LocalAtomicObject(unmanaged node(objType));
var _tail : LocalAtomicObject(unmanaged node(objType));

proc init(type objType) {
this.objType = objType;
this.complete();
var _node = new unmanaged node(objType);
_head.write(_node);
_tail.write(_node);
}

proc enqueue(newObj : objType) {
var n = new unmanaged node(newObj);
while (true) {
var curr_tail = _tail.readABA();
var next = curr_tail.next.readABA();
if (next.getObject() == nil) {
if (curr_tail.next.compareExchangeABA(next, n)) {
_tail.compareExchangeABA(curr_tail, n);
break;
}
}
else {
_tail.compareExchangeABA(curr_tail, next.getObject());
}
}
}

proc dequeue() : objType {
while (true) {
var curr_head = _head.readABA();
var curr_tail = _tail.readABA();
var next = curr_head.next.readABA();
if (_head.read() == _tail.read()) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you read curr_head and curr_tail and then try to read them again? Are you sure you shouldn't do curr_head.getObject() == curr_tail.getObject()?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, fixed it. Thanks for pointing out!

if (next.getObject() == nil) then
return nil;
_tail.compareExchangeABA(curr_tail, next.getObject());
}
else {
if (_head.compareExchangeABA(curr_head, next.getObject())) then
return next.getObject().val;
}
}
return nil;
}

iter these() : objType {
var ptr = _head.read().next.read();
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the queue is empty?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When queue is empty, it will hold a dummy node, which will have a next, which will be a LocalAtomicObject, which will point to a nil value.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, nothing will happen, because, if ptr is nil. then the function will finish executing.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot about the dummy node! That should work then.

while (ptr != nil) {
yield ptr.val;
ptr = ptr.next.read();
}
}

proc peek() : objType {
return _head.read().next.read().val;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the queue is empty?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the previous reason, this will return nil. There was a bug with it, which I have fixed now.

}

proc deinit() {
var ptr = _head.read();
while (ptr != nil) {
_head = ptr.next;
delete ptr.val;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, don't assume the user will have classes. If they want to have their classes managed, they should use a shared or owned type.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Alright.

delete ptr;
ptr = _head.read();
}
}
}
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a main function here that tests it, similar to what is done in LockFreeStack.chpl:

proc main() {
var a = new LockFreeStack(int);
forall i in 1..1024 do a.push(i);
a.push(1025..2048);
writeln(+ reduce a);
}