diff --git a/manual_testing/mastodon_manual_test.py b/manual_testing/mastodon_manual_test.py index e4b1c01..369eaa4 100644 --- a/manual_testing/mastodon_manual_test.py +++ b/manual_testing/mastodon_manual_test.py @@ -1,10 +1,95 @@ import sys, os +import time sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) -from abstractions import Post +from abstractions import AdoptablePet from social_posters.mastodon import PosterMastodon +def post_exceed_500_chars_limit_with_adoption_link(): + pet = AdoptablePet("Brian", + "Labrador Retriever", + "White Labrador", + "Quahog", + "I am a writer! Post exceeds limit with adoption link"*200, + "http://www.davidgorman.com/4quartets/", + "https://static.wikia.nocookie.net/familyguy/images/c/c2/FamilyGuy_Single_BrianWriter_R7.jpg/revision/latest?cb=20230807152447", + 11, + "Male", + None, + None + ) + return pet +def post_exceed_500_chars_limit_without_adoption_link(): + pet = AdoptablePet("Vinny", + "Unknown", + "Unknown", + "Quahog", + "I am 1/16th cat! Post exceeds word limit without adoption link."*1000, + None, + "https://static.wikia.nocookie.net/familyguyfanon/images/e/ec/Vinny_Griffin.png/revision/latest?cb=20161129110103", + None, + "Male", + None, + None + ) + return pet + + +def post_within_500_chars_limit_with_adoption_link(): + pet = AdoptablePet("Ernie", + "Chicken", + "Unknown", + "Quahog", + "cluck. Post within word limit with adoption link.", + "https://poets.org/poem/having-coke-you", + "https://static.wikia.nocookie.net/villains/images/2/2e/Giant_chicken_animation.png/revision/latest?cb=20220615120124", + None, + "Male", + None, + None + ) + return pet + +def post_within_500_chars_limit_without_adoption_link(): + pet = AdoptablePet("Pouncy", + "Cat", + "Unknown", + "Quahog", + "Meow. Post within 500 limit without adoption link", + None, + "https://static.wikia.nocookie.net/villains/images/7/76/Pouncey.webp/revision/latest?cb=20220403224856", + None, + "Male", + None, + None + ) + return pet + +def post_unicode(): + pet = AdoptablePet("Vinny", + "Unknown", + "Unknown", + "Quahog", + "🐶❤️ 可爱的小狗 Friendly \"lap cat\" @ shelter #AdoptMe", + None, + "https://static.wikia.nocookie.net/familyguyfanon/images/e/ec/Vinny_Griffin.png/revision/latest?cb=20161129110103", + None, + "Male", + None, + None + ) + return pet + +# !!!DO NOT TEST MULTIPLE CASES AT THE SAME TIME!!! +# !!!OTHERWISE YOU MAY TRIGGER SPAM DETECTION!!! +testingCases = [ + post_exceed_500_chars_limit_with_adoption_link, + post_exceed_500_chars_limit_without_adoption_link, + post_within_500_chars_limit_with_adoption_link, + post_within_500_chars_limit_without_adoption_link, + post_unicode +] def main(): poster = PosterMastodon() @@ -15,19 +100,22 @@ def main(): print("Authenticated to Mastodon!") - post = Post( - text="Test post", - image_url="https://static.wikia.nocookie.net/familyguy/images/c/c2/FamilyGuy_Single_BrianWriter_R7.jpg/revision/latest?cb=20230807152447", - alt_text="Cute animal", - tags=["Test", "Mastodon"], - ) + for pet in testingCases: + pet_instance = pet() + + post = poster.format_post(pet_instance) + target_url = pet_instance.adoption_url + if target_url and (target_url not in post.text): + print("Adoption link not posted!") + + result = poster.publish(post) - result = poster.publish(post) + if result.success: + print(f"Posted successfully! URL: {result.post_url}") + else: + print(f"Post failed: {result.error_message}") + time.sleep(1) - if result.success: - print(f"Posted successfully! URL: {result.post_url}") - else: - print(f"Post failed: {result.error_message}") if __name__ == "__main__": main() \ No newline at end of file diff --git a/manual_testing/mastodon_preview_test.py b/manual_testing/mastodon_preview_test.py new file mode 100644 index 0000000..c919ed0 --- /dev/null +++ b/manual_testing/mastodon_preview_test.py @@ -0,0 +1,207 @@ +""" +Mastodon formatting preview tool: + +This file visualizes different stages of the Mastodon formatting pipeline. + +Pipeline structure: +AdoptablePet + -> Post + -> PreparedCaption + -> CaptionThread + +Examples: + +Preview raw pet input: +python manual_tests/mastodon_preview.py --stage pet + +Preview generated platform-independent Post: +python manual_tests/mastodon_preview.py --stage post + +Preview fully formatted Mastodon thread: +python manual_tests/mastodon_preview.py --stage debug + +Preview only the main Mastodon post: +python manual_tests/mastodon_preview.py --stage main + +Preview only reply thread chunks: +python manual_tests/mastodon_preview.py --stage replies + +Preview every pipeline stage: +python manual_tests/mastodon_preview.py --stage all + +(default behavior is --stage all) + +To extend preview stages: +1. Add a new PreviewStage enum entry +2. Add a renderer/action for that stage +""" + +from __future__ import annotations + +import argparse +import os +import sys +from dataclasses import asdict, is_dataclass +from enum import StrEnum +from pprint import pprint + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from mastodon_manual_test import post_exceed_500_chars_limit_with_adoption_link +from social_posters.mastodon import CaptionThread, MastodonPhase, PosterMastodon +from utils.pipeline import Phase, PipelineResult +from utils.pipeline_preview import PreviewSection, print_section, render_sections + + +class PreviewStage(StrEnum): + PET = "pet" + POST = "post" + PREPARED_CAPTION = "prepared_caption" + CAPTION_THREAD = "caption_thread" + MAIN = "main" + REPLIES = "replies" + TRACE = "trace" + ALL = "all" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Preview each stage of the Mastodon formatting pipeline." + ) + parser.add_argument( + "--stage", + type=PreviewStage, + choices=list(PreviewStage), + default=PreviewStage.ALL, + help="Which construction stage to preview.", + ) + return parser.parse_args() + + +def find_phase(pipeline: PipelineResult, phase_name: MastodonPhase) -> Phase | None: + for phase in pipeline.trace: + if phase.name == phase_name: + return phase + + return None + + +def value_for_phase(pipeline: PipelineResult, phase_name: MastodonPhase) -> object | None: + phase = find_phase(pipeline, phase_name) + return phase.value if phase else None + + +def print_value(value: object) -> None: + if is_dataclass(value) and not isinstance(value, type): + pprint(asdict(value)) + else: + pprint(value) + + +def print_phase(pipeline: PipelineResult, phase_name: MastodonPhase) -> None: + value = value_for_phase(pipeline, phase_name) + + if value is None: + print(f"(No value recorded for {phase_name})") + return + + print_value(value) + + +def print_trace(pipeline: PipelineResult) -> None: + for i, phase in enumerate(pipeline.trace, start=1): + print_section(f"PHASE {i}: {phase.name}") + print_value(phase.value) + + if pipeline.errors: + print_section("ERRORS") + for error in pipeline.errors: + print(f"{type(error).__name__}: {error}") + + +def print_main_caption(thread: CaptionThread | None) -> None: + if thread is None: + print("(No caption thread)") + return + + print(thread.main_caption) + print(f"\nLength: {len(thread.main_caption)}") + + +def print_replies(thread: CaptionThread | None) -> None: + if thread is None: + print("(No caption thread)") + return + + if not thread.replies: + print("(No replies)") + return + + for i, reply in enumerate(thread.replies, start=1): + print_section(f"REPLY {i}") + print(reply) + print(f"\nLength: {len(reply)}") + + +def main() -> None: + args = parse_args() + selected_stage: PreviewStage = args.stage + + poster = PosterMastodon.__new__(PosterMastodon) + + pet = post_exceed_500_chars_limit_with_adoption_link() + pipeline = poster.build_formatting_pipeline(pet) + + thread = ( + pipeline.value + if pipeline.ok and isinstance(pipeline.value, CaptionThread) + else None + ) + + sections = [ + PreviewSection( + PreviewStage.PET, + "PET", + lambda: print_phase(pipeline, MastodonPhase.PET), + ), + PreviewSection( + PreviewStage.POST, + "POST OBJECT", + lambda: print_phase(pipeline, MastodonPhase.POST), + ), + PreviewSection( + PreviewStage.PREPARED_CAPTION, + "PREPARED CAPTION", + lambda: print_phase(pipeline, MastodonPhase.PREPARED_CAPTION), + ), + PreviewSection( + PreviewStage.CAPTION_THREAD, + "CAPTION THREAD", + lambda: print_phase(pipeline, MastodonPhase.CAPTION_THREAD), + ), + PreviewSection( + PreviewStage.MAIN, + "MAIN POST", + lambda: print_main_caption(thread), + ), + PreviewSection( + PreviewStage.REPLIES, + "REPLIES", + lambda: print_replies(thread), + ), + PreviewSection( + PreviewStage.TRACE, + "FULL TRACE", + lambda: print_trace(pipeline), + ), + ] + + render_sections( + sections=sections, + selected=selected_stage, + all_stage=PreviewStage.ALL, + ) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1915a71..755e9d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,10 +3,12 @@ api-display-purposes==0.0.3 attrs==25.4.0 beautifulsoup4==4.14.3 blurhash==1.1.5 +build==1.5.0 certifi==2026.2.25 chardet==3.0.4 charset-normalizer==3.4.4 clarifai==2.6.2 +click==8.3.3 configparser==3.8.1 decorator==4.0.2 EasyProcess==1.1 @@ -17,36 +19,28 @@ grpcio==1.78.0 h11==0.16.0 httpcore==1.0.9 httpx==0.28.1 +hypothesis==6.152.7 idna==2.10 +iniconfig==2.3.0 instapy==0.6.16 jsonschema==2.6.0 Mastodon.py==2.1.4 MeaningCloud-python==2.0.0 outcome==1.3.0.post0 +packaging==26.2 +pip-tools==7.5.3 +pluggy==1.6.0 plyer==2.1.0 protobuf==3.20.3 +Pygments==2.20.0 +pyproject_hooks==1.2.0 PySocks==1.7.1 pytest==9.0.3 python-dateutil==2.9.0.post0 -python-magic==0.4.27 -python-telegram-bot==22.6 -PyVirtualDisplay==3.0 -PyYAML==6.0.3 -regex==2026.2.28 -requests==2.32.5 -selenium==4.41.0 -semantic-version==2.10.0 +requests==2.33.1 setuptools==82.0.0 -setuptools-rust==1.12.0 six==1.17.0 -sniffio==1.3.1 sortedcontainers==2.4.0 -soupsieve==2.8.3 -tqdm==4.67.3 -trio==0.33.0 -trio-websocket==0.12.2 typing_extensions==4.15.0 urllib3==2.6.3 -webdriverdownloader==1.1.0.4 -websocket-client==1.9.0 -wsproto==1.3.2 +wheel==0.47.0 diff --git a/social_posters/mastodon.py b/social_posters/mastodon.py index b50001b..e0ff67d 100644 --- a/social_posters/mastodon.py +++ b/social_posters/mastodon.py @@ -1,24 +1,59 @@ +from __future__ import annotations + import os -from urllib.parse import urlparse import tempfile +from dataclasses import dataclass +from enum import StrEnum +from urllib.parse import urlparse import requests from mastodon import Mastodon -from abstractions import Post, PostResult, SocialPoster +from abstractions import AdoptablePet, Post, PostResult, SocialPoster +from abstractions import CITY_NAME, CITY_STATE +from utils.pipeline import PipelineResult, add_phase, start_pipeline + +THREAD_SUFFIX = "\n\nMore details below ⬇️" MASTODON_CHARACTER_LIMIT = 500 TRUNCATION_SUFFIX = "..." +MAX_REPLIES = 5 + + +class MastodonPhase(StrEnum): + PET = "pet" + POST = "post" + PREPARED_CAPTION = "prepared_caption" + CAPTION_THREAD = "caption_thread" + + +@dataclass(frozen=True) +class PreparedCaption: + post: Post + caption_text: str + tags: list[str] + tag_suffix: str + + +@dataclass(frozen=True) +class CaptionThread: + main_caption: str + replies: list[str] + main_limit: int | None + main_text: str | None + overflow: str | None + was_split: bool + was_capped: bool class PosterMastodon(SocialPoster): - def __init__(self): + def __init__(self) -> None: raw_token = os.environ.get("MASTODON_TOKEN") self.token = raw_token.strip() if raw_token else None self.api_base_url = "https://mastodon.social" - self._session = None + self._session: Mastodon | None = None self._is_available = bool(self.token) - self._auth_error = None + self._auth_error: str | None = None @property def platform_name(self) -> str: @@ -39,6 +74,38 @@ def authenticate(self) -> bool: return False def publish(self, post: Post) -> PostResult: + error = self._ensure_ready_to_publish(post) + if error: + return error + + image_path = None + + try: + image_path = self._download_image(post.image_url) + + media = self._session.media_post( + image_path, + description=post.alt_text or "Photo of an adoptable pet", + ) + + main_caption, replies = self._format_caption_thread(post) + status = self._post_thread(main_caption, replies, media["id"]) + + return PostResult( + success=True, + post_id=str(status["id"]), + post_url=status.get("url"), + ) + + except Exception as exc: + return PostResult(success=False, error_message=str(exc)) + + finally: + self._session = None + if image_path and os.path.exists(image_path): + os.unlink(image_path) + + def _ensure_ready_to_publish(self, post: Post) -> PostResult | None: if not self._is_available: return PostResult( success=False, @@ -61,51 +128,217 @@ def publish(self, post: Post) -> PostResult: ), ) - image_path = None - try: - image_path = self._download_image(post.image_url) - media = self._session.media_post( - image_path, - description=post.alt_text or "Photo of an adoptable pet", - ) - status = self._session.status_post( - self._format_caption(post), - media_ids=[media["id"]], + return None + + def _post_thread( + self, + main_caption: str, + replies: list[str], + media_id: str, + ) -> dict: + status = self._session.status_post( + main_caption, + media_ids=[media_id], + ) + + root_status_id = status["id"] + + for reply_text in replies: + self._session.status_post( + reply_text, + in_reply_to_id=root_status_id, ) - return PostResult( - success=True, - post_id=str(status["id"]), - post_url=status.get("url"), + + return status + + """ + Implement your own pipeline and consume it in a preview file + for debugging purposes + Here, we build a pipeline from AdoptablePet (upstream input) + -> Post -> Prepared Caption -> Caption Thread + """ + def build_formatting_pipeline( + self, + pet: AdoptablePet, + ) -> PipelineResult[CaptionThread]: + pipeline = start_pipeline(MastodonPhase.PET, pet) + + pipeline = add_phase( + pipeline, + MastodonPhase.POST, + self.format_post, + ) + + pipeline = add_phase( + pipeline, + MastodonPhase.PREPARED_CAPTION, + self._prepare_caption, + ) + + pipeline = add_phase( + pipeline, + MastodonPhase.CAPTION_THREAD, + self._build_caption_thread, + ) + + return pipeline + + def _format_caption_thread(self, post: Post) -> tuple[str, list[str]]: + prepared = self._prepare_caption(post) + thread = self._build_caption_thread(prepared) + return thread.main_caption, thread.replies + + def _prepare_caption(self, post: Post) -> PreparedCaption: + tags, tag_suffix = self._format_tags(post.tags) + + return PreparedCaption( + post=post, + caption_text=post.text.strip(), + tags=tags, + tag_suffix=tag_suffix, + ) + + def _build_caption_thread(self, prepared: PreparedCaption) -> CaptionThread: + caption_text = prepared.caption_text + tag_suffix = prepared.tag_suffix + + if self._fits_single_post(caption_text, tag_suffix): + return CaptionThread( + main_caption=f"{caption_text}{tag_suffix}", + replies=[], + main_limit=None, + main_text=caption_text, + overflow="", + was_split=False, + was_capped=False, ) - except Exception as exc: - return PostResult(success=False, error_message=str(exc)) - finally: - self._session = None - if image_path and os.path.exists(image_path): - os.unlink(image_path) - def _format_caption(self, post: Post) -> str: - tags = " ".join(f"#{tag}" for tag in post.tags if tag) - tag_suffix = f"\n\n{tags}" if tags else "" - available_text_length = MASTODON_CHARACTER_LIMIT - len(tag_suffix) + main_limit = self._validated_main_limit(tag_suffix) + main_text, overflow = self._safe_truncate(caption_text, main_limit) + replies = self._split_reply_chunks(overflow) + + main_caption = self._build_main_caption(main_text, tag_suffix) - if available_text_length <= len(TRUNCATION_SUFFIX): - return (tag_suffix[-MASTODON_CHARACTER_LIMIT:]).strip() + return CaptionThread( + main_caption=main_caption, + replies=replies, + main_limit=main_limit, + main_text=main_text, + overflow=overflow, + was_split=True, + was_capped=replies[-1].endswith(TRUNCATION_SUFFIX) if replies else False, + ) - caption_text = post.text.strip() - if len(caption_text) > available_text_length: - caption_text = caption_text[: available_text_length - len(TRUNCATION_SUFFIX)].rstrip() - caption_text = f"{caption_text}{TRUNCATION_SUFFIX}" + @staticmethod + def _fits_single_post(caption_text: str, tag_suffix: str) -> bool: + return len(caption_text) + len(tag_suffix) <= MASTODON_CHARACTER_LIMIT - return f"{caption_text}{tag_suffix}" + def _validated_main_limit(self, tag_suffix: str) -> int: + main_limit = self._main_caption_limit(tag_suffix) + + if main_limit <= 0: + raise ValueError("Tags are too long to fit in a Mastodon post.") + + return main_limit + + @staticmethod + def _build_main_caption(main_text: str, tag_suffix: str) -> str: + return ( + f"{main_text}" + f"{TRUNCATION_SUFFIX}" + f"{THREAD_SUFFIX}" + f"{tag_suffix}" + ) + + @staticmethod + def _format_tags(tags: list[str]) -> tuple[list[str], str]: + clean_tags = [tag for tag in tags if tag] + tag_text = " ".join(f"#{tag}" for tag in clean_tags) + tag_suffix = f"\n\n{tag_text}" if tag_text else "" + return clean_tags, tag_suffix + + @staticmethod + def _main_caption_limit(tag_suffix: str) -> int: + return ( + MASTODON_CHARACTER_LIMIT + - len(tag_suffix) + - len(THREAD_SUFFIX) + - len(TRUNCATION_SUFFIX) + ) + + def _split_reply_chunks(self, text: str) -> list[str]: + chunks = [] + remaining = text.strip() + + while remaining and len(chunks) < MAX_REPLIES: + chunk, remaining = self._safe_truncate( + remaining, + MASTODON_CHARACTER_LIMIT, + ) + chunks.append(chunk) + + if remaining and chunks: + cutoff = MASTODON_CHARACTER_LIMIT - len(TRUNCATION_SUFFIX) + last_chunk, _ = self._safe_truncate(chunks[-1], cutoff) + chunks[-1] = f"{last_chunk}{TRUNCATION_SUFFIX}" + + return chunks def _download_image(self, image_url: str) -> str: parsed_url = urlparse(image_url) ext = os.path.splitext(parsed_url.path)[1] or ".jpg" + with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp: - response = requests.get(image_url, stream=True, timeout=20) - response.raise_for_status() - for chunk in response.iter_content(chunk_size=1024 * 128): - if chunk: - tmp.write(chunk) + with requests.get(image_url, stream=True, timeout=20) as response: + response.raise_for_status() + + for chunk in response.iter_content(chunk_size=1024 * 128): + if chunk: + tmp.write(chunk) + return tmp.name + + @staticmethod + def _safe_truncate(text: str, limit: int) -> tuple[str, str]: + if len(text) <= limit: + return text, "" + + cut = text.rfind(" ", 0, limit) + + if cut == -1: + cut = limit + + return text[:cut].rstrip(), text[cut:].strip() + + def format_post(self, pet: AdoptablePet) -> Post: + text = ( + f"Meet {pet.name}! This adorable {pet.breed} {pet.species} " + f"is looking for a forever home in {pet.location}." + ) + + if pet.adoption_url: + text += f" Adopt {pet.name}: {pet.adoption_url}" + + if pet.description: + text += f"\n\n{pet.description}" + + city = "" + if pet.location != f"{CITY_NAME}, {CITY_STATE}": + city = pet.location.split(",")[0].capitalize() + + return Post( + text=text, + image_url=pet.image_url, + link=pet.adoption_url, + alt_text=( + f"Photo of {pet.name}, a {pet.breed} {pet.species} " + "available for adoption" + ), + tags=[ + "adoptdontshop", + "rescue", + city, + pet.species, + pet.breed.lower().replace(" ", ""), + ], + ) \ No newline at end of file diff --git a/tests/test_mastodon.py b/tests/test_mastodon.py index aadc9c8..f538930 100644 --- a/tests/test_mastodon.py +++ b/tests/test_mastodon.py @@ -1,27 +1,321 @@ -from abstractions import Post -from social_posters.mastodon import PosterMastodon, MASTODON_CHARACTER_LIMIT +""" +Testing includes: +1. Property-based testing (Hypothesis) +2. Unit tests +3. Manual visual inspection in Preview file + +Property-based tests: +- Generate randomized text/tag combinations. +- Nondeterministic testing unless specify seeds. +- Verify global invariants such as: + * captions never exceed Mastodon limits + * replies never exceed MAX_REPLIES + * full formatting pipeline matches runtime formatter + * no empty replies are produced + +Unit tests: +- Validate specific expected behaviors and edge cases. +- Examples include: + * truncation behavior + * capped thread handling + * reconstruction correctness + * no-tag formatting + * tag filtering + * word-safe truncation + * long text handling + +Manual visual inspection: +- See Preview file for details +""" + +from abstractions import Post, AdoptablePet +from hypothesis import given, strategies as st +from social_posters.mastodon import ( + CaptionThread, + MastodonPhase, + PosterMastodon, + MASTODON_CHARACTER_LIMIT, + MAX_REPLIES, +) + + +tag_strategy = st.lists( + st.one_of(st.text(min_size=0, max_size=20), st.none()), + max_size=10, +) + +text_strategy = st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=0, + max_size=5000, +) + +pet_strategy = st.builds( + AdoptablePet, + name=st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=1, + max_size=20, + ), + species=st.sampled_from( + [ + "Dog", + "Cat", + "Rabbit", + "Bird", + "Alien", + "Unknown", + ] + ), + breed=st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=1, + max_size=30, + ), + location=st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=1, + max_size=50, + ), + description=text_strategy, + adoption_url=st.one_of( + st.none(), + st.just("https://example.com/adopt"), + ), + image_url=st.just("https://example.com/image.jpg"), + age_string=st.one_of( + st.none(), + st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=1, + max_size=20, + ), + ), + sex=st.one_of( + st.none(), + st.sampled_from(["Male", "Female", "Unknown"]), + ), + size_group=st.one_of( + st.none(), + st.sampled_from(["Small", "Medium", "Large"]), + ), + pet_id=st.one_of( + st.none(), + st.text( + alphabet=st.characters(blacklist_categories=("Cs",)), + min_size=1, + max_size=20, + ), + ), +) + +def reconstruct_text(main_caption: str, replies: list[str]) -> str: + main_without_tags = main_caption.split("\n\n#")[0] + main_without_suffix = ( + main_without_tags + .replace("...", "") + .replace("\n\nMore details below ⬇️", "") + .strip() + ) + + return " ".join([main_without_suffix] + replies).strip() + +class TestMastodonCaptionProperties: + def setup_method(self): + self.poster = PosterMastodon.__new__(PosterMastodon) + + @given(text=text_strategy, tags=tag_strategy) + def test_all_parts_stay_under_mastodon_limit(self, text, tags): + post = Post(text=text, tags=tags) + + main_caption, replies = self.poster._format_caption_thread(post) + + assert len(main_caption) <= MASTODON_CHARACTER_LIMIT + assert all(len(reply) <= MASTODON_CHARACTER_LIMIT for reply in replies) + + @given(text=text_strategy, tags=tag_strategy) + def test_reply_count_is_never_over_cap(self, text, tags): + post = Post(text=text, tags=tags) + + _, replies = self.poster._format_caption_thread(post) + + assert len(replies) <= MAX_REPLIES + + @given(text=st.text(min_size=0, max_size=300)) + def test_no_empty_replies(self, text): + post = Post(text=text) + + _, replies = self.poster._format_caption_thread(post) + + assert all(reply for reply in replies) + + @given(text=text_strategy, tags=tag_strategy) + def test_reconstruction_preserves_uncapped_threads(self, text, tags): + post = Post(text=text, tags=tags) + + main_caption, replies = self.poster._format_caption_thread(post) + + if len(replies) < MAX_REPLIES: + reconstructed = reconstruct_text(main_caption, replies) + assert reconstructed == text.strip() + + @given(pet=pet_strategy) + def test_pipeline_matches_regular_formatter(self, pet): + post = self.poster.format_post(pet) + + main_caption, replies = self.poster._format_caption_thread(post) + pipeline = self.poster.build_formatting_pipeline(pet) + + assert pipeline.ok + assert isinstance(pipeline.value, CaptionThread) + + assert pipeline.value.main_caption == main_caption + assert pipeline.value.replies == replies + + @given(pet=pet_strategy) + def test_pipeline_records_expected_phases(self, pet): + pipeline = self.poster.build_formatting_pipeline(pet) + + phase_names = [phase.name for phase in pipeline.trace] + + assert phase_names == [ + MastodonPhase.PET, + MastodonPhase.POST, + MastodonPhase.PREPARED_CAPTION, + MastodonPhase.CAPTION_THREAD, + ] class TestMastodonCaption: def setup_method(self): self.poster = PosterMastodon.__new__(PosterMastodon) + + + def test_reply_count_is_capped(self): + post = Post(text="hello " * 5000) + + _, replies = self.poster._format_caption_thread(post) + + assert len(replies) <= MAX_REPLIES + + def test_last_reply_has_truncation_suffix_when_capped(self): + post = Post(text="hello " * 5000) + + _, replies = self.poster._format_caption_thread(post) + + assert len(replies) == MAX_REPLIES + assert replies[-1].endswith("...") + assert len(replies[-1]) <= MASTODON_CHARACTER_LIMIT + + def test_last_reply_has_no_truncation_suffix_when_not_capped(self): + post = Post(text="hello " * 300) + + _, replies = self.poster._format_caption_thread(post) + + assert replies + assert len(replies) < MAX_REPLIES + assert not replies[-1].endswith("...") + + def test_capped_thread_does_not_preserve_all_original_text(self): + original_text = "hello " * 5000 + post = Post(text=original_text) + + main_caption, replies = self.poster._format_caption_thread(post) + + reconstructed = reconstruct_text(main_caption, replies) + + assert len(replies) == MAX_REPLIES + assert reconstructed != original_text + assert replies[-1].endswith("...") + + def test_thread_preserves_original_text_content(self): + original_text = " ".join(f"word{i}" for i in range(300)) + post = Post(text=original_text, tags=["AdoptDontShop", "Boston"]) + + main_caption, replies = self.poster._format_caption_thread(post) + + reconstructed = reconstruct_text(main_caption, replies) + + assert reconstructed == original_text + + def test_thread_preserves_original_text_without_spaces(self): + original_text = "x" * 1000 + post = Post(text=original_text, tags=["AdoptDontShop", "Boston"]) + + main_caption, replies = self.poster._format_caption_thread(post) + + main_without_tags = main_caption.split("\n\n#")[0] + main_without_suffix = ( + main_without_tags + .replace("...", "") + .replace("\n\nMore details below ⬇️", "") + .strip() + ) + + reconstructed = main_without_suffix + "".join(replies) + + assert reconstructed == original_text + def test_no_tags(self): post = Post(text="Hello, world!") - assert self.poster._format_caption(post) == "Hello, world!" + + main_caption, replies = self.poster._format_caption_thread(post) + + assert main_caption == "Hello, world!" + assert replies == [] def test_with_tags(self): post = Post(text="Meet Poppy!", tags=["AdoptDontShop", "Boston"]) - assert self.poster._format_caption(post) == "Meet Poppy!\n\n#AdoptDontShop #Boston" - def test_caption_stays_under_limit(self): + main_caption, replies = self.poster._format_caption_thread(post) + + assert main_caption == "Meet Poppy!\n\n#AdoptDontShop #Boston" + assert replies == [] + + def test_spaced_long_text_creates_reply(self): + post = Post(text="x " * 1000, tags=["AdoptDontShop", "Boston"]) + + main_caption, replies = self.poster._format_caption_thread(post) + + assert len(main_caption) <= MASTODON_CHARACTER_LIMIT + assert main_caption.endswith("\n\n#AdoptDontShop #Boston") + assert "..." in main_caption + assert "More details below" in main_caption + assert replies + assert all(len(reply) <= MASTODON_CHARACTER_LIMIT for reply in replies) + + def test_unspaced_long_text_creates_reply(self): post = Post(text="x" * 1000, tags=["AdoptDontShop", "Boston"]) - caption = self.poster._format_caption(post) - assert len(caption) <= MASTODON_CHARACTER_LIMIT - assert caption.endswith("\n\n#AdoptDontShop #Boston") - assert "..." in caption + main_caption, replies = self.poster._format_caption_thread(post) + + assert len(main_caption) <= MASTODON_CHARACTER_LIMIT + assert main_caption.endswith("\n\n#AdoptDontShop #Boston") + assert "..." in main_caption + assert "More details below" in main_caption + assert replies + assert all(len(reply) <= MASTODON_CHARACTER_LIMIT for reply in replies) def test_empty_tags_are_ignored(self): post = Post(text="Meet Poppy!", tags=["AdoptDontShop", "", None, "Boston"]) - assert self.poster._format_caption(post) == "Meet Poppy!\n\n#AdoptDontShop #Boston" \ No newline at end of file + + main_caption, replies = self.poster._format_caption_thread(post) + + assert main_caption == "Meet Poppy!\n\n#AdoptDontShop #Boston" + assert replies == [] + + def test_long_text_without_tags_creates_replies(self): + post = Post(text="hello " * 300) + + main_caption, replies = self.poster._format_caption_thread(post) + + assert len(main_caption) <= MASTODON_CHARACTER_LIMIT + assert replies + assert all(len(reply) <= MASTODON_CHARACTER_LIMIT for reply in replies) + + def test_safe_truncate_does_not_split_words_when_possible(self): + kept, remaining = self.poster._safe_truncate("hello world again", 12) + + assert kept == "hello world" + assert remaining == "again" \ No newline at end of file diff --git a/utils/pipeline.py b/utils/pipeline.py new file mode 100644 index 0000000..3735f33 --- /dev/null +++ b/utils/pipeline.py @@ -0,0 +1,75 @@ +""" +A generic pipeline utility for building preview/debug traces. + +Each phase transforms the current pipeline value from one type to another: + + PipelineResult[T] + Callable[[T], U] -> PipelineResult[U] + +For example: + + AdoptablePet -> Post -> PreparedCaption -> CaptionThread + +The trace stores every intermediate phase value for preview/debug output, +while `value` always represents the latest pipeline result. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Callable, Generic, TypeVar + + +T = TypeVar("T") +U = TypeVar("U") + + +@dataclass(frozen=True) +class Phase: + name: StrEnum + value: object + + +@dataclass +class PipelineResult(Generic[T]): + value: T | None + trace: list[Phase] = field(default_factory=list) + errors: list[Exception] = field(default_factory=list) + + @property + def ok(self) -> bool: + return not self.errors + + +def start_pipeline(name: StrEnum, value: T) -> PipelineResult[T]: + return PipelineResult( + value=value, + trace=[Phase(name, value)], + ) + + +def add_phase( + pipeline: PipelineResult[T], + name: StrEnum, + fn: Callable[[T], U], +) -> PipelineResult[U]: + if not pipeline.ok: + return PipelineResult( + value=None, + trace=pipeline.trace, + errors=pipeline.errors, + ) + + try: + assert pipeline.value is not None + new_value = fn(pipeline.value) + return PipelineResult( + value=new_value, + trace=pipeline.trace + [Phase(name, new_value)], + ) + except Exception as exc: + return PipelineResult( + value=None, + trace=pipeline.trace, + errors=pipeline.errors + [exc], + ) \ No newline at end of file diff --git a/utils/pipeline_preview.py b/utils/pipeline_preview.py new file mode 100644 index 0000000..aacd15d --- /dev/null +++ b/utils/pipeline_preview.py @@ -0,0 +1,39 @@ +""" +Pipeline preview renderer. + +Provides utilities for rendering selected stages of a pipeline +for debugging and inspection. +""" +from __future__ import annotations + +from dataclasses import dataclass +from enum import StrEnum +from typing import Callable + + +@dataclass(frozen=True) +class PreviewSection: + stage: StrEnum + title: str + render: Callable[[], None] + + +def print_section(title: str) -> None: + print("\n" + "=" * 60) + print(title) + print("=" * 60) + + +def should_render_section(selected: StrEnum, target: StrEnum, all_stage: StrEnum) -> bool: + return selected in (target, all_stage) + + +def render_sections( + sections: list[PreviewSection], + selected: StrEnum, + all_stage: StrEnum, +) -> None: + for section in sections: + if should_render_section(selected, section.stage, all_stage): + print_section(section.title) + section.render() \ No newline at end of file