This repository was archived by the owner on Dec 29, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 225
Expand file tree
/
Copy pathbasedb.py
More file actions
130 lines (106 loc) · 3.65 KB
/
basedb.py
File metadata and controls
130 lines (106 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Authors:
# Trevor Perrin
# Martin von Loewis - python 3 port
#
# See the LICENSE file for legal information regarding use of this file.
"""Base class for SharedKeyDB and VerifierDB."""
try:
import anydbm
except ImportError:
# Python 3
import dbm as anydbm
import threading
class BaseDB(object):
    """Username-keyed store backed by a dict or an on-disk dbm file.

    When constructed without a filename the data lives in a plain dict;
    otherwise it lives in an anydbm/dbm database that must be prepared
    with L{create} or L{open} before use.

    The methods C{_getItem}, C{_setItem} and C{_checkItem} are called by
    this class but not defined in it; presumably the subclasses supply
    them to convert between stored strings and their own value types.
    """

    # Keys starting with this prefix hold bookkeeping data, not usernames.
    _RESERVED_PREFIX = "--Reserved--"
    # Key under which the database type marker is stored.
    _TYPE_KEY = "--Reserved--type"

    def __init__(self, filename, type):
        """Set up the database object without touching the disk.

        @type filename: str
        @param filename: Path of the on-disk database, or a false value
        (e.g. None) for an in-memory database.

        @type type: str
        @param type: Marker stored in, and later checked against, the
        on-disk database.
        """
        self.type = type
        self.filename = filename
        # On-disk databases stay closed (None) until create()/open() is
        # called; in-memory databases are usable immediately.
        self.db = None if self.filename else {}
        self.lock = threading.Lock()

    def _require_open(self):
        """Raise AssertionError unless a database is currently attached."""
        if self.db is None:
            raise AssertionError("DB not open")

    def _sync(self):
        """Flush an on-disk database if its backend offers sync()."""
        if not self.filename:
            return
        # Not every dbm backend implements sync(); skip it when absent
        # instead of failing with AttributeError.
        sync = getattr(self.db, "sync", None)
        if sync is not None:
            sync()

    def _is_reserved(self, key):
        """Tell whether a raw database key is a reserved bookkeeping key."""
        prefix = self._RESERVED_PREFIX
        # Python 3 dbm backends hand keys back as bytes, and
        # bytes.startswith() rejects a str prefix.
        if isinstance(key, bytes) and not isinstance(prefix, bytes):
            prefix = prefix.encode("utf-8")
        return key.startswith(prefix)

    def create(self):
        """Create a new on-disk database.

        For an in-memory database this simply resets the contents.

        @raise anydbm.error: If there's a problem creating the database.
        """
        if self.filename:
            self.db = anydbm.open(self.filename, "n")  # raises anydbm.error
            self.db[self._TYPE_KEY] = self.type
            self._sync()
        else:
            self.db = {}

    def open(self):
        """Open a pre-existing on-disk database.

        On failure the freshly opened handle is closed again and
        C{self.db} is left unchanged.

        @raise anydbm.error: If there's a problem opening the database.
        @raise ValueError: If the database is not of the right type.
        """
        if not self.filename:
            raise ValueError("Can only open on-disk databases")
        db = anydbm.open(self.filename, "w")  # raises anydbm.error
        try:
            try:
                stored = db[self._TYPE_KEY]
            except KeyError:
                raise ValueError("Not a recognized database")
            expected = self.type
            # Python 3 dbm backends return values as bytes; compare like
            # with like so a str type marker still matches.
            if isinstance(stored, bytes) and not isinstance(expected, bytes):
                expected = expected.encode("utf-8")
            if stored != expected:
                raise ValueError("Not a %s database" % self.type)
        except Exception:
            # Do not leak the handle of a database we refuse to use.
            db.close()
            raise
        self.db = db

    def __getitem__(self, username):
        """Fetch the entry for a username, converted by C{_getItem}.

        @type username: str
        @param username: The username to look up.

        @return: Whatever C{_getItem(username, storedValue)} returns.

        @raise AssertionError: If the database is not open.
        @raise KeyError: If the username is not in the database.
        """
        self._require_open()
        with self.lock:
            valueStr = self.db[username]
        return self._getItem(username, valueStr)

    def __setitem__(self, username, value):
        """Store an entry for a username, converted by C{_setItem}.

        @type username: str
        @param username: The username to associate the value with.

        @param value: Passed to C{_setItem(username, value)}; the result
        of that call is what gets stored.

        @raise AssertionError: If the database is not open.
        """
        self._require_open()
        # Convert before taking the lock so the lock only covers storage.
        valueStr = self._setItem(username, value)
        with self.lock:
            self.db[username] = valueStr
            self._sync()

    def __delitem__(self, username):
        """Remove the entry for a username.

        @type username: str
        @param username: The username to remove.

        @raise AssertionError: If the database is not open.
        @raise KeyError: If the username is not in the database.
        """
        self._require_open()
        with self.lock:
            del self.db[username]
            self._sync()

    def __contains__(self, username):
        """Check if the database contains the specified username.

        @type username: str
        @param username: The username to check for.

        @rtype: bool
        @return: True if the database contains the username, False
        otherwise.

        @raise AssertionError: If the database is not open.
        """
        self._require_open()
        with self.lock:
            # The "in" operator works for dicts and dbm objects alike;
            # has_key() no longer exists on Python 3.
            return username in self.db

    def check(self, username, param):
        """Look up a username and pass its value to C{_checkItem}.

        @type username: str
        @param username: The username whose entry is checked.

        @param param: Forwarded unchanged to C{_checkItem}.

        @return: Whatever C{_checkItem(value, username, param)} returns.

        @raise AssertionError: If the database is not open.
        @raise KeyError: If the username is not in the database.
        """
        value = self.__getitem__(username)
        return self._checkItem(value, username, param)

    def keys(self):
        """Return a list of usernames in the database.

        Reserved bookkeeping keys are left out. Keys are returned in the
        form the backend provides them (bytes for Python 3 dbm files).

        @rtype: list
        @return: The usernames in the database.

        @raise AssertionError: If the database is not open.
        """
        self._require_open()
        with self.lock:
            usernames = self.db.keys()
        return [u for u in usernames if not self._is_reserved(u)]