Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@
sqlStoredProcedure)
from highlevelcrypto import calculateInventoryHash

try:
from network import connectionpool
except ImportError:
connectionpool = None

from network import StoppableThread, invQueue, stats
from version import softwareVersion

Expand Down Expand Up @@ -1551,18 +1546,18 @@ def HandleListConnections(self):
Returns bitmessage connection information as dict with keys *inbound*,
*outbound*.
"""
if connectionpool is None or connectionpool.pool is None:
if stats.pool is None:
raise APIError(21, 'Network is not started.')
inboundConnections = []
outboundConnections = []
for i in connectionpool.pool.inboundConnections.values():
for i in stats.pool.inboundConnections.values():
inboundConnections.append({
'host': i.destination.host,
'port': i.destination.port,
'fullyEstablished': i.fullyEstablished,
'userAgent': str(i.userAgent)
})
for i in connectionpool.pool.outboundConnections.values():
for i in stats.pool.outboundConnections.values():
outboundConnections.append({
'host': i.destination.host,
'port': i.destination.port,
Expand Down
2 changes: 1 addition & 1 deletion src/bitmessageqt/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def accept(self):
'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled:
announceThread = AnnounceThread()
announceThread = AnnounceThread(connectionpool.pool)
announceThread.daemon = True
announceThread.start()
else:
Expand Down
21 changes: 13 additions & 8 deletions src/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
__all__ = ["StoppableThread"]


def start(config, state):
def start(config, state): # pylint: disable=too-many-locals
"""Start network threads"""
from .announcethread import AnnounceThread
from . import connectionpool
Expand All @@ -29,30 +29,35 @@ def start(config, state):
from .knownnodes import readKnownNodes
from .receivequeuethread import ReceiveQueueThread
from .uploadthread import UploadThread
from . import stats

# create the connection pool
connectionpool.pool = BMConnectionPool()
pool = BMConnectionPool()
connectionpool.pool = pool

# check and set dandelion enabled value at network startup
dandelion_ins.init_dandelion_enabled(config)
# pass pool instance into dandelion class instance
dandelion_ins.init_pool(connectionpool.pool)
dandelion_ins.init_pool(pool)

# init stats with pool reference
stats.init(pool)

readKnownNodes()
connectionpool.pool.connectToStream(1)
pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),
DownloadThread(), UploadThread()
BMNetworkThread(pool), InvThread(pool), AddrThread(pool),
DownloadThread(pool), UploadThread(pool)
):
thread.daemon = True
thread.start()

# Optional components
for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i)
thread = ReceiveQueueThread(i, pool)
thread.daemon = True
thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread()
state.announceThread = AnnounceThread(pool)
state.announceThread.daemon = True
state.announceThread.start()
7 changes: 5 additions & 2 deletions src/network/addrthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import random
from six.moves import queue

from . import connectionpool
from protocol import assembleAddrMessage
from network import addrQueue # FIXME: init with queue

Expand All @@ -15,6 +14,10 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"

def __init__(self, pool):
super(AddrThread, self).__init__()
self.pool = pool

def run(self):
while not self._stopped:
chunk = []
Expand All @@ -27,7 +30,7 @@ def run(self):

if chunk:
# Choose peers randomly
connections = connectionpool.pool.establishedConnections()
connections = self.pool.establishedConnections()
random.shuffle(connections)
for i in connections:
random.shuffle(chunk)
Expand Down
12 changes: 7 additions & 5 deletions src/network/announcethread.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
import time

from . import connectionpool
from bmconfigparser import config
from protocol import assembleAddrMessage

Expand All @@ -16,6 +15,10 @@ class AnnounceThread(StoppableThread):
name = "Announcer"
announceInterval = 60

def __init__(self, pool):
super(AnnounceThread, self).__init__()
self.pool = pool

def run(self):
lastSelfAnnounced = 0
while not self._stopped:
Expand All @@ -26,13 +29,12 @@ def run(self):
if processed == 0:
self.stop.wait(10)

@staticmethod
def announceSelf():
def announceSelf(self):
"""Announce our presence"""
for connection in connectionpool.pool.udpSockets.values():
for connection in self.pool.udpSockets.values():
if not connection.announcing:
continue
for stream in connectionpool.pool.streams:
for stream in self.pool.streams:
addr = (
stream,
Peer(
Expand Down
32 changes: 16 additions & 16 deletions src/network/asyncore_pollchoose.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def __init__(self, sock=None, map=None):
# Set to nonblocking just to make sure for cases where we
# get a socket from a blocking source.
sock.setblocking(0)
self.set_socket(sock, map)
self.set_socket(sock)
self.connected = True
# The constructor no longer requires that the socket
# passed be connected.
Expand All @@ -524,7 +524,7 @@ def __init__(self, sock=None, map=None):
# The socket is broken in some unknown way, alert
# the user and remove it from the map (to prevent
# polling of broken sockets).
self.del_channel(map)
self.del_channel()
raise
else:
self.socket = None
Expand All @@ -544,22 +544,18 @@ def __repr__(self):

__str__ = __repr__

def add_channel(self, map=None):
"""Add a channel"""
def add_channel(self):
"""Add a channel to the asyncore map set during __init__."""
# pylint: disable=attribute-defined-outside-init
if map is None:
map = self._map
map[self._fileno] = self
self._map[self._fileno] = self
self.poller_flags = 0
self.poller_filter = 0

def del_channel(self, map=None):
"""Delete a channel"""
def del_channel(self):
"""Delete a channel from the asyncore map set during __init__."""
fd = self._fileno
if map is None:
map = self._map
if fd in map:
del map[fd]
if fd in self._map:
del self._map[fd]
if self._fileno:
try:
kqueue_poller.pollster.control([select.kevent(
Expand Down Expand Up @@ -595,11 +591,15 @@ def create_socket(
sock.setblocking(0)
self.set_socket(sock)

def set_socket(self, sock, map=None):
"""Set socket"""
def set_socket(self, sock):
"""Set socket without registering in the asyncore map.

