-
-
Notifications
You must be signed in to change notification settings - Fork 845
Add preliminary support for ISO-8601 timestamps via date: archive match pattern (#8715) #8776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
282d70c
db46cdb
4363bf7
69e8608
5c20d8f
6f1bcd4
4060e94
e9a8c5f
470758d
df2d33d
870bf7a
461df75
9553c35
409733b
de03806
796981c
7b8a194
8e3f1e4
904853d
6032c4a
9cb5e5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import os | ||
| import re | ||
| from datetime import datetime, timezone, timedelta | ||
| from zoneinfo import ZoneInfo | ||
|
|
||
|
|
||
| def parse_timestamp(timestamp, tzinfo=timezone.utc): | ||
|
|
@@ -191,96 +192,136 @@ class DatePatternError(ValueError): | |
| """Raised when a date: archive pattern cannot be parsed.""" | ||
|
|
||
|
|
||
| def local(dt: datetime) -> datetime: | ||
| """Interpret naive dt as local time, attach timezone info from the local tz.""" | ||
| if dt.tzinfo is None: | ||
| dt = dt.astimezone() | ||
| return dt | ||
|
|
||
|
|
||
| def exact_predicate(dt: datetime): | ||
| """Return predicate matching archives whose ts equals dt (UTC).""" | ||
| dt_utc = local(dt).astimezone(timezone.utc) | ||
| dt_utc = dt.astimezone(timezone.utc) | ||
| return lambda ts: ts.astimezone(timezone.utc) == dt_utc | ||
|
|
||
|
|
||
| def interval_predicate(start: datetime, end: datetime): | ||
| start_utc = local(start).astimezone(timezone.utc) | ||
| end_utc = local(end).astimezone(timezone.utc) | ||
| start_utc = start.astimezone(timezone.utc) | ||
| end_utc = end.astimezone(timezone.utc) | ||
| return lambda ts: start_utc <= ts.astimezone(timezone.utc) < end_utc | ||
|
|
||
|
|
||
| def compile_date_pattern(expr: str): | ||
| def parse_tz(tzstr: str): | ||
| """ | ||
| Turn a date: expression into a predicate ts->bool. | ||
| Supports: | ||
| 1) Full ISO‑8601 timestamps with minute (and optional seconds/fraction) | ||
| 2) Hour-only: YYYY‑MM‑DDTHH -> interval of 1 hour | ||
| 3) Minute-only: YYYY‑MM‑DDTHH:MM -> interval of 1 minute | ||
| 4) YYYY, YYYY‑MM, YYYY‑MM‑DD -> day/month/year intervals | ||
| 5) Unix epoch (@123456789) -> exact match | ||
| Naive inputs are assumed local, then converted into UTC. | ||
| TODO: verify working for fractional seconds; add timezone support. | ||
| Parses a UTC offset like +08:00 or [Region/Name] into a timezone object. | ||
| """ | ||
| expr = expr.strip() | ||
| if not tzstr: | ||
| return None | ||
| if tzstr == "Z": | ||
| return timezone.utc | ||
| if tzstr[0] in "+-": | ||
| sign = 1 if tzstr[0] == "+" else -1 | ||
| try: | ||
| hh, mm = map(int, tzstr[1:].split(":")) | ||
| if not (0 <= mm < 60): | ||
| raise ValueError | ||
| except Exception: | ||
| raise DatePatternError("invalid UTC offset format") | ||
| # we do it this way so that, for example, -8:30 is | ||
| # -8 hours and -30 minutes, not -8 hours and +30 minutes | ||
| total_minutes = sign * (hh * 60 + mm) | ||
| # enforce ISO-8601 bounds (-12:00 to +14:00) | ||
| if not (-12 * 60 <= total_minutes <= 14 * 60): | ||
| raise DatePatternError("UTC offset outside ISO-8601 bounds") | ||
| return timezone(timedelta(minutes=total_minutes)) | ||
| # [Region/Name] | ||
| try: | ||
| return ZoneInfo(tzstr.strip("[]")) | ||
| except Exception: | ||
| raise DatePatternError("invalid timezone format") | ||
|
|
||
| # 1) Full timestamp (with fraction) | ||
| full_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+") | ||
| if full_re.match(expr): | ||
| dt = parse_local_timestamp(expr, tzinfo=timezone.utc) | ||
| return exact_predicate(dt) # no interval, since we have a fractional timestamp | ||
|
|
||
| # 2) Seconds-only | ||
| second_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$") | ||
| if second_re.match(expr): | ||
| start = parse_local_timestamp(expr, tzinfo=timezone.utc) | ||
| def compile_date_pattern(expr: str): | ||
| """ | ||
| Accepts any of: | ||
| YYYY | ||
| YYYY-MM | ||
| YYYY-MM-DD | ||
| YYYY-MM-DDTHH | ||
| YYYY-MM-DDTHH:MM | ||
| YYYY-MM-DDTHH:MM:SS | ||
| Unix epoch (@123456789) | ||
| …with an optional trailing timezone (Z or ±HH:MM or [Region/City]). | ||
| Returns a predicate that is True for timestamps in that interval. | ||
| """ | ||
| expr = expr.strip() | ||
| pattern = r""" | ||
| ^ | ||
| (?: | ||
| (?P<fraction>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+) # full timestamp with fraction | ||
| | (?P<second> \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) # no fraction | ||
| | (?P<minute> \d{4}-\d{2}-\d{2}T\d{2}:\d{2}) # minute precision | ||
| | (?P<hour> \d{4}-\d{2}-\d{2}T\d{2}) # hour precision | ||
| | (?P<day> \d{4}-\d{2}-\d{2}) # day precision | ||
| | (?P<month> \d{4}-\d{2}) # month precision | ||
| | (?P<year> \d{4}) # year precision | ||
| | @(?P<epoch>\d+) # unix epoch | ||
| ) | ||
| (?P<tz>Z|[+\-]\d{2}:\d{2}|\[[^\]]+\])? # optional timezone or [Region/City] | ||
| $ | ||
| """ | ||
| m = re.match(pattern, expr, re.VERBOSE) | ||
| if not m: | ||
| raise DatePatternError(f"unrecognised date: {expr!r}") | ||
|
|
||
| gd = m.groupdict() | ||
| tz = parse_tz(gd.get("tz")) # will be None if tzstr is empty -> local timezone | ||
|
|
||
| # unix epoch and user-specified timezone are mutually exclusive | ||
| if gd["epoch"] and tz is not None: | ||
| raise DatePatternError("unix‐epoch patterns (@123456789) are UTC and must not include a timezone suffix") | ||
|
|
||
| # 1) fractional‐second exact match | ||
| if gd["fraction"]: | ||
| ts = gd["fraction"] | ||
| dt = parse_timestamp(ts, tzinfo=tz) | ||
| return exact_predicate(dt) | ||
|
|
||
| # 2) second‐precision interval | ||
| if gd["second"]: | ||
| ts = gd["second"] | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| # within one second | ||
| return interval_predicate(start, start + timedelta(seconds=1)) | ||
|
|
||
| # 3) Minute-only | ||
| minute_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}$") | ||
| if minute_re.match(expr): | ||
| start = parse_local_timestamp(expr + ":00", tzinfo=timezone.utc) | ||
| # 3) minute‐precision interval | ||
| if gd["minute"]: | ||
| ts = gd["minute"] + ":00" | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| return interval_predicate(start, start + timedelta(minutes=1)) | ||
|
|
||
| # 4) Hour-only | ||
| hour_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}$") | ||
| if hour_re.match(expr): | ||
| start = parse_local_timestamp(expr + ":00:00", tzinfo=timezone.utc) | ||
| # 4) hour‐precision interval | ||
| if gd["hour"]: | ||
| ts = gd["hour"] + ":00:00" | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| return interval_predicate(start, start + timedelta(hours=1)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe just use 1 regex with group names ( After a single
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe use re.VERBOSE so you can have a multi-line, commented regex for this. |
||
|
|
||
| # Unix epoch (@123456789) - Note: We don't support fractional seconds here, | ||
| # since Unix epochs are almost always whole numbers. | ||
| if expr.startswith("@"): | ||
| try: | ||
| epoch = int(expr[1:]) | ||
| except ValueError: | ||
| raise DatePatternError(f"invalid epoch: {expr!r}") | ||
| # 5a) day‐precision interval | ||
| if gd["day"]: | ||
| ts = gd["day"] + "T00:00:00" | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| return interval_predicate(start, start + timedelta(days=1)) | ||
|
|
||
| # 5b) month‐precision interval | ||
| if gd["month"]: | ||
| ts = gd["month"] + "-01T00:00:00" | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| return interval_predicate(start, offset_n_months(start, 1)) | ||
|
|
||
| # 5c) year‐precision interval | ||
| if gd["year"]: | ||
| ts = gd["year"] + "-01-01T00:00:00" | ||
| start = parse_timestamp(ts, tzinfo=tz) | ||
| return interval_predicate(start, offset_n_months(start, 12)) | ||
|
|
||
| # 6) unix‐epoch exact‐second match | ||
| if gd["epoch"]: | ||
| epoch = int(gd["epoch"]) | ||
| start = datetime.fromtimestamp(epoch, tz=timezone.utc) | ||
| # match within the second | ||
| return interval_predicate(start, start + timedelta(seconds=1)) | ||
|
|
||
| # Year/Year-month/Year-month-day | ||
| parts = expr.split("-") | ||
| try: | ||
| if len(parts) == 1: # YYYY | ||
| year = int(parts[0]) | ||
| start = datetime(year, 1, 1) | ||
| end = datetime(year + 1, 1, 1) | ||
|
|
||
| elif len(parts) == 2: # YYYY‑MM | ||
| year, month = map(int, parts) | ||
| start = datetime(year, month, 1) | ||
| end = offset_n_months(start, 1) | ||
|
|
||
| elif len(parts) == 3: # YYYY‑MM‑DD | ||
| year, month, day = map(int, parts) | ||
| start = datetime(year, month, day) | ||
| end = start + timedelta(days=1) | ||
|
|
||
| else: | ||
| raise DatePatternError(f"unrecognised date: {expr!r}") | ||
|
|
||
| except ValueError as e: | ||
| raise DatePatternError(str(e)) from None | ||
|
|
||
| return interval_predicate(start, end) | ||
| # should never get here | ||
| raise DatePatternError(f"unrecognised date: {expr!r}") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting approach.
What I meant was rather something like (simplified to cover only YYYY and YYYY-MM here as an example):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, this would've been much simpler. I will try to refactor it to this approach tomorrow.