Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ classifiers = [
requires-python = ">=3.10"
dependencies = [
"hkdf >=0.0.3",
"argon2-cffi >= 25.1.0",
"pycryptodome >=3.17.0",
"pydantic >=2.5.0",
"httpx >=0.24.1",
Expand Down
13 changes: 11 additions & 2 deletions src/vaultwarden/clients/bitwarden.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ def _refresh_connect_token(self):
)
self._connect_token = ConnectToken.model_validate_json(resp.text)

import vaultwarden.models.bitwarden

self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
iterations=self._connect_token.KdfIterations,
kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token(
self._connect_token
),
)

def _set_connect_token(self):
Expand All @@ -92,11 +96,16 @@ def _set_connect_token(self):
"identity/connect/token", headers=headers, data=payload
)
self._connect_token = ConnectToken.model_validate_json(resp.text)
import vaultwarden.models.bitwarden

self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
iterations=self._connect_token.KdfIterations,
kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token(
self._connect_token
),
)
return

# login to api
def _api_login(self) -> None:
Expand Down
294 changes: 278 additions & 16 deletions src/vaultwarden/models/bitwarden.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,42 @@
from typing import Generic, Literal, TypeVar, cast
import dataclasses
import datetime
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Generic,
Literal,
Self,
TypeVar,
Union,
cast,
)
from uuid import UUID

from pydantic import AliasChoices, Field, TypeAdapter, field_validator
from pydantic_core.core_schema import FieldValidationInfo
from pydantic import (
AliasChoices,
Field,
ModelWrapValidatorHandler,
TypeAdapter,
WrapValidator,
field_validator,
model_validator,
)
from pydantic_core.core_schema import (
FieldValidationInfo,
ValidationInfo,
ValidatorFunctionWrapHandler,
)

from vaultwarden.clients.bitwarden import BitwardenAPIClient
from vaultwarden.models.enum import CipherType, OrganizationUserType
from vaultwarden.models.enum import CipherType, KdfType, OrganizationUserType
from vaultwarden.models.exception_models import BitwardenError
from vaultwarden.models.permissive_model import PermissiveBaseModel
from vaultwarden.utils.crypto import decrypt, encrypt

if TYPE_CHECKING:
import vaultwarden.clients.bitwarden

# Pydantic models for Bitwarden data structures

T = TypeVar("T", bound="BitwardenBaseModel")
Expand Down Expand Up @@ -37,12 +64,180 @@ def api_client(self) -> BitwardenAPIClient:
return self.bitwarden_client


class CipherDetails(BitwardenBaseModel):
def decode_bytes(
value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo
) -> bytes:
context: dict = cast(dict, info.context)
keys: list[bytes] = cast(list[bytes], context.get("cctx"))
for key in keys[::-1]:
try:
return decrypt(handler(value), key)
except Exception:
continue
raise ValueError("No key found")


def decode_string(
value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo
) -> str:
return decode_bytes(value, handler, info=info).decode("utf-8")


class UriMatch(BitwardenBaseModel):
class Config:
extra = "forbid"

match: int | None = None
uri: Annotated[str, WrapValidator(decode_string)] | None = None
uriChecksum: Annotated[str, WrapValidator(decode_string)] | None = None
response: str | None = None


class XField(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decode_string)] | None = None
response: Annotated[str, WrapValidator(decode_string)] | None = None
type: int
value: Annotated[str, WrapValidator(decode_string)] | None = None
linkedId: str | None = None


class CipherLogin(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decode_string)] | None = None
autofillOnPageLoad: bool | None = None
password: Annotated[str, WrapValidator(decode_string)] | None = None
passwordRevisionDate: datetime.datetime | None = None
totp: str | None = None
uri: Annotated[str, WrapValidator(decode_string)] | None = None
uris: list[UriMatch] | None = None
username: Annotated[str, WrapValidator(decode_string)] | None = None
notes: Annotated[str, WrapValidator(decode_string)] | None = None


class PasswordChange(BitwardenBaseModel):
class Config:
extra = "forbid"

lastUsedDate: datetime.datetime
password: str


class Fido2Credential(BitwardenBaseModel):
class Config:
extra = "forbid"

counter: Annotated[str, WrapValidator(decode_string)] | None = None
creationDate: datetime.datetime | None = None
credentialId: Annotated[str, WrapValidator(decode_string)] | None = None
discoverable: Annotated[str, WrapValidator(decode_string)] | None = None
keyAlgorithm: Annotated[str, WrapValidator(decode_string)] | None = None
keyCurve: Annotated[str, WrapValidator(decode_string)] | None = None
keyType: Annotated[str, WrapValidator(decode_string)] | None = None
keyValue: Annotated[str, WrapValidator(decode_string)] | None = None
response: str | None = None
rpId: Annotated[str, WrapValidator(decode_string)] | None = None
rpName: Annotated[str, WrapValidator(decode_string)] | None = None
userDisplayName: Annotated[str, WrapValidator(decode_string)] | None = None
userHandle: Annotated[str, WrapValidator(decode_string)] | None = None
userName: Annotated[str, WrapValidator(decode_string)] | None = None


class LoginData(CipherLogin):
class Config:
extra = "forbid"

