@@ -34,9 +34,7 @@ class MongoDB:
3434 https://api.mongodb.com/python/current/api/pymongo/mongo_client.html
3535 """
3636
37- def __init__ (
38- self , uri , dbname , mongoclient = pymongo .MongoClient , options = None ,
39- ):
37+ def __init__ (self , uri , dbname , mongoclient = pymongo .MongoClient , options = None ):
4038 self ._dbname = dbname
4139 self ._uri = uri
4240 self ._MongoClient = mongoclient
@@ -92,30 +90,104 @@ def create_indexes(self):
9290 [("timestamp" , pymongo .ASCENDING )], unique = True , background = True
9391 )
9492
93+ def start_session (self ):
94+ """Inicia uma sessão transacional.
95+ """
96+ return self ._client .start_session ()
97+
98+ def start_transaction (self ):
99+ """Inicia uma transação.
100+ """
101+ return self ._client .start_transaction ()
102+
103+ def create_collections (self ):
104+ """Cria as coleções na base de dados.
105+
106+ Com o uso de transações em instâncias de MongoDB < 4.4 as coleções não
107+ podem ser criadas implicitamente.
108+ """
109+ for colname in ["documents" , "documents_bundles" , "journals" , "changes" ]:
110+ self ._db ().create_collection (colname )
111+
95112
96113class Session (interfaces .Session ):
97114 """Implementação de `interfaces.Session` para armazenamento em MongoDB.
98- Trata-se de uma classe concreta e não deve ser generalizada.
115+
116+ Instâncias de :class:`Session` servem como pontos de acesso aos repositórios
117+ de dados. Instâncias desta classe poderão usar da sintaxe de gerenciadores
118+ de contexto do Python, mas sem qualquer efeito. Caso
99119 """
100120
101121 def __init__ (self , mongodb_client ):
102122 self ._mongodb_client = mongodb_client
103123
124+ def __enter__ (self ):
125+ return self
126+
127+ def __exit__ (self , exc_type , exc_value , traceback ):
128+ pass
129+
130+ def _repo_extra_args (self ):
131+ """Argumentos extras que serão passados na inicialização dos
132+ repositórios.
133+ """
134+ return {}
135+
104136 @property
105137 def documents (self ):
106- return DocumentStore (self ._mongodb_client .documents )
138+ return DocumentStore (self ._mongodb_client .documents , ** self . _repo_extra_args () )
107139
108140 @property
109141 def documents_bundles (self ):
110- return DocumentsBundleStore (self ._mongodb_client .documents_bundles )
142+ return DocumentsBundleStore (
143+ self ._mongodb_client .documents_bundles , ** self ._repo_extra_args ()
144+ )
111145
112146 @property
113147 def journals (self ):
114- return JournalStore (self ._mongodb_client .journals )
148+ return JournalStore (self ._mongodb_client .journals , ** self . _repo_extra_args () )
115149
116150 @property
117151 def changes (self ):
118- return ChangesStore (self ._mongodb_client .changes )
152+ return ChangesStore (self ._mongodb_client .changes , ** self ._repo_extra_args ())
153+
154+
155+ class TransactionalSession (Session ):
156+ """Implementação de `interfaces.Session` para armazenamento em MongoDB, com
157+ suporte transacional de múltiplas coleções.
158+
159+ Instâncias de :class:`TransactionalSession` servem como pontos de acesso aos
160+ repositórios de dados. Elas podem ser utilizadas na realização de consultas
161+ avulsas aos dados ou mais sofisticadas, em contextos transacionais. Caso o
162+ último seja desejado, deve-se instanciar :class:`TransactionalSession` com a
163+ sintaxe de gerenciadores de contexto do Python.
164+ """
165+
166+ def __init__ (self , mongodb_client ):
167+ self ._mongodb_client = mongodb_client
168+ self ._txn_session = None
169+
170+ def __enter__ (self ):
171+ self ._txn_session = self ._mongodb_client .start_session ()
172+ self ._txn_session .start_transaction ()
173+ LOGGER .debug ("new MongoDB transactional session created: %s" , self ._txn_session )
174+ return self
175+
176+ def __exit__ (self , exc_type , exc_value , traceback ):
177+ if exc_type :
178+ self ._txn_session .abort_transaction ()
179+ LOGGER .debug (
180+ 'transaction "%s" was aborted: %s' , self ._txn_session , exc_value
181+ )
182+ else :
183+ self ._txn_session .commit_transaction ()
184+ LOGGER .debug ('transaction "%s" was commited' , self ._txn_session )
185+
186+ def _repo_extra_args (self ):
187+ """Argumentos extras que serão passados na inicialização dos
188+ repositórios.
189+ """
190+ return {"txn_session" : self ._txn_session }
119191
120192
121193class BaseStore (interfaces .DataStore ):
@@ -124,8 +196,15 @@ class BaseStore(interfaces.DataStore):
124196 implementam/definem o atributo `DomainClass`.
125197 """
126198
127- def __init__ (self , collection ):
199+ def __init__ (self , collection , txn_session = None ):
128200 self ._collection = collection
201+ self ._txn_session = txn_session
202+
203+ def _txn_session_arg (self ):
204+ if self ._txn_session :
205+ return {"session" : self ._txn_session }
206+ else :
207+ return {}
129208
130209 def _pre_write (self , data ) -> dict :
131210 """Tratamento anterior ao armazenamento do dado no MongoDB."""
@@ -141,22 +220,24 @@ def _post_read(self, data: dict) -> dict:
141220 def add (self , data ) -> None :
142221 try :
143222 _ , _manifest = self ._pre_write (data )
144- self ._collection .insert_one (_manifest )
223+ self ._collection .insert_one (_manifest , ** self . _txn_session_arg () )
145224 except pymongo .errors .DuplicateKeyError :
146225 raise exceptions .AlreadyExists (
147226 "cannot add data with id " '"%s": the id is already in use' % data .id ()
148227 ) from None
149228
150229 def update (self , data ) -> None :
151230 _id , _manifest = self ._pre_write (data )
152- result = self ._collection .replace_one ({"_id" : _id }, _manifest )
231+ result = self ._collection .replace_one (
232+ {"_id" : _id }, _manifest , ** self ._txn_session_arg ()
233+ )
153234 if result .matched_count == 0 :
154235 raise exceptions .DoesNotExist (
155236 "cannot update data with id " '"%s": data does not exist' % data .id ()
156237 )
157238
158239 def fetch (self , id : str ):
159- manifest = self ._collection .find_one ({"_id" : id })
240+ manifest = self ._collection .find_one ({"_id" : id }, ** self . _txn_session_arg () )
160241 if manifest :
161242 return self .DomainClass (manifest = self ._post_read (manifest ))
162243 else :
@@ -170,12 +251,19 @@ class ChangesStore(interfaces.ChangesDataStore):
170251 MongoDB.
171252 """
172253
173- def __init__ (self , collection ):
254+ def __init__ (self , collection , txn_session = None ):
174255 self ._collection = collection
256+ self ._txn_session = txn_session
257+
258+ def _txn_session_arg (self ):
259+ if self ._txn_session :
260+ return {"session" : self ._txn_session }
261+ else :
262+ return {}
175263
176264 def add (self , change : dict ):
177265 try :
178- self ._collection .insert_one (change )
266+ self ._collection .insert_one (change , ** self . _txn_session_arg () )
179267 except pymongo .errors .DuplicateKeyError as exc :
180268 raise exceptions .AlreadyExists (
181269 'cannot add data with id "%s": %s' % (change ["_id" ], exc )
@@ -186,11 +274,14 @@ def filter(self, since: str = "", limit: int = 500):
186274 {"timestamp" : {"$gt" : since }},
187275 sort = [("timestamp" , pymongo .ASCENDING )],
188276 projection = {"content_gz" : False , "content_type" : False },
277+ ** self ._txn_session_arg (),
189278 ).limit (limit )
190279
191280 def fetch (self , id : str ) -> dict :
192281 try :
193- change = self ._collection .find_one ({"_id" : ObjectId (id )})
282+ change = self ._collection .find_one (
283+ {"_id" : ObjectId (id )}, ** self ._txn_session_arg ()
284+ )
194285 except bson .errors .InvalidId as exc :
195286 raise exceptions .DoesNotExist (
196287 'cannot fetch data with id "%s": %s' % (id , exc )
0 commit comments