Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ cython_debug/
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
.vscode/

# Ruff stuff:
.ruff_cache/
Expand Down
22 changes: 22 additions & 0 deletions src/pardner/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from pardner.verticals import Vertical


Expand Down Expand Up @@ -26,3 +28,23 @@ def __init__(self, *unsupported_verticals: Vertical, service_name: str) -> None:
class UnsupportedRequestException(Exception):
def __init__(self, service_name: str, message: str):
super().__init__(f'Cannot fetch data from {service_name}: {message}')


class TumblrAPIError(Exception):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems too specific if we were going to invest in many more connectors for this library. How would we handle TumblrAPIError differently from StravaAPIError? It makes it harder to write code that uses this library - new exceptions get defined when new connectors get added, and then code using this library has to get written to handle new exceptions.

Choosing how to categorize exceptions and how to group them is an interface design question which depends on how a library is going to be used, of course - might be fun to discuss tomorrow or sometime.

"""
Raised when the Tumblr API returns a non-OK HTTP status or a success response
whose payload is missing expected structure
"""

def __init__(
self,
message: str,
status_code: int | None = None,
raw_response: Any = None,
) -> None:
detail = f'Tumblr API error: {message}'
if status_code is not None:
detail += f' (HTTP {status_code})'
super().__init__(detail)
self.status_code = status_code
self.raw_response = raw_response
239 changes: 215 additions & 24 deletions src/pardner/services/tumblr.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import json
from typing import Any, Iterable, Optional, override
from datetime import datetime, timezone
from typing import Any, Iterable, Literal, Optional, override

from pardner.exceptions import UnsupportedRequestException
from requests import HTTPError

from pardner.exceptions import TumblrAPIError, UnsupportedRequestException
from pardner.services import BaseTransferService
from pardner.verticals import SocialPostingVertical, Vertical
from pardner.verticals.sub_verticals import AssociatedMediaSubVertical


class TumblrTransferService(BaseTransferService):
Expand Down Expand Up @@ -63,6 +67,166 @@ def fetch_token(
include_client_id: bool = True,
) -> dict[str, Any]:
return super().fetch_token(code, authorization_response, include_client_id)

def _validate_user_info_response(self, data: Any) -> dict[str, Any]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what JSON Schema is for! Probably not worth it for this project, but validating a bunch of JSON in Python code is a pain, isn't it?

"""
Validates the shape of a ``user/info`` JSON payload.

:param data: the parsed JSON dict returned by ``user/info``.
:returns: the ``response`` sub-dict if validation passes.
:raises: :class:`TumblrAPIError` if required keys are absent or have the wrong type.
"""
if not isinstance(data, dict):
raise TumblrAPIError(
'user/info response is not a JSON object', raw_response=data
)
response = data.get('response')
if not isinstance(response, dict):
raise TumblrAPIError(
"user/info response is missing a 'response' object", raw_response=data
)
user = response.get('user')
if not isinstance(user, dict):
raise TumblrAPIError(
'user/info response.user is missing or not an object', raw_response=data
)
blogs = user.get('blogs')
if not isinstance(blogs, list):
raise TumblrAPIError(
'user/info response.user.blogs is missing or not a list',
raw_response=data,
)
return response

def _validate_dashboard_response(self, data: Any) -> list[Any]:
"""
Validates the shape of a ``user/dashboard`` JSON payload.

:param data: the parsed JSON dict returned by ``user/dashboard``.
:returns: the ``posts`` list if validation passes.
:raises: :class:`TumblrAPIError` if required keys are absent or have the wrong type.
"""
if not isinstance(data, dict):
raise TumblrAPIError(
'user/dashboard response is not a JSON object', raw_response=data
)
response = data.get('response')
if not isinstance(response, dict):
raise TumblrAPIError(
"user/dashboard response is missing a 'response' object",
raw_response=data,
)
posts = response.get('posts')
if not isinstance(posts, list):
raise TumblrAPIError(
'user/dashboard response.posts is missing or not a list',
raw_response=data,
)
return posts

