Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 3 additions & 115 deletions src/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,115 +85,14 @@ def __init__(self, ident, hostname, display_hostname, ip_info, port, local_ident

def start_remote_thread(self):
# func = lambda: return

if self.api_version == "1":
func = self.remote_thread_v1
elif self.api_version == "2":
func = self.remote_thread_v2
func = self.remote_thread_v2

self.remote_thread = threading.Thread(target=func, name="remote-main-thread-v%s-%s-%s:%d-%s"
% (self.api_version, self.hostname, self.ip_info.ip4_address, self.port, self.ident))
# logging.debug("remote-thread-%s-%s:%d-%s"
# % (self.hostname, self.ip_info.ip4_address, self.port, self.ident))
self.remote_thread.start()

def remote_thread_v1(self):
self.ping_timer.clear()

self.emit_machine_info_changed() # Let's make sure the button doesn't have junk in it if we fail to connect.

logging.debug("Remote: Attempting to connect to %s (%s) - api version 1" % (self.display_hostname, self.ip_info.ip4_address))

self.set_remote_status(RemoteStatus.INIT_CONNECTING)

def run_secure_loop():
logging.debug("Remote: Starting a new connection loop for %s (%s:%d)"
% (self.display_hostname, self.ip_info, self.port))

cert = auth.get_singleton().get_cached_cert(self.hostname, self.ip_info)
creds = grpc.ssl_channel_credentials(cert)

with grpc.secure_channel("%s:%d" % (self.ip_info.ip4_address, self.port), creds) as channel:
future = grpc.channel_ready_future(channel)

try:
future.result(timeout=4)
self.stub = warp_pb2_grpc.WarpStub(channel)
except grpc.FutureTimeoutError:
self.set_remote_status(RemoteStatus.UNREACHABLE)
future.cancel()

if not self.ping_timer.is_set():
logging.debug("Remote: Unable to establish secure connection with %s (%s:%d). Trying again in %ds"
% (self.display_hostname, self.ip_info, self.port, CHANNEL_RETRY_WAIT_TIME))
self.ping_timer.wait(CHANNEL_RETRY_WAIT_TIME)
return True # run_secure_loop()

return False # run_secure_loop()

duplex_fail_counter = 0
one_ping = False # A successful duplex response lets us finish setting things up.

while not self.ping_timer.is_set():

if self.busy:
logging.debug("Remote Ping: Skipping keepalive ping to %s (%s:%d) (busy)"
% (self.display_hostname, self.ip_info, self.port))
self.busy = False
else:
try:
# t = GLib.get_monotonic_time()
logging.debug("Remote Ping: to %s (%s:%d)"
% (self.display_hostname, self.ip_info, self.port))
self.stub.Ping(warp_pb2.LookupName(id=self.local_ident,
readable_name=util.get_hostname()),
timeout=5)
# logging.debug("Latency: %s (%s)"
# % (util.precise_format_time_span(GLib.get_monotonic_time() - t), self.display_hostname))
if not one_ping:
self.set_remote_status(RemoteStatus.AWAITING_DUPLEX)
if self.check_duplex_connection():
logging.debug("Remote: Connected to %s (%s:%d)"
% (self.display_hostname, self.ip_info, self.port))

self.set_remote_status(RemoteStatus.ONLINE)

self.rpc_call(self.update_remote_machine_info)
self.rpc_call(self.update_remote_machine_avatar)
one_ping = True
else:
duplex_fail_counter += 1
if duplex_fail_counter > DUPLEX_MAX_FAILURES:
logging.debug("Remote: CheckDuplexConnection to %s (%s:%d) failed too many times"
% (self.display_hostname, self.ip_info, self.port))
self.ping_timer.wait(CHANNEL_RETRY_WAIT_TIME)
return True
except grpc.RpcError as e:
logging.debug("Remote: Ping failed, shutting down %s (%s:%d)"
% (self.display_hostname, self.ip_info, self.port))
break

self.ping_timer.wait(CONNECTED_PING_TIME if self.status == RemoteStatus.ONLINE else DUPLEX_WAIT_PING_TIME)

# This is reached by the RpcError break above. If the remote is still discoverable, start
# the secure loop over. This could have happened as a result of a quick disco/reconnect,
# And we don't notice until it has already come back. In this case, try a new connection.
if self.has_zc_presence and not self.ping_timer.is_set():
return True # run_secure_loop()

# The ping timer has been triggered, this is an orderly shutdown.
return False # run_secure_loop()

try:
while run_secure_loop():
continue
except Exception as e:
logging.critical("!! Major problem starting connection loop for %s (%s:%d): %s"
% (self.display_hostname, self.ip_info, self.port, e))

self.set_remote_status(RemoteStatus.OFFLINE)
self.run_thread_alive = False

def remote_thread_v2(self):
self.channel_keepalive.clear()

Expand Down Expand Up @@ -275,10 +174,8 @@ def channel_state_changed(state):
self.set_remote_status(RemoteStatus.OFFLINE)

def shutdown(self):
if self.api_version == "1":
self.ping_timer.set()
else:
self.channel_keepalive.set()
# api v2
self.channel_keepalive.set()
# This is called by server just before running start_remote_thread, so the first time
# self.remote_thread will be None.
try:
Expand Down Expand Up @@ -343,15 +240,6 @@ def rpc_call(self, func, *args, **kargs):
logging.critical("!! RPC threadpool failure while submitting call to %s (%s:%d): %s"
% (self.display_hostname, self.ip_info, self.port, e))

# Not added to thread pool
def check_duplex_connection(self):
logging.debug("Remote: checking duplex with '%s'" % self.display_hostname)

ret = self.stub.CheckDuplexConnection(warp_pb2.LookupName(id=self.local_ident,
readable_name=util.get_hostname()))

return ret.response

def wait_for_duplex(self):
logging.debug("Remote: waiting for duplex from '%s'" % self.display_hostname)

Expand Down
129 changes: 2 additions & 127 deletions src/remote_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ def __init__(self, ident, hostname, ip_info, port, auth_port, api_version):
self.ip_info = ip_info
self.port = port

#v1 only
self.request = None

#v2 only
self.auth_port = auth_port
self.locked_cert = None
Expand All @@ -36,7 +33,6 @@ def cancel(self):

class Registrar():
def __init__(self, ip_info, port, auth_port):
self.reg_server_v1 = None
self.reg_server_v2 = None
self.active_registrations = {}
self.reg_lock = threading.Lock()
Expand All @@ -48,15 +44,10 @@ def __init__(self, ip_info, port, auth_port):
self.start_registration_servers()

def start_registration_servers(self):
if self.reg_server_v1 is not None:
self.reg_server_v1.stop()

if self.reg_server_v2 is not None:
self.reg_server_v2.stop(grace=2).wait()
self.reg_server_v2 = None

logging.debug("Starting v1 registration server (%s) with port %d" % (self.ip_info, self.port))
self.reg_server_v1 = RegistrationServer_v1(self.ip_info, self.port)
logging.debug("Starting v2 registration server (%s) with auth port %d" % (self.ip_info, self.auth_port))
self.reg_server_v2 = RegistrationServer_v2(self.ip_info, self.auth_port)

Expand All @@ -66,11 +57,6 @@ def shutdown_registration_servers(self):
self.active_registrations[key].cancel()
self.active_registrations = {}

if self.reg_server_v1 is not None:
logging.debug("Stopping v1 registration server.")
self.reg_server_v1.stop()
self.reg_server_v1 = None

if self.reg_server_v2:
logging.debug("Stopping v2 registration server.")
self.reg_server_v2.stop()
Expand All @@ -81,12 +67,8 @@ def register(self, ident, hostname, ip_info, port, auth_port, api_version):
with self.reg_lock:
self.active_registrations[ident] = details

ret = None

if api_version == "1":
ret = register_v1(details)
elif api_version == "2":
ret = register_v2(details)
# api v2
ret = register_v2(details)

with self.reg_lock:
# shutdown_registration_servers may have been called on a different thread.
Expand All @@ -97,113 +79,6 @@ def register(self, ident, hostname, ip_info, port, auth_port, api_version):

return ret

####################### api v1

def register_v1(details):
# This will block if the remote's warp udp port is closed, until either the port is unblocked
# or we tell the auth object to shutdown, in which case the request timer will cancel and return
# here immediately (with None)

logging.debug("Registering with %s (%s:%d) - api version 1" % (details.hostname, details.ip_info, details.port))

success = retrieve_remote_cert(details)

if success == util.CertProcessingResult.FAILURE:
logging.debug("Unable to register with %s (%s:%d) - api version 1"
% (details.hostname, details.ip_info, details.port))
return False

return True

def retrieve_remote_cert(details):
logging.debug("Auth: Starting a new RequestLoop for '%s' (%s:%d)" % (details.hostname, details.ip_info, details.port))

details.request = Request(details.ip_info, details.port)
data = details.request.request()

if data is None or details.cancelled:
return util.CertProcessingResult.FAILURE

return auth.get_singleton().process_remote_cert(details.hostname,
details.ip_info,
data)

REQUEST = b"REQUEST"

#v1 client
class Request():
def __init__(self, ip_info, port):
self.ip_info = ip_info
self.port = port

def request(self):
logging.debug("Auth: Requesting cert from remote (%s:%d)" % (self.ip_info, self.port))

remote_ip, _, ip_version = self.ip_info.get_usable_ip()

try:
ip = remote_ip if ip_version == socket.AF_INET else "[%s]" % (remote_ip,)
server_sock = socket.socket(ip_version, socket.SOCK_DGRAM)
server_sock.settimeout(5.0)
server_sock.sendto(REQUEST, (ip, self.port))

reply, addr = server_sock.recvfrom(2000)

if addr == (remote_ip, self.port):
return reply
except socket.timeout:
logging.debug("Auth: Cert request failed from remote (%s:%d) - (Is their udp port blocked?"
% (self.ip_info, self.port))
except socket.error as e:
logging.critical("Something wrong with cert request (%s:%s): " % (remote_ip, self.port, e))

return None

# v1 server
class RegistrationServer_v1():
def __init__(self, ip_info, port):
self.exit = False
self.ip_info = ip_info
self.port = port

self.thread4 = threading.Thread(target=self.serve_cert_thread, args=(socket.AF_INET,))
self.thread6 = threading.Thread(target=self.serve_cert_thread, args=(socket.AF_INET6,))
self.thread4.start()
self.thread6.start()

def serve_cert_thread(self, ip_version):
local_ip = None
if ip_version == socket.AF_INET:
local_ip = self.ip_info.ip4_address
elif ip_version == socket.AF_INET6:
local_ip = self.ip_info.ip6_address

if local_ip is not None:
try:
server_sock = socket.socket(ip_version, socket.SOCK_DGRAM)
server_sock.settimeout(1.0)
server_sock.bind((local_ip, self.port))
except socket.error as e:
logging.critical("Could not create udp socket for cert requests: %s" % str(e))
return

while True:
try:
data, address = server_sock.recvfrom(2000)

if data == REQUEST:
cert_data = auth.get_singleton().get_encoded_local_cert()
server_sock.sendto(cert_data, address)
except socket.timeout as e:
if self.exit:
server_sock.close()
break

def stop(self):
self.exit = True
self.thread4.join()
self.thread6.join()


####################### api v2

Expand Down
16 changes: 2 additions & 14 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ def add_service(self, zeroconf, _type, name):
api_version = info.properties[b"api-version"].decode()
auth_port = int(info.properties[b"auth-port"].decode())
except KeyError:
api_version = "1"
auth_port = 0
logging.debug(">>> Discovery: registration API v1 is not supported anymore, ignoring: %s (%s)" % (remote_hostname, remote_ip_info))
return

# FIXME: I'm not sure why we still get discovered by other networks in some cases -
# The Zeroconf object has a specific ip it is set to, what more do I need to do?
Expand Down Expand Up @@ -519,18 +519,6 @@ def Ping(self, request, context):

return void

def CheckDuplexConnection(self, request, context):
logging.debug("Server RPC: CheckDuplexConnection from '%s'" % request.readable_name)
response = False

try:
remote = self.remote_machines[request.id]
response = (remote.status in (RemoteStatus.AWAITING_DUPLEX, RemoteStatus.ONLINE))
except KeyError:
pass

return warp_pb2.HaveDuplex(response=response)

def WaitingForDuplex(self, request, context):
logging.debug("Server RPC: WaitingForDuplex from '%s' (api v2)" % request.readable_name)

Expand Down
2 changes: 0 additions & 2 deletions src/warp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ syntax = "proto3";

service Warp {
// Sender methods
// api v1 duplex method (ping style)
rpc CheckDuplexConnection(LookupName) returns (HaveDuplex) {}
// api v2 duplex method (block/future)
rpc WaitingForDuplex(LookupName) returns (HaveDuplex) {}

Expand Down
Loading
Loading