Registration is deferred to an explicit add_channel() call,
typically done by BMConnectionPool.addConnection() after the
connection is fully initialised with a pool reference.
"""
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)

def set_reuse_addr(self):
"""try to re-use a server port if possible"""
Expand Down
5 changes: 2 additions & 3 deletions src/network/bmobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import protocol
import state
from . import connectionpool
from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash

Expand Down Expand Up @@ -93,14 +92,14 @@ def checkEOLSanity(self):
# .. todo:: remove from download queue
raise BMObjectExpiredError()

def checkStream(self):
def checkStream(self, streams):
"""Check if object's stream matches streams we are interested in"""
if self.streamNumber < protocol.MIN_VALID_STREAM \
or self.streamNumber > protocol.MAX_VALID_STREAM:
logger.warning(
'The object has invalid stream: %s', self.streamNumber)
raise BMObjectInvalidError()
if self.streamNumber not in connectionpool.pool.streams:
if self.streamNumber not in streams:
logger.debug(
'The streamNumber %i isn\'t one we are interested in.',
self.streamNumber)
Expand Down
28 changes: 13 additions & 15 deletions src/network/bmproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import addresses
import protocol
import state
from . import connectionpool
from . import knownnodes
from bmconfigparser import config
from queues import objectProcessorQueue
Expand Down Expand Up @@ -395,27 +394,27 @@ def bm_command_object(self):
self.object.checkAlreadyHave()
except (BMObjectExpiredError, BMObjectAlreadyHaveError,
BMObjectInsufficientPOWError):
BMProto.stopDownloadingObject(self.object.inventoryHash)
self.stopDownloadingObject(self.object.inventoryHash)
raise
try:
self.object.checkStream()
self.object.checkStream(self.pool.streams)
except BMObjectUnwantedStreamError:
acceptmismatch = config.getboolean(
"inventory", "acceptmismatch")
BMProto.stopDownloadingObject(
self.stopDownloadingObject(
self.object.inventoryHash, acceptmismatch)
if not acceptmismatch:
raise
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash)
self.stopDownloadingObject(self.object.inventoryHash)
raise

try:
self.object.checkObjectByType()
objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data))) # noqa: F821
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
self.stopDownloadingObject(self.object.inventoryHash, True)
else:
try:
del missingObjects[self.object.inventoryHash]
Expand Down Expand Up @@ -448,7 +447,7 @@ def bm_command_addr(self):
for seenTime, stream, _, ip, port in self._decode_addr():
ip = str(ip)
if (
stream not in connectionpool.pool.streams
stream not in self.pool.streams
# FIXME: should check against complete list
or ip.startswith('bootstrap')
):
Expand Down Expand Up @@ -547,7 +546,7 @@ def bm_command_version(self):
if not self.isOutbound:
self.append_write_buf(protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.pool.streams, dandelion_ins.enabled, True,
self.pool.streams, dandelion_ins.enabled, True,
nodeid=self.nodeid))
logger.debug(
'%(host)s:%(port)i sending version',
Expand Down Expand Up @@ -603,7 +602,7 @@ def peerValidityChecks(self):
'Closed connection to %s because there is no overlapping'
' interest in streams.', self.destination)
return False
if connectionpool.pool.inboundConnections.get(
if self.pool.inboundConnections.get(
self.destination):
try:
if not protocol.checkSocksIP(self.destination.host):
Expand All @@ -621,8 +620,8 @@ def peerValidityChecks(self):
# or server full report the same error to counter deanonymisation
if (
Peer(self.destination.host, self.peerNode.port)
in connectionpool.pool.inboundConnections
or len(connectionpool.pool)
in self.pool.inboundConnections
or len(self.pool)
> config.safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ config.safeGetInt(
Expand All @@ -634,7 +633,7 @@ def peerValidityChecks(self):
'Closed connection to %s due to server full'
' or duplicate inbound/outbound.', self.destination)
return False
if connectionpool.pool.isAlreadyConnected(self.nonce):
if self.pool.isAlreadyConnected(self.nonce):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="I'm connected to myself. Closing connection.",
fatal=2))
Expand All @@ -645,10 +644,9 @@ def peerValidityChecks(self):

return True

@staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False):
def stopDownloadingObject(self, hashId, forwardAnyway=False):
"""Stop downloading object *hashId*"""
for connection in connectionpool.pool.connections():
for connection in self.pool.connections():
try:
del connection.objectsNewToMe[hashId]
except KeyError:
Expand Down
6 changes: 6 additions & 0 deletions src/network/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def isAlreadyConnected(self, nodeid):
def addConnection(self, connection):
"""Add a connection object to our internal dict"""
from .udp import UDPSocket
connection.pool = self
connection.add_channel()
if isinstance(connection, UDPSocket):
return
if connection.isOutbound:
Expand Down Expand Up @@ -182,6 +184,8 @@ def startListening(self, bind=None):
port = config.safeGetInt("bitmessagesettings", "port")
# correct port even if it changed
ls = TCPServer(host=bind, port=port)
ls.pool = self
ls.add_channel()
self.listeningSockets[ls.destination] = ls

def startUDPSocket(self, bind=None):
Expand All @@ -198,6 +202,8 @@ def startUDPSocket(self, bind=None):
udpSocket = UDPSocket(announcing=False)
else:
udpSocket = UDPSocket(host=bind, announcing=True)
udpSocket.pool = self
udpSocket.add_channel()
self.udpSockets[udpSocket.listening.host] = udpSocket

def startBootstrappers(self):
Expand Down
Loading