diff --git a/src/api.py b/src/api.py index 398e82ceb..7826e6295 100644 --- a/src/api.py +++ b/src/api.py @@ -90,11 +90,6 @@ sqlStoredProcedure) from highlevelcrypto import calculateInventoryHash -try: - from network import connectionpool -except ImportError: - connectionpool = None - from network import StoppableThread, invQueue, stats from version import softwareVersion @@ -1551,18 +1546,18 @@ def HandleListConnections(self): Returns bitmessage connection information as dict with keys *inbound*, *outbound*. """ - if connectionpool is None or connectionpool.pool is None: + if stats.pool is None: raise APIError(21, 'Network is not started.') inboundConnections = [] outboundConnections = [] - for i in connectionpool.pool.inboundConnections.values(): + for i in stats.pool.inboundConnections.values(): inboundConnections.append({ 'host': i.destination.host, 'port': i.destination.port, 'fullyEstablished': i.fullyEstablished, 'userAgent': str(i.userAgent) }) - for i in connectionpool.pool.outboundConnections.values(): + for i in stats.pool.outboundConnections.values(): outboundConnections.append({ 'host': i.destination.host, 'port': i.destination.port, diff --git a/src/bitmessageqt/settings.py b/src/bitmessageqt/settings.py index eeb507c75..2d3af319f 100644 --- a/src/bitmessageqt/settings.py +++ b/src/bitmessageqt/settings.py @@ -414,7 +414,7 @@ def accept(self): 'bitmessagesettings', 'udp'): self.config.set('bitmessagesettings', 'udp', str(udp_enabled)) if udp_enabled: - announceThread = AnnounceThread() + announceThread = AnnounceThread(connectionpool.pool) announceThread.daemon = True announceThread.start() else: diff --git a/src/network/__init__.py b/src/network/__init__.py index f7ba31f32..902549a79 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -17,7 +17,7 @@ __all__ = ["StoppableThread"] -def start(config, state): +def start(config, state): # pylint: disable=too-many-locals """Start network threads""" from .announcethread import AnnounceThread from . import connectionpool @@ -29,30 +29,35 @@ def start(config, state): from .knownnodes import readKnownNodes from .receivequeuethread import ReceiveQueueThread from .uploadthread import UploadThread + from . import stats # create the connection pool - connectionpool.pool = BMConnectionPool() + pool = BMConnectionPool() + connectionpool.pool = pool # check and set dandelion enabled value at network startup dandelion_ins.init_dandelion_enabled(config) # pass pool instance into dandelion class instance - dandelion_ins.init_pool(connectionpool.pool) + dandelion_ins.init_pool(pool) + + # init stats with pool reference + stats.init(pool) readKnownNodes() - connectionpool.pool.connectToStream(1) + pool.connectToStream(1) for thread in ( - BMNetworkThread(), InvThread(), AddrThread(), - DownloadThread(), UploadThread() + BMNetworkThread(pool), InvThread(pool), AddrThread(pool), + DownloadThread(pool), UploadThread(pool) ): thread.daemon = True thread.start() # Optional components for i in range(config.getint('threads', 'receive')): - thread = ReceiveQueueThread(i) + thread = ReceiveQueueThread(i, pool) thread.daemon = True thread.start() if config.safeGetBoolean('bitmessagesettings', 'udp'): - state.announceThread = AnnounceThread() + state.announceThread = AnnounceThread(pool) state.announceThread.daemon = True state.announceThread.start() diff --git a/src/network/addrthread.py b/src/network/addrthread.py index e85bb2a12..9c8d96e3d 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -4,7 +4,6 @@ import random from six.moves import queue -from . import connectionpool from protocol import assembleAddrMessage from network import addrQueue # FIXME: init with queue @@ -15,6 +14,10 @@ class AddrThread(StoppableThread): """(Node) address broadcasting thread""" name = "AddrBroadcaster" + def __init__(self, pool): + super(AddrThread, self).__init__() + self.pool = pool + def run(self): while not self._stopped: chunk = [] @@ -27,7 +30,7 @@ def run(self): if chunk: # Choose peers randomly - connections = connectionpool.pool.establishedConnections() + connections = self.pool.establishedConnections() random.shuffle(connections) for i in connections: random.shuffle(chunk) diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 3d27bcb3f..bda3dd937 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -3,7 +3,6 @@ """ import time -from . import connectionpool from bmconfigparser import config from protocol import assembleAddrMessage @@ -16,6 +15,10 @@ class AnnounceThread(StoppableThread): name = "Announcer" announceInterval = 60 + def __init__(self, pool): + super(AnnounceThread, self).__init__() + self.pool = pool + def run(self): lastSelfAnnounced = 0 while not self._stopped: @@ -26,13 +29,12 @@ def run(self): if processed == 0: self.stop.wait(10) - @staticmethod - def announceSelf(): + def announceSelf(self): """Announce our presence""" - for connection in connectionpool.pool.udpSockets.values(): + for connection in self.pool.udpSockets.values(): if not connection.announcing: continue - for stream in connectionpool.pool.streams: + for stream in self.pool.streams: addr = ( stream, Peer( diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 3948ba792..b7d1049b7 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -509,7 +509,7 @@ def __init__(self, sock=None, map=None): # Set to nonblocking just to make sure for cases where we # get a socket from a blocking source. sock.setblocking(0) - self.set_socket(sock, map) + self.set_socket(sock) self.connected = True # The constructor no longer requires that the socket # passed be connected. @@ -524,7 +524,7 @@ def __init__(self, sock=None, map=None): # The socket is broken in some unknown way, alert # the user and remove it from the map (to prevent # polling of broken sockets). - self.del_channel(map) + self.del_channel() raise else: self.socket = None @@ -544,22 +544,18 @@ def __repr__(self): __str__ = __repr__ - def add_channel(self, map=None): - """Add a channel""" + def add_channel(self): + """Add a channel to the asyncore map set during __init__.""" # pylint: disable=attribute-defined-outside-init - if map is None: - map = self._map - map[self._fileno] = self + self._map[self._fileno] = self self.poller_flags = 0 self.poller_filter = 0 - def del_channel(self, map=None): - """Delete a channel""" + def del_channel(self): + """Delete a channel from the asyncore map set during __init__.""" fd = self._fileno - if map is None: - map = self._map - if fd in map: - del map[fd] + if fd in self._map: + del self._map[fd] if self._fileno: try: kqueue_poller.pollster.control([select.kevent( @@ -595,11 +591,15 @@ def create_socket( sock.setblocking(0) self.set_socket(sock) - def set_socket(self, sock, map=None): - """Set socket""" + def set_socket(self, sock): + """Set socket without registering in the asyncore map. + + Registration is deferred to an explicit add_channel() call, + typically done by BMConnectionPool.addConnection() after the + connection is fully initialised with a pool reference. + """ self.socket = sock self._fileno = sock.fileno() - self.add_channel(map) def set_reuse_addr(self): """try to re-use a server port if possible""" diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 61b930461..db7a98e48 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -6,7 +6,6 @@ import protocol import state -from . import connectionpool from network import dandelion_ins from highlevelcrypto import calculateInventoryHash @@ -93,14 +92,14 @@ def checkEOLSanity(self): # .. todo:: remove from download queue raise BMObjectExpiredError() - def checkStream(self): + def checkStream(self, streams): """Check if object's stream matches streams we are interested in""" if self.streamNumber < protocol.MIN_VALID_STREAM \ or self.streamNumber > protocol.MAX_VALID_STREAM: logger.warning( 'The object has invalid stream: %s', self.streamNumber) raise BMObjectInvalidError() - if self.streamNumber not in connectionpool.pool.streams: + if self.streamNumber not in streams: logger.debug( 'The streamNumber %i isn\'t one we are interested in.', self.streamNumber) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index e49ccc20b..5f2778cc8 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -14,7 +14,6 @@ import addresses import protocol import state -from . import connectionpool from . import knownnodes from bmconfigparser import config from queues import objectProcessorQueue @@ -395,19 +394,19 @@ def bm_command_object(self): self.object.checkAlreadyHave() except (BMObjectExpiredError, BMObjectAlreadyHaveError, BMObjectInsufficientPOWError): - BMProto.stopDownloadingObject(self.object.inventoryHash) + self.stopDownloadingObject(self.object.inventoryHash) raise try: - self.object.checkStream() + self.object.checkStream(self.pool.streams) except BMObjectUnwantedStreamError: acceptmismatch = config.getboolean( "inventory", "acceptmismatch") - BMProto.stopDownloadingObject( + self.stopDownloadingObject( self.object.inventoryHash, acceptmismatch) if not acceptmismatch: raise except BMObjectInvalidError: - BMProto.stopDownloadingObject(self.object.inventoryHash) + self.stopDownloadingObject(self.object.inventoryHash) raise try: @@ -415,7 +414,7 @@ def bm_command_object(self): objectProcessorQueue.put(( self.object.objectType, buffer(self.object.data))) # noqa: F821 except BMObjectInvalidError: - BMProto.stopDownloadingObject(self.object.inventoryHash, True) + self.stopDownloadingObject(self.object.inventoryHash, True) else: try: del missingObjects[self.object.inventoryHash] @@ -448,7 +447,7 @@ def bm_command_addr(self): for seenTime, stream, _, ip, port in self._decode_addr(): ip = str(ip) if ( - stream not in connectionpool.pool.streams + stream not in self.pool.streams # FIXME: should check against complete list or ip.startswith('bootstrap') ): @@ -547,7 +546,7 @@ def bm_command_version(self): if not self.isOutbound: self.append_write_buf(protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, dandelion_ins.enabled, True, + self.pool.streams, dandelion_ins.enabled, True, nodeid=self.nodeid)) logger.debug( '%(host)s:%(port)i sending version', @@ -603,7 +602,7 @@ def peerValidityChecks(self): 'Closed connection to %s because there is no overlapping' ' interest in streams.', self.destination) return False - if connectionpool.pool.inboundConnections.get( + if self.pool.inboundConnections.get( self.destination): try: if not protocol.checkSocksIP(self.destination.host): @@ -621,8 +620,8 @@ def peerValidityChecks(self): # or server full report the same error to counter deanonymisation if ( Peer(self.destination.host, self.peerNode.port) - in connectionpool.pool.inboundConnections - or len(connectionpool.pool) + in self.pool.inboundConnections + or len(self.pool) > config.safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + config.safeGetInt( @@ -634,7 +633,7 @@ def peerValidityChecks(self): 'Closed connection to %s due to server full' ' or duplicate inbound/outbound.', self.destination) return False - if connectionpool.pool.isAlreadyConnected(self.nonce): + if self.pool.isAlreadyConnected(self.nonce): self.append_write_buf(protocol.assembleErrorMessage( errorText="I'm connected to myself. Closing connection.", fatal=2)) @@ -645,10 +644,9 @@ def peerValidityChecks(self): return True - @staticmethod - def stopDownloadingObject(hashId, forwardAnyway=False): + def stopDownloadingObject(self, hashId, forwardAnyway=False): """Stop downloading object *hashId*""" - for connection in connectionpool.pool.connections(): + for connection in self.pool.connections(): try: del connection.objectsNewToMe[hashId] except KeyError: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 5bcd94ad7..d611e8b2b 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -120,6 +120,8 @@ def isAlreadyConnected(self, nodeid): def addConnection(self, connection): """Add a connection object to our internal dict""" from .udp import UDPSocket + connection.pool = self + connection.add_channel() if isinstance(connection, UDPSocket): return if connection.isOutbound: @@ -182,6 +184,8 @@ def startListening(self, bind=None): port = config.safeGetInt("bitmessagesettings", "port") # correct port even if it changed ls = TCPServer(host=bind, port=port) + ls.pool = self + ls.add_channel() self.listeningSockets[ls.destination] = ls def startUDPSocket(self, bind=None): @@ -198,6 +202,8 @@ def startUDPSocket(self, bind=None): udpSocket = UDPSocket(announcing=False) else: udpSocket = UDPSocket(host=bind, announcing=True) + udpSocket.pool = self + udpSocket.add_channel() self.udpSockets[udpSocket.listening.host] = udpSocket def startBootstrappers(self): diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index f8ca4e53c..c94d323af 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -6,7 +6,6 @@ import state import addresses import protocol -from . import connectionpool from network import dandelion_ins from .objectracker import missingObjects from .threads import StoppableThread @@ -20,8 +19,9 @@ class DownloadThread(StoppableThread): cleanInterval = 60 requestExpires = 3600 - def __init__(self): + def __init__(self, pool): super(DownloadThread, self).__init__(name="Downloader") + self.pool = pool self.lastCleaned = time.time() def cleanPending(self): @@ -42,7 +42,7 @@ def run(self): while not self._stopped: requested = 0 # Choose downloading peers randomly - connections = connectionpool.pool.establishedConnections() + connections = self.pool.establishedConnections() random.shuffle(connections) requestChunk = max(int( min(self.maxRequestChunk, len(missingObjects)) diff --git a/src/network/invthread.py b/src/network/invthread.py index 14508422d..da8d4464d 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,17 +9,16 @@ import addresses import protocol import state -from . import connectionpool from network import dandelion_ins, invQueue from .threads import StoppableThread -def handleExpiredDandelion(expired): +def handleExpiredDandelion(expired, pool): """For expired dandelion objects, mark all remotes as not having the object""" if not expired: return - for i in connectionpool.pool.connections(): + for i in pool.connections(): if not i.fullyEstablished: continue for x in expired: @@ -37,11 +36,14 @@ class InvThread(StoppableThread): name = "InvBroadcaster" - @staticmethod - def handleLocallyGenerated(stream, hashId): + def __init__(self, pool): + super(InvThread, self).__init__() + self.pool = pool + + def handleLocallyGenerated(self, stream, hashId): """Locally generated inventory items require special handling""" dandelion_ins.addHash(hashId, stream=stream) - for connection in connectionpool.pool.connections(): + for connection in self.pool.connections(): if dandelion_ins.enabled and connection != \ dandelion_ins.objectChildStem(hashId): continue @@ -52,7 +54,7 @@ def run(self): # pylint: disable=too-many-branches chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(dandelion_ins.expire(invQueue)) + handleExpiredDandelion(dandelion_ins.expire(invQueue), self.pool) try: data = invQueue.get(False) chunk.append((data[0], data[1])) @@ -63,7 +65,7 @@ def run(self): # pylint: disable=too-many-branches break if chunk: - for connection in connectionpool.pool.connections(): + for connection in self.pool.connections(): fluffs = [] stems = [] for inv in chunk: diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 510cd7fa6..8a9303697 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -2,7 +2,6 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore -from . import connectionpool from queues import excQueue from .threads import StoppableThread @@ -11,27 +10,31 @@ class BMNetworkThread(StoppableThread): """Main network thread""" name = "Asyncore" + def __init__(self, pool): + super(BMNetworkThread, self).__init__() + self.pool = pool + def run(self): try: while not self._stopped: - connectionpool.pool.loop() + self.pool.loop() except Exception as e: excQueue.put((self.name, e)) raise def stopThread(self): super(BMNetworkThread, self).stopThread() - for i in connectionpool.pool.listeningSockets.values(): + for i in self.pool.listeningSockets.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except pass - for i in connectionpool.pool.outboundConnections.values(): + for i in self.pool.outboundConnections.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except pass - for i in connectionpool.pool.inboundConnections.values(): + for i in self.pool.inboundConnections.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except diff --git a/src/network/objectracker.py b/src/network/objectracker.py index d40ebd309..57e928935 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,7 +4,6 @@ import time from threading import RLock -from . import connectionpool from network import dandelion_ins from randomtrackingdict import RandomTrackingDict @@ -100,7 +99,7 @@ def handleReceivedInventory(self, hashId): def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" - for i in connectionpool.pool.connections(): + for i in self.pool.connections(): # pylint: disable=no-member if not i.fullyEstablished: continue try: diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index bd642d9fb..6fd1dc601 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -6,7 +6,6 @@ from six.moves import queue -from . import connectionpool from network.advanceddispatcher import UnknownStateError from network import receiveDataQueue from .threads import StoppableThread @@ -15,8 +14,9 @@ class ReceiveQueueThread(StoppableThread): """This thread processes data received from the network (which is done by the asyncore thread)""" - def __init__(self, num=0): + def __init__(self, num, pool): super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) + self.pool = pool def run(self): while not self._stopped: @@ -36,7 +36,7 @@ def run(self): # enough data, or the connection is to be aborted try: - connection = connectionpool.pool.getConnectionByAddr(dest) + connection = self.pool.getConnectionByAddr(dest) # connection object not found except KeyError: receiveDataQueue.task_done() diff --git a/src/network/stats.py b/src/network/stats.py index 398644a3b..b1fd5ea5d 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -4,7 +4,6 @@ import time from . import asyncore_pollchoose as asyncore -from . import connectionpool from .objectracker import missingObjects @@ -15,12 +14,20 @@ lastSentBytes = 0 currentSentSpeed = 0 +pool = None + + +def init(pool_instance): + """Set the pool reference for stats functions""" + global pool # pylint: disable=global-statement + pool = pool_instance + def connectedHostsList(): """List of all the connected hosts""" - if connectionpool.pool is None: + if pool is None: return [] - return connectionpool.pool.establishedConnections() + return pool.establishedConnections() def sentBytes(): diff --git a/src/network/tcp.py b/src/network/tcp.py index db9c91126..50b6ad6da 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -14,7 +14,6 @@ import l10n import protocol import state -from . import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes from network import dandelion_ins, invQueue, receiveDataQueue @@ -268,7 +267,7 @@ def handle_connect(self): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, dandelion_ins.enabled, + self.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -319,7 +318,7 @@ def state_proxy_handshake_done(self): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, dandelion_ins.enabled, + self.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -343,7 +342,7 @@ def state_proxy_handshake_done(self): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, dandelion_ins.enabled, + self.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -431,7 +430,7 @@ def handle_accept(self): state.ownAddresses[Peer(*sock.getsockname())] = True if ( - len(connectionpool.pool) + len(self.pool) > config.safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + config.safeGetInt( @@ -443,7 +442,7 @@ def handle_accept(self): sock.close() return try: - connectionpool.pool.addConnection( + self.pool.addConnection( TCPConnection(sock=sock)) except socket.error: pass diff --git a/src/network/udp.py b/src/network/udp.py index 4a5df79b6..e48481bb0 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -8,7 +8,6 @@ # magic imports! import protocol import state -from . import connectionpool from network import receiveDataQueue from .bmproto import BMProto @@ -82,7 +81,7 @@ def bm_command_addr(self): remoteport = False for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) - if stream not in connectionpool.pool.streams: + if stream not in self.pool.streams: continue if (seenTime < time.time() - protocol.MAX_TIME_OFFSET or seenTime > time.time() + protocol.MAX_TIME_OFFSET): diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 6d23dcec8..d493aa910 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -6,7 +6,6 @@ import random import protocol import state -from . import connectionpool from randomtrackingdict import RandomTrackingDict from network import dandelion_ins from .threads import StoppableThread @@ -19,11 +18,15 @@ class UploadThread(StoppableThread): maxBufSize = 2097152 # 2MB name = "Uploader" + def __init__(self, pool): + super(UploadThread, self).__init__() + self.pool = pool + def run(self): while not self._stopped: uploaded = 0 # Choose uploading peers randomly - connections = connectionpool.pool.establishedConnections() + connections = self.pool.establishedConnections() random.shuffle(connections) for i in connections: now = time.time()