def _map_tumblr_state(
self, state: str | None
) -> Literal['public', 'private', 'draft', 'restricted'] | None:
"""Maps a Tumblr post ``state`` string to the vertical status literal."""
mapping: dict[str, Literal['public', 'private', 'draft', 'restricted']] = {
'published': 'public',
'private': 'private',
'draft': 'draft',
'queued': 'restricted',
'queue': 'restricted',
}
return mapping.get(state or '', None)

def parse_social_posting_vertical(
    self, raw_data: Any
) -> SocialPostingVertical | None:
    """
    Given a single raw Tumblr post dict, creates a
    :class:`SocialPostingVertical` model object, if possible.

    Maps ``id``, ``post_url`` (falling back to ``short_url``), ``timestamp``,
    ``summary``, ``note_count``, ``tags``, ``state``, the ``text`` of NPF text
    blocks, and the ``url`` of every media entry in image/video/audio blocks.

    Fields with an unexpected shape are treated as absent instead of raising,
    so one odd post does not abort parsing of a whole batch.

    :param raw_data: a single post dict from the Tumblr dashboard response.
    :returns: :class:`SocialPostingVertical` or ``None`` if ``raw_data``
        is not a dict.
    """
    if not isinstance(raw_data, dict):
        return None

    # identity / location
    # An explicit ``"id": null`` must not become the literal string 'None'.
    raw_id = raw_data.get('id')
    service_object_id: str | None = str(raw_id) if raw_id is not None else None
    post_url: str | None = raw_data.get('post_url') or raw_data.get('short_url')

    created_at: datetime | None = None
    timestamp = raw_data.get('timestamp')
    if isinstance(timestamp, (int, float)):
        try:
            # Interpreted as UTC, then made naive by dropping tzinfo.
            # NOTE(review): presumably the vertical model stores naive UTC
            # datetimes -- confirm against SocialPostingVertical.
            created_at = datetime.fromtimestamp(timestamp, tz=timezone.utc).replace(
                tzinfo=None
            )
        except (OverflowError, OSError, ValueError):
            # Out-of-range or NaN timestamps: leave the creation time unknown.
            created_at = None

    blog = raw_data.get('blog')
    if not isinstance(blog, dict):
        # Missing or malformed ``blog``: fall through to ``blog_name`` below.
        blog = {}
    creator_user_id: str | None = (
        blog.get('uuid') or blog.get('name') or raw_data.get('blog_name')
    )
    data_owner_id: str = self.primary_blog_id or ''

    interaction_count: int | None = raw_data.get('note_count')
    keywords: list[str] = raw_data.get('tags') or []

    status = self._map_tumblr_state(raw_data.get('state'))

    # NPF content blocks
    content_blocks = raw_data.get('content')
    if not isinstance(content_blocks, list):
        content_blocks = []
    text_parts: list[str] = []
    associated_media: list[AssociatedMediaSubVertical] = []

    # Built once, outside the loop; maps the NPF block type to the media type
    # literal passed to AssociatedMediaSubVertical.
    media_types: dict[str, Literal['audio', 'image', 'video']] = {
        'image': 'image',
        'video': 'video',
        'audio': 'audio',
    }

    for block in content_blocks:
        if not isinstance(block, dict):
            continue
        block_type = block.get('type', '')
        if not isinstance(block_type, str):
            continue

        if block_type == 'text':
            text_value = block.get('text')
            if isinstance(text_value, str) and text_value:
                text_parts.append(text_value)
            continue

        media_type = media_types.get(block_type)
        if media_type is None:
            # Not a text or media block; nothing to extract.
            continue

        # ``media`` is handled both as a list of entries and as a single
        # entry dict; any other shape yields no media.
        media = block.get('media')
        entries = media if isinstance(media, list) else [media]
        for entry in entries:
            if isinstance(entry, dict) and entry.get('url'):
                associated_media.append(
                    AssociatedMediaSubVertical(
                        media_type=media_type, url=entry['url']
                    )
                )

    return SocialPostingVertical(
        creator_user_id=creator_user_id,
        data_owner_id=data_owner_id,
        service_object_id=service_object_id,
        service=self._service_name,
        created_at=created_at,
        url=post_url,
        abstract=raw_data.get('summary'),
        interaction_count=interaction_count,
        keywords=keywords,
        status=status,
        text='\n\n'.join(text_parts) if text_parts else None,
        associated_media=associated_media,
    )

