diff --git a/pyproject.toml b/pyproject.toml index 4218963..55114db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ classifiers = [ requires-python = ">=3.10" dependencies = [ "hkdf >=0.0.3", + "argon2-cffi >= 25.1.0", "pycryptodome >=3.17.0", "pydantic >=2.5.0", "httpx >=0.24.1", diff --git a/src/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py index 82eae74..d6b8c99 100644 --- a/src/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -68,10 +68,14 @@ def _refresh_connect_token(self): ) self._connect_token = ConnectToken.model_validate_json(resp.text) + import vaultwarden.models.bitwarden + self._connect_token.master_key = make_master_key( password=self.password, salt=self.email, - iterations=self._connect_token.KdfIterations, + kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( + self._connect_token + ), ) def _set_connect_token(self): @@ -92,11 +96,16 @@ def _set_connect_token(self): "identity/connect/token", headers=headers, data=payload ) self._connect_token = ConnectToken.model_validate_json(resp.text) + import vaultwarden.models.bitwarden + self._connect_token.master_key = make_master_key( password=self.password, salt=self.email, - iterations=self._connect_token.KdfIterations, + kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( + self._connect_token + ), ) + return # login to api def _api_login(self) -> None: diff --git a/src/vaultwarden/models/bitwarden.py b/src/vaultwarden/models/bitwarden.py index 8a092c6..17de6c0 100644 --- a/src/vaultwarden/models/bitwarden.py +++ b/src/vaultwarden/models/bitwarden.py @@ -1,15 +1,42 @@ -from typing import Generic, Literal, TypeVar, cast +import dataclasses +import datetime +from typing import ( + TYPE_CHECKING, + Annotated, + Any, + Generic, + Literal, + Self, + TypeVar, + Union, + cast, +) from uuid import UUID -from pydantic import AliasChoices, Field, TypeAdapter, field_validator -from pydantic_core.core_schema import FieldValidationInfo +from pydantic import ( + AliasChoices, + Field, + ModelWrapValidatorHandler, + TypeAdapter, + WrapValidator, + field_validator, + model_validator, +) +from pydantic_core.core_schema import ( + FieldValidationInfo, + ValidationInfo, + ValidatorFunctionWrapHandler, +) from vaultwarden.clients.bitwarden import BitwardenAPIClient -from vaultwarden.models.enum import CipherType, OrganizationUserType +from vaultwarden.models.enum import CipherType, KdfType, OrganizationUserType from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.permissive_model import PermissiveBaseModel from vaultwarden.utils.crypto import decrypt, encrypt +if TYPE_CHECKING: + import vaultwarden.clients.bitwarden + # Pydantic models for Bitwarden data structures T = TypeVar("T", bound="BitwardenBaseModel") @@ -37,12 +64,180 @@ def api_client(self) -> BitwardenAPIClient: return self.bitwarden_client -class CipherDetails(BitwardenBaseModel): +def decode_bytes( + value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + context: dict = cast(dict, info.context) + keys: list[bytes] = cast(list[bytes], context.get("cctx")) + for key in keys[::-1]: + try: + return decrypt(handler(value), key) + except Exception: + continue + raise ValueError("No key found") + + +def decode_string( + value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> str: + return decode_bytes(value, handler, info=info).decode("utf-8") + + +class UriMatch(BitwardenBaseModel): + class Config: + extra = "forbid" + + match: int | None = None + uri: Annotated[str, WrapValidator(decode_string)] | None = None + uriChecksum: Annotated[str, WrapValidator(decode_string)] | None = None + response: str | None = None + + +class XField(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decode_string)] | None = None + response: Annotated[str, WrapValidator(decode_string)] | None = None + type: int + value: Annotated[str, WrapValidator(decode_string)] | None = None + linkedId: str | None = None + + +class CipherLogin(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decode_string)] | None = None + autofillOnPageLoad: bool | None = None + password: Annotated[str, WrapValidator(decode_string)] | None = None + passwordRevisionDate: datetime.datetime | None = None + totp: str | None = None + uri: Annotated[str, WrapValidator(decode_string)] | None = None + uris: list[UriMatch] | None = None + username: Annotated[str, WrapValidator(decode_string)] | None = None + notes: Annotated[str, WrapValidator(decode_string)] | None = None + + +class PasswordChange(BitwardenBaseModel): + class Config: + extra = "forbid" + + lastUsedDate: datetime.datetime + password: str + + +class Fido2Credential(BitwardenBaseModel): + class Config: + extra = "forbid" + + counter: Annotated[str, WrapValidator(decode_string)] | None = None + creationDate: datetime.datetime | None = None + credentialId: Annotated[str, WrapValidator(decode_string)] | None = None + discoverable: Annotated[str, WrapValidator(decode_string)] | None = None + keyAlgorithm: Annotated[str, WrapValidator(decode_string)] | None = None + keyCurve: Annotated[str, WrapValidator(decode_string)] | None = None + keyType: Annotated[str, WrapValidator(decode_string)] | None = None + keyValue: Annotated[str, WrapValidator(decode_string)] | None = None + response: str | None = None + rpId: Annotated[str, WrapValidator(decode_string)] | None = None + rpName: Annotated[str, WrapValidator(decode_string)] | None = None + userDisplayName: Annotated[str, WrapValidator(decode_string)] | None = None + userHandle: Annotated[str, WrapValidator(decode_string)] | None = None + userName: Annotated[str, WrapValidator(decode_string)] | None = None + + +class LoginData(CipherLogin): + class Config: + extra = "forbid" + + fields: list[XField] | None = None + passwordHistory: list[PasswordChange] | None = None + response: str | None = None + fido2Credentials: list[Fido2Credential] | None = None + + +class SecureNoteData(CipherLogin): + class Config: + extra = "forbid" + + fields: list[XField] + passwordHistory: list[PasswordChange] + response: str | None = None + type: int | None = None + + +class SecureNoteProperty(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decode_string)] | None = None + notes: Annotated[str, WrapValidator(decode_string)] | None = None + fields: list[XField] | None = None + passwordHistory: list[PasswordChange] | None = None + response: Annotated[str, WrapValidator(decode_string)] | None = None + type: int + + +class Attachment(BitwardenBaseModel): + class Config: + extra = "forbid" + + fileName: Annotated[str, WrapValidator(decode_string)] | None = None + id: str + key: str | None = ( + None # Annotated[str, WrapValidator(decodeBytes)]|None = None + ) + object: str + size: int + sizeName: str + url: str + + +class _CipherBase(BitwardenBaseModel): + class Config: + extra = "forbid" + Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) Type: CipherType - Name: str + Name: Annotated[str, WrapValidator(decode_string)] CollectionIds: list[UUID] + key: str | None = None + + organizationUseTotp: bool | None = None + creationDate: datetime.datetime | None = None + deletedDate: datetime.datetime | None = None + fields: list[XField] | None = None + + notes: Annotated[str, WrapValidator(decode_string)] | None = None + reprompt: int + revisionDate: str + sshKey: str | None + passwordHistory: list[PasswordChange] + object: str | None = None + attachments: list[Attachment] | None = None + + @model_validator(mode="wrap") + @classmethod + def set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + if (key := data.get("key")) is not None: + context = cast(dict, info.context) + cctx = cast(list[bytes], context.get("cctx")) + + cctx.append(decrypt(key, cctx[0])) + + v = handler(data) + + if key is not None: + cctx.pop() + + return v @field_validator("OrganizationId") @classmethod @@ -88,6 +283,55 @@ def update_collection(self, collections: list[UUID]): ) +class Login(_CipherBase): + Type: Literal[CipherType.Login] + + login: LoginData | None = None + secureNote: None = None + card: None = None + identity: None = None + + data: LoginData | None = None + + +class SecureNote(_CipherBase): + Type: Literal[CipherType.SecureNote] + + login: None = None + secureNote: SecureNoteProperty | None = None + card: None = None + identity: None = None + + data: SecureNoteData | None = None + + +class Card(_CipherBase): + Type: Literal[CipherType.Card] + + login: None = None + card: None = None + secureNote: None = None + identity: None = None + + data: None = None + + +class Identity(_CipherBase): + Type: Literal[CipherType.Identity] + + login: None = None + secureNote: None = None + card: None = None + identity: None = None + + data: None = None + + +CipherDetails = Annotated[ + Union[Login, SecureNote, Card, Identity], Field(discriminator="Type") +] + + class CollectionAccess(BitwardenBaseModel): ReadOnly: bool = False HidePasswords: bool = False @@ -550,14 +794,15 @@ def _get_ciphers(self) -> list[CipherDetails]: "api/ciphers/organization-details", params={"organizationId": self.Id}, ) + org_key = self.key() res = ResplistBitwarden[CipherDetails].model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context={ + "parent_id": self.Id, + "client": self.api_client, + "cctx": [org_key], # crypto context + }, ) - org_key = self.key() - # map each cipher name to the decrypted name - for cipher in res.Data: - cipher.Name = decrypt(cipher.Name, org_key).decode("utf-8") return res.Data def ciphers( @@ -581,14 +826,12 @@ def ciphers( def key(self): sync = self.api_client.sync() - raw_key = None for org in sync.Profile.Organizations: if org.Id == self.Id: - raw_key = org.Key break - if raw_key is not None: - return decrypt(raw_key, self.api_client.connect_token.orgs_key) - raise BitwardenError(f"No Organizations `{self.Id}` found") + else: + raise BitwardenError(f"No Organizations `{self.Id}` found") + return decrypt(org.Key, self.api_client.connect_token.orgs_key) def get_organization( @@ -601,3 +844,22 @@ def get_organization( resp.text, context={"client": bitwarden_client, "parent_id": organisation_id}, ) + + +@dataclasses.dataclass +class Kdf: + Kdf: KdfType + KdfIterations: int | None = None + KdfMemory: int | None = None + KdfParallelism: int | None = None + + @classmethod + def from_connect_token( + cls, token: "vaultwarden.clients.bitwarden.ConnectToken" + ): + return cls( + token.Kdf, + token.KdfIterations, + token.KdfMemory, + token.KdfParallelism, + ) diff --git a/src/vaultwarden/models/enum.py b/src/vaultwarden/models/enum.py index 2b0b57c..ece782e 100644 --- a/src/vaultwarden/models/enum.py +++ b/src/vaultwarden/models/enum.py @@ -27,3 +27,8 @@ class VaultwardenUserStatus(IntEnum): Enabled = 0 Invited = 1 Disabled = 2 + + +class KdfType(IntEnum): + Pbkdf2 = 0 + Argon2id = 1 diff --git a/src/vaultwarden/models/sync.py b/src/vaultwarden/models/sync.py index 2e50448..3044ce2 100644 --- a/src/vaultwarden/models/sync.py +++ b/src/vaultwarden/models/sync.py @@ -3,13 +3,13 @@ from pydantic import AliasChoices, Field, field_validator -from vaultwarden.models.enum import VaultwardenUserStatus +from vaultwarden.models.enum import KdfType, VaultwardenUserStatus from vaultwarden.models.permissive_model import PermissiveBaseModel from vaultwarden.utils.crypto import decrypt class ConnectToken(PermissiveBaseModel): - Kdf: int = 0 + Kdf: KdfType = KdfType.Pbkdf2 KdfIterations: int = 0 KdfMemory: int | None = None KdfParallelism: int | None = None @@ -22,7 +22,8 @@ class ConnectToken(PermissiveBaseModel): scope: str unofficialServer: bool = False ResetMasterPassword: bool | None = None - master_key: bytes | None = None + + master_key: bytes | None = None # pydantic.PrivateAttr(default=None) @field_validator("expires_in") @classmethod diff --git a/src/vaultwarden/utils/crypto.py b/src/vaultwarden/utils/crypto.py index fa77b8e..9353546 100644 --- a/src/vaultwarden/utils/crypto.py +++ b/src/vaultwarden/utils/crypto.py @@ -14,11 +14,14 @@ from hashlib import pbkdf2_hmac, sha256 from hmac import new as hmac_new from secrets import token_bytes +import typing from Crypto.Cipher import AES, PKCS1_OAEP from Crypto.PublicKey import RSA from hkdf import hkdf_expand +if typing.TYPE_CHECKING: + import vaultwarden.models.bitwarden class CIPHERS(IntEnum): sym = 2 @@ -69,6 +72,7 @@ def decode_cipher_string(cipher_string): """decode a cipher tring into it's parts""" iv = None mac = None + assert cipher_string is not None if not ENCRYPTED_STRING_RE.match(cipher_string): raise WrongFormatError(f"{cipher_string}") try: @@ -114,14 +118,37 @@ def is_encrypted(cipher_string): return True -def make_master_key(password, salt, iterations=ITERATIONS): - salt = salt.lower() - if not hasattr(password, "decode"): - password = password.encode("utf-8") - if not hasattr(salt, "decode"): - salt = salt.encode("utf-8") - return pbkdf2_hmac("sha256", password, salt, iterations) - +def make_master_key(password_: str, salt_: str, kdf: "vaultwarden.models.bitwarden.Kdf"): + import vaultwarden.models.bitwarden + + assert isinstance(salt_, str) + assert isinstance(password_, str) + + password = password_.encode("utf-8") + salt = salt_.lower().encode("utf-8") + + match kdf.Kdf: + case vaultwarden.models.bitwarden.KdfType.Pbkdf2: + assert kdf.KdfIterations is not None + return pbkdf2_hmac("sha256", password, salt, kdf.KdfIterations) + case vaultwarden.models.bitwarden.KdfType.Argon2id: + # c.f. + # https://github.com/vaultwarden/vw_web_builds/blob/355bddc6c9d5c110e55fe74c5fcfa86ddd85572c/libs/common/src/platform/services/key-generation.service.ts#L55-L75 + import argon2 + assert kdf.KdfIterations is not None + assert kdf.KdfMemory is not None + assert kdf.KdfParallelism is not None + hsalt = hashlib.new("sha256", salt).digest() + v = argon2.low_level.hash_secret_raw( + password, + hsalt, + time_cost=kdf.KdfIterations, + memory_cost=kdf.KdfMemory * 1024, + parallelism=kdf.KdfParallelism, + hash_len=32, + type=argon2.Type.ID, + ) + return v def hash_password(password, salt, iterations=ITERATIONS): """base64-encode a wrapped, stretched password+salt(email) for signup/login""" @@ -159,9 +186,7 @@ def aes_encrypt(plaintext, key, charset="utf-8"): def encrypt_sym(plaintext, key, to_bytes=False, *a, **kw): # inspired from bitwarden/jslib:src/services/crypto.service.ts - typ, (iv, ct, mac) = int(CIPHERS.sym), aes_encrypt( - plaintext, key, *a, **kw - ) + typ, (iv, ct, mac) = int(CIPHERS.sym), aes_encrypt(plaintext, key, *a, **kw) if mac: mac = mac.digest() if to_bytes: @@ -242,9 +267,7 @@ def decrypt_bytes(cipher_bytes, key, *a, **kw): ct = cipher_bytes[49:] ret = decrypt_sym(ct, key, iv, mac) else: - raise UnimplementedError( - f"{typ} encType decryption is not implemented" - ) + raise UnimplementedError(f"{typ} encType decryption is not implemented") return ret diff --git a/tests/fixtures/server/db.sqlite3 b/tests/fixtures/server/db.sqlite3 index bf27515..93b7dcd 100644 --- a/tests/fixtures/server/db.sqlite3 +++ b/tests/fixtures/server/db.sqlite3 @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c8c11a331ba097f1644b297885cd09b4ac5fc975ed5605176c880bc5cf08a813 -size 245760 +oid sha256:45f2e69202615d295d7687fa4752e189ad29cae7de8852101f93f0b679cbcab6 +size 262144