fields: list[XField] | None = None
passwordHistory: list[PasswordChange] | None = None
response: str | None = None
fido2Credentials: list[Fido2Credential] | None = None


class SecureNoteData(CipherLogin):
class Config:
extra = "forbid"

fields: list[XField]
passwordHistory: list[PasswordChange]
response: str | None = None
type: int | None = None


class SecureNoteProperty(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decode_string)] | None = None
notes: Annotated[str, WrapValidator(decode_string)] | None = None
fields: list[XField] | None = None
passwordHistory: list[PasswordChange] | None = None
response: Annotated[str, WrapValidator(decode_string)] | None = None
type: int


class Attachment(BitwardenBaseModel):
class Config:
extra = "forbid"

fileName: Annotated[str, WrapValidator(decode_string)] | None = None
id: str
key: str | None = (
None # Annotated[str, WrapValidator(decodeBytes)]|None = None
)
object: str
size: int
sizeName: str
url: str


class _CipherBase(BitwardenBaseModel):
class Config:
extra = "forbid"

Id: UUID | None = None
OrganizationId: UUID | None = Field(None, validate_default=True)
Type: CipherType
Name: str
Name: Annotated[str, WrapValidator(decode_string)]
CollectionIds: list[UUID]
key: str | None = None

organizationUseTotp: bool | None = None
creationDate: datetime.datetime | None = None
deletedDate: datetime.datetime | None = None
fields: list[XField] | None = None

notes: Annotated[str, WrapValidator(decode_string)] | None = None
reprompt: int
revisionDate: str
sshKey: str | None
passwordHistory: list[PasswordChange]
object: str | None = None
attachments: list[Attachment] | None = None

@model_validator(mode="wrap")
@classmethod
def set_key(
cls,
data: Any,
handler: ModelWrapValidatorHandler[Self],
info: ValidationInfo,
) -> Self:
if (key := data.get("key")) is not None:
context = cast(dict, info.context)
cctx = cast(list[bytes], context.get("cctx"))

cctx.append(decrypt(key, cctx[0]))

v = handler(data)

if key is not None:
cctx.pop()

return v

@field_validator("OrganizationId")
@classmethod
Expand Down Expand Up @@ -88,6 +283,55 @@ def update_collection(self, collections: list[UUID]):
)


class Login(_CipherBase):
Type: Literal[CipherType.Login]

login: LoginData | None = None
secureNote: None = None
card: None = None
identity: None = None

data: LoginData | None = None


class SecureNote(_CipherBase):
Type: Literal[CipherType.SecureNote]

login: None = None
secureNote: SecureNoteProperty | None = None
card: None = None
identity: None = None

data: SecureNoteData | None = None


class Card(_CipherBase):
Type: Literal[CipherType.Card]

login: None = None
card: None = None
secureNote: None = None
identity: None = None

data: None = None


class Identity(_CipherBase):
Type: Literal[CipherType.Identity]

login: None = None
secureNote: None = None
card: None = None
identity: None = None

data: None = None


CipherDetails = Annotated[
Union[Login, SecureNote, Card, Identity], Field(discriminator="Type")
]


class CollectionAccess(BitwardenBaseModel):
ReadOnly: bool = False
HidePasswords: bool = False
Expand Down Expand Up @@ -550,14 +794,15 @@ def _get_ciphers(self) -> list[CipherDetails]:
"api/ciphers/organization-details",
params={"organizationId": self.Id},
)
org_key = self.key()
res = ResplistBitwarden[CipherDetails].model_validate_json(
resp.text,
context={"parent_id": self.Id, "client": self.api_client},
context={
"parent_id": self.Id,
"client": self.api_client,
"cctx": [org_key], # crypto context
},
)
org_key = self.key()
# map each cipher name to the decrypted name
for cipher in res.Data:
cipher.Name = decrypt(cipher.Name, org_key).decode("utf-8")
return res.Data

def ciphers(
Expand All @@ -581,14 +826,12 @@ def ciphers(

def key(self):
sync = self.api_client.sync()
raw_key = None
for org in sync.Profile.Organizations:
if org.Id == self.Id:
raw_key = org.Key
break
if raw_key is not None:
return decrypt(raw_key, self.api_client.connect_token.orgs_key)
raise BitwardenError(f"No Organizations `{self.Id}` found")
else:
raise BitwardenError(f"No Organizations `{self.Id}` found")
return decrypt(org.Key, self.api_client.connect_token.orgs_key)


def get_organization(
Expand All @@ -601,3 +844,22 @@ def get_organization(
resp.text,
context={"client": bitwarden_client, "parent_id": organisation_id},
)


@dataclasses.dataclass
class Kdf:
Kdf: KdfType
KdfIterations: int | None = None
KdfMemory: int | None = None
KdfParallelism: int | None = None

@classmethod
def from_connect_token(
cls, token: "vaultwarden.clients.bitwarden.ConnectToken"
):
return cls(
token.Kdf,
token.KdfIterations,
token.KdfMemory,
token.KdfParallelism,
)
5 changes: 5 additions & 0 deletions src/vaultwarden/models/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ class VaultwardenUserStatus(IntEnum):
Enabled = 0
Invited = 1
Disabled = 2


class KdfType(IntEnum):
Pbkdf2 = 0
Argon2id = 1
Loading