Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions gnupg/_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,20 +744,30 @@ def _handle_io(self, args, file, result, passphrase=False, binary=False):
self._collect_output(p, result, writer, stdin)
return result

def _recv_keys(self, keyids, keyserver=None):
def _recv_keys(self, keyids, keyserver=None, keyserver_certs=None):
"""Import keys from a keyserver.

:param str keyids: A space-delimited string containing the keyids to
request.
:param str keyserver: The keyserver to request the ``keyids`` from;
defaults to `gnupg.GPG.keyserver`.
:param str keyserver_certs: A file passed as the CA cert file for the
keyserver.
"""
if not keyserver:
keyserver = self.keyserver

args = ['--keyserver {0}'.format(keyserver),
'--recv-keys {0}'.format(keyids)]
log.info('Requesting keys from %s: %s' % (keyserver, keyids))
if keyserver_certs:
args.insert(0, '--keyserver-options ca-cert-file={0}'.format(
keyserver_certs)
)
log.info('Requesting keys from %s (using CA file %s): %s' % (
keyserver,keyserver_certs, keyids)
)
print('%s args: %s' % (self.binary, ' '.join(args)))
print('make_args: %s' % (self._make_args(args, False)))

result = self._result_map['import'](self)
proc = self._open_subprocess(args)
Expand Down Expand Up @@ -1029,7 +1039,7 @@ def _encrypt(self, data, recipients,
log.info("Encrypted output written successfully.")

return result

def _add_recipient_string(self, args, hidden_recipients, recipient):
if isinstance(hidden_recipients, (list, tuple)):
if [s for s in hidden_recipients if recipient in str(s)]:
Expand Down
78 changes: 78 additions & 0 deletions gnupg/_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
except ImportError:
from ordereddict import OrderedDict

try:
import urlparse
except ImportError:
from urllib import parse as urlparse

import os
import re

from . import _util
Expand Down Expand Up @@ -74,6 +80,67 @@ def _check_keyserver(location):
return keyserver
return None


def _check_keyserver_option(ks_option):
"""Check that the provided keyserver option is valid and safe.

:param str ks_option: A valid argument to --keyserver-option.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you renamed the parameter to option_value?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nope, I'm sorry. I appear to be used to C and Rust where the docstrings go above the function they document. Carry on!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, no worries! I've solidly moved into Go these days so who knows what I'm going to write in during this merge :)

:rtype: :obj:`str` or :obj:`None`
:returns: A string of the keyserver option or None.
"""
def _is_valid_file(option_value):
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want to use gnupg._util._is_file() here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing.

"""Verify option value is a file."""
return os.path.isfile(option_value)

def _is_valid_integer(option_value):
"""Verify option value is an integer."""
return str.isdigit(option_value)

def _is_valid_http_proxy(option_value):
"""Verify option value looks like a proxy URL."""
if not (option_value.startswith('http://') or
option_value.startswith('https://')):
proxy = 'http://{0}'.format(option_value)
else:
proxy = option_value
parsed_url = urlparse.urlparse(proxy)
if parsed_url.scheme and parsed_url.hostname:
return True
else:
return False

boolean_options = {
'auto-key-retrieve',
'check-cert',
'honor-keyserver-url',
'honor-pka-record',
'keep-temp-files',
'include-disabled',
'include-revoked',
'include-subkeys',
'use-temp-files',
}
no_prefixed_options = set(['no{}'.format(opt) for opt in boolean_options])
options_with_validators = {
'ca-cert-file': _is_valid_file,
'http-proxy': _is_valid_http_proxy,
'max-cert-size': _is_valid_integer,
'timeout': _is_valid_integer,
}
valid_simple_options = (set(['debug', 'verbose']) |
boolean_options |
no_prefixed_options)
if ks_option in valid_simple_options:
return ks_option
opt, opt_arg = ks_option.split('=', 1)
if opt in options_with_validators:
arg_ok = options_with_validators[opt](opt_arg)
if arg_ok:
return ks_option
log.debug('Dropping invalid keyserver option: {}'.format(ks_option))
return None


def _check_preferences(prefs, pref_type=None):
"""Check cipher, digest, and compression preference settings.

Expand Down Expand Up @@ -328,6 +395,16 @@ def _check_option(arg, value):
checked += (v + " ")
else: log.debug("Dropping keyserver: %s" % v)
continue
elif flag in ['--keyserver-options']:
print('found keyserver options: %s' % v)
keyserver_option = _check_keyserver_option(v)
if keyserver_option:
log.debug('Setting keyserver option: %s' %
keyserver_option)
checked += (keyserver_option + " ")
else:
log.debug('Dropping keyserver option: %s' % v)
continue

## the rest are strings, filenames, etc, and should be
## shell escaped:
Expand Down Expand Up @@ -496,6 +573,7 @@ def _get_options_group(group=None):
#: These have their own parsers and don't really fit into a group
other_options = frozenset(['--debug-level',
'--keyserver',
'--keyserver-options',

])
#: These should have a directory for an argument
Expand Down
11 changes: 10 additions & 1 deletion gnupg/gnupg.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,22 @@ def recv_keys(self, *keyids, **kwargs):
"""Import keys from a keyserver.

>>> gpg = gnupg.GPG(homedir="doctests")
>>> key = gpg.recv_keys('hkp://pgp.mit.edu', '3FF0DB166A7476EA')
>>> key = gpg.recv_keys('3FF0DB166A7476EA',
keyserver='hkp://pgp.mit.edu')
>>> assert key

>>> ssl_keyserver = 'hkps://hkps.pool.sks-keyservers.net'
>>> ca_cert = '/home/user/hkps.pool.sks-keyservers.netCA.pem'
>>> gpg.recv_keys('6F682D87',
keyserver=ssl_keyserver,
keyserver_certs=ca_cert)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need >>> on these two lines to be able to run them as a doctest.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding.


:param str keyids: Each ``keyids`` argument should be a string
containing a keyid to request.
:param str keyserver: The keyserver to request the ``keyids`` from;
defaults to `gnupg.GPG.keyserver`.
:param str keyserver_certs: A file passed as the CA cert file for the
keyserver.
"""
if keyids:
keys = ' '.join([key for key in keyids])
Expand Down