11import json
22import os
3+ import time
34from dataclasses import dataclass
45from functools import wraps
56from typing import ClassVar
@@ -23,7 +24,8 @@ def opt_oidc(f):
2324 @click .option ("--username" , help = "OIDC username" )
2425 @click .option ("--password" , help = "OIDC password" )
2526 @click .option ("--connector-id" , "connector_id" , help = "OIDC token exchange connector id (Dex specific)" )
26- @click .option ("--callback-port" ,
27+ @click .option (
28+ "--callback-port" ,
2729 "callback_port" ,
2830 type = click .IntRange (0 , 65535 ),
2931 default = None ,
@@ -93,7 +95,7 @@ async def authorization_code_grant(self, callback_port: int | None = None):
9395 elif env_value .isdigit () and int (env_value ) <= 65535 :
9496 port = int (env_value )
9597 else :
96- raise click .ClickException (f" Invalid { JMP_OIDC_CALLBACK_PORT } \ "{ env_value } \ " : must be a valid port" )
98+ raise click .ClickException (f' Invalid { JMP_OIDC_CALLBACK_PORT } "{ env_value } ": must be a valid port' )
9799
98100 tx , rx = create_memory_object_stream ()
99101
@@ -133,8 +135,50 @@ async def callback(request):
133135
134136
135137def decode_jwt (token : str ):
136- return json .loads (extract_compact (token .encode ()).payload )
138+ try :
139+ return json .loads (extract_compact (token .encode ()).payload )
140+ except (ValueError , KeyError , TypeError ) as e :
141+ raise ValueError (f"Invalid JWT format: { e } " ) from e
137142
138143
139144def decode_jwt_issuer (token : str ):
140145 return decode_jwt (token ).get ("iss" )
146+
147+
148+ def get_token_expiry (token : str ) -> int | None :
149+ """Get token expiry timestamp (Unix epoch seconds) from JWT.
150+
151+ Returns None if token doesn't have an exp claim.
152+ """
153+ return decode_jwt (token ).get ("exp" )
154+
155+
156+ def get_token_remaining_seconds (token : str ) -> float | None :
157+ """Get seconds remaining until token expires.
158+
159+ Returns:
160+ Positive value if token is still valid
161+ Negative value if token is expired (magnitude = how long ago)
162+ None if token doesn't have an exp claim
163+ """
164+ exp = get_token_expiry (token )
165+ if exp is None :
166+ return None
167+ return exp - time .time ()
168+
169+
170+ def is_token_expired (token : str , buffer_seconds : int = 0 ) -> bool :
171+ """Check if token is expired or will expire within buffer_seconds.
172+
173+ Args:
174+ token: JWT token string
175+ buffer_seconds: Consider expired if less than this many seconds remain
176+
177+ Returns:
178+ True if token is expired or will expire within buffer
179+ False if token is still valid (or has no exp claim)
180+ """
181+ remaining = get_token_remaining_seconds (token )
182+ if remaining is None :
183+ return False
184+ return remaining < buffer_seconds
0 commit comments