def fetch_primary_blog_id(self) -> str:
"""
Expand All @@ -75,13 +239,27 @@ def fetch_primary_blog_id(self) -> str:

:returns: the primary blog id.

:raises: :class:`ValueError`: if the primary blog ID could not be extracted from
the response.
:raises: :class:`TumblrAPIError`: if the Tumblr API returns a non-OK response
or a malformed success payload.
:raises: :class:`ValueError`: if the response is structurally valid but no
primary blog with a UUID was found.
"""
if self.primary_blog_id:
return self.primary_blog_id
user_info = self._get_resource_from_path('user/info').json().get('response', {})
for blog_info in user_info.get('user', {}).get('blogs', []):

try:
raw_response = self._get_resource_from_path('user/info')
except HTTPError as exc:
raise TumblrAPIError(
'Failed to fetch user/info',
status_code=exc.response.status_code if exc.response is not None else None,
raw_response=exc.response,
) from exc

user_info_data = raw_response.json()
response = self._validate_user_info_response(user_info_data)

for blog_info in response['user']['blogs']:
if (
isinstance(blog_info, dict)
and blog_info.get('primary')
Expand All @@ -95,17 +273,17 @@ def fetch_primary_blog_id(self) -> str:
'Failed to fetch primary blog id. Either manually set the _primary_blog_id '
'attribute or verify all the client credentials '
'and permissions are correct. Response from Tumblr: '
f'{json.dumps(user_info, indent=2)}'
f'{json.dumps(user_info_data, indent=2)}'
)

def fetch_social_posting_vertical(
self,
request_params: dict[str, Any] = {},
count: int = 20,
text_only: bool = True,
) -> list[Any]:
) -> tuple[list[SocialPostingVertical | None], list[Any]]:
"""
Fetches posts from Tumblr feed for user account whose token was
Fetches posts from Tumblr feed for the user account whose token was
obtained using the Tumblr API.

:param count: number of posts to request.
Expand All @@ -115,21 +293,34 @@ def fetch_social_posting_vertical(
to the endpoint. Depending on the parameters passed, this could override
``count`` and ``text_only``.

:returns: a list of dictionary objects with information for the posts in a feed.
:returns: a two-element tuple: the first element is a list of
:class:`SocialPostingVertical` objects (``None`` for posts that could
not be parsed); the second element is the raw list of post dicts as
returned by the API.

:raises: :class:`UnsupportedRequestException` if the request is unable to be
made.
:raises: :class:`UnsupportedRequestException` if ``count`` exceeds 20.
:raises: :class:`TumblrAPIError` if the API returns a non-OK response
or a malformed success payload.
"""
if count <= 20:
params: dict[str, Any] = {'limit': count, 'npf': True, **request_params}
if text_only:
params['type'] = 'text'
dashboard_response = self._get_resource_from_path(
'user/dashboard',
params,
if count > 20:
raise UnsupportedRequestException(
self._service_name,
'can only make a request for at most 20 posts at a time.',
)
return list(dashboard_response.json().get('response').get('posts'))
raise UnsupportedRequestException(
self._service_name,
'can only make a request for at most 20 posts at a time.',
)

params: dict[str, Any] = {'limit': count, 'npf': True, **request_params}
if text_only:
params['type'] = 'text'

try:
dashboard_response = self._get_resource_from_path('user/dashboard', params)
except HTTPError as exc:
raise TumblrAPIError(
'Failed to fetch user/dashboard',
status_code=exc.response.status_code if exc.response is not None else None,
raw_response=exc.response,
) from exc

raw_posts = self._validate_dashboard_response(dashboard_response.json())
parsed = [self.parse_social_posting_vertical(post) for post in raw_posts]
return parsed, raw_posts
Loading
Loading