Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mloader/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def validate_ids(ctx: click.Context, param, value):
"--raw",
"-r",
is_flag=True,
default=False,
default=True,
show_default=True,
help="Save raw images",
envvar="MLOADER_RAW",
Expand Down
75 changes: 73 additions & 2 deletions mloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,78 @@ def __init__(
self.split = split
self._api_url = "https://jumpg-webapi.tokyo-cdn.com"
self.session = Session()
with open("token.txt", "r", encoding="utf-8") as f:
current_token = f.read().strip()
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
"rv:72.0) Gecko/20100101 Firefox/72.0"
"rv:72.0) Gecko/20100101 Firefox/72.0",
"Session-Token": current_token,
}
)

@staticmethod
def _parse_api_error(data: bytes) -> Optional[str]:
"""Extract a human-readable error from a failed API response.

The API returns errors in protobuf field 2 of Response (not in the
'success' field 1 defined in the schema), so they are silently ignored
by the generated parser. This method decodes them manually.
"""
def read_varint(buf, pos):
result = shift = 0
while pos < len(buf):
b = buf[pos]; pos += 1
result |= (b & 0x7f) << shift
shift += 7
if not (b & 0x80):
break
return result, pos

def read_ldelim(buf, pos):
length, pos = read_varint(buf, pos)
return buf[pos:pos + length], pos + length

try:
pos = 0
while pos < len(data):
tag, pos = read_varint(data, pos)
wire_type = tag & 0x7
field_num = tag >> 3
if wire_type == 2:
chunk, pos = read_ldelim(data, pos)
if field_num == 2: # failure field
# Scan inner fields for a message containing text
ipos = 0
while ipos < len(chunk):
itag, ipos = read_varint(chunk, ipos)
if itag & 0x7 == 2:
inner, ipos = read_ldelim(chunk, ipos)
# Parse title (field 1) and body (field 2)
title = body = ""
dpos = 0
while dpos < len(inner):
dtag, dpos = read_varint(inner, dpos)
if dtag & 0x7 == 2:
val, dpos = read_ldelim(inner, dpos)
if dtag >> 3 == 1:
title = val.decode("utf-8", errors="replace")
elif dtag >> 3 == 2:
body = val.decode("utf-8", errors="replace")
else:
break
if title:
return f"{title}: {body}".rstrip(": ") if body else title
else:
break
elif wire_type == 0:
_, pos = read_varint(data, pos)
else:
break
except Exception:
pass
return None

def _decrypt_image(self, url: str, encryption_hex: str) -> bytearray:
resp = self.session.get(url)
data = bytearray(resp.content)
Expand All @@ -61,7 +126,13 @@ def _load_pages(self, chapter_id: Union[str, int]) -> MangaViewer:
"img_quality": self.quality,
},
)
return Response.FromString(resp.content).success.manga_viewer
viewer = Response.FromString(resp.content).success.manga_viewer
if not viewer.pages and not viewer.chapter_id:
error = self._parse_api_error(resp.content)
raise RuntimeError(
error or f"Empty response for chapter {chapter_id}"
)
return viewer

@lru_cache(None)
def _get_title_details(self, title_id: Union[str, int]) -> TitleDetailView:
Expand Down