1414"""Compliance credentials configuration."""
1515
1616import logging
17+ import os
18+ import shlex
19+ import subprocess # nosec B404
1720from collections import OrderedDict , namedtuple
1821from configparser import RawConfigParser
1922from os import environ
2730
2831
2932class OnePasswordBackend :
30- pass
33+ def __init__ (self , ** kwargs ):
34+ self ._url = kwargs .get ("url" )
35+ self ._vault = kwargs .get ("vault" , "auditree" )
36+
37+ def get_section (self , section ):
38+ cmd = f"op item get --vault { self ._vault } --format json { section } "
39+ subprocess .run ( # nosec B603
40+ shlex .split (cmd ),
41+ env = os .environ ,
42+ stdout = subprocess .PIPE ,
43+ stderr = subprocess .PIPE ,
44+ check = True ,
45+ )
46+
47+ def attribute_error_msg (self , section , attr ):
48+ return (
49+ f'Unable to locate field "{ attr } " '
50+ f'in secure note "{ section } " '
51+ f'at 1Password vault "{ self ._vault } "'
52+ )
3153
3254
3355class ConfigParserBackend :
@@ -36,6 +58,21 @@ def __init__(self, **kwargs):
3658 self ._cfg_file = kwargs .get ("cfg_file" )
3759 self ._cfg .read (str (Path (self ._cfg_file ).expanduser ()))
3860
61+ def get_section (self , section ):
62+ params = []
63+ values = []
64+ if self ._cfg .has_section (section ):
65+ params = self ._cfg .options (section )
66+ values = [self ._cfg .get (section , x ) for x in self ._cfg .options (section )]
67+ return OrderedDict (zip (params , values ))
68+
69+ def attribute_error_msg (self , section , attr ):
70+ return (
71+ f'Unable to locate attribute "{ attr } " '
72+ f'in section "{ section } " '
73+ f'at config file "{ self ._cfg_file } "'
74+ )
75+
3976
4077class Config :
4178 """Handle credentials configuration."""
@@ -46,14 +83,20 @@ def __init__(self, cfg_file="~/.credentials", backend_cfg=None):
4683 """
4784 Create an instance of a dictionary-like configuration object.
4885
49- :param cfg_file: The path to the RawConfigParser compatible config file
86+ :param cfg_file: The path to a config file for building a ConfigParserBackend.
87+ :param backend_cfg: A dictionary with the backend config
5088 """
5189
5290 if backend_cfg is None :
5391 backend_cfg = {"name" : "configparser" , "cfg_file" : cfg_file }
5492 self ._init_backend (backend_cfg )
5593
5694 def _init_backend (self , backend_cfg ):
95+ """
96+ Create an instance of a dictionary-like configuration object.
97+
98+ :param cfg_file: The path to the RawConfigParser compatible config file
99+ """
57100 name = backend_cfg .get ("name" )
58101 if backend_cfg .get ("name" ) not in Config .BACKENDS :
59102 raise ValueError (f"Invalid credentials backend name: { name } " )
@@ -84,26 +127,16 @@ def _getattr_wrapper(t, attr):
84127 try :
85128 return t .__getattribute__ (attr )
86129 except AttributeError as exc :
87- exc .args = (
88- (
89- f'Unable to locate attribute "{ attr } " '
90- f'in section "{ type (t ).__name__ } " '
91- f'at config file "{ self ._cfg_file } "'
92- ),
93- )
130+ exc .args = (self ._backend .attribute_error_msg (type (t ).__name__ , attr ),)
94131 raise exc
95132
96133 env_vars = [k for k in environ .keys () if k .startswith (f"{ section .upper ()} _" )]
97134 env_keys = [k .split (section .upper ())[1 ].lstrip ("_" ).lower () for k in env_vars ]
98135 env_values = [environ [e ] for e in env_vars ]
99136 if env_vars :
100137 logger .debug (f'Loading credentials from ENV vars: { ", " .join (env_vars )} ' )
101- params = []
102- if self ._cfg .has_section (section ):
103- params = self ._cfg .options (section )
104- values = [self ._cfg .get (section , x ) for x in params ]
105138
106- d = OrderedDict ( zip ( params , values ) )
139+ d = self . _backend . get_section ( section )
107140
108141 if env_vars :
109142 d .update (zip (env_keys , env_values ))
0 commit comments