-
-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathmain.py
More file actions
337 lines (291 loc) · 16.3 KB
/
main.py
File metadata and controls
337 lines (291 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import os
os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
import multiprocessing
import platform
import argparse
import sys
import logging
def _setup_bootstrap_logger():
"""Set up early bootstrap logger for startup phase before full logger initialization."""
# Get git info for bootstrap logging with improved error handling
try:
import subprocess
import os
# Increase timeout and add better error handling
branch = 'unknown'
commit = 'unknown'
try:
# Try to get branch name
branch_result = subprocess.run(['git', 'branch', '--show-current'],
capture_output=True, text=True, timeout=5,
cwd=os.getcwd())
if branch_result.returncode == 0 and branch_result.stdout.strip():
branch = branch_result.stdout.strip()
else:
# Debug: Log why branch detection failed
if os.environ.get('FUNGEN_DEBUG_GIT'):
print(f"DEBUG: Branch detection failed. Return code: {branch_result.returncode}")
print(f"DEBUG: Stdout: '{branch_result.stdout}'")
print(f"DEBUG: Stderr: '{branch_result.stderr}'")
print(f"DEBUG: Working directory: {os.getcwd()}")
except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
if os.environ.get('FUNGEN_DEBUG_GIT'):
print(f"DEBUG: Branch detection exception: {type(e).__name__}: {e}")
pass
try:
# Try to get commit hash
commit_result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'],
capture_output=True, text=True, timeout=5,
cwd=os.getcwd())
if commit_result.returncode == 0 and commit_result.stdout.strip():
commit = commit_result.stdout.strip()
else:
# Debug: Log why commit detection failed
if os.environ.get('FUNGEN_DEBUG_GIT'):
print(f"DEBUG: Commit detection failed. Return code: {commit_result.returncode}")
print(f"DEBUG: Stdout: '{commit_result.stdout}'")
print(f"DEBUG: Stderr: '{commit_result.stderr}'")
except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
if os.environ.get('FUNGEN_DEBUG_GIT'):
print(f"DEBUG: Commit detection exception: {type(e).__name__}: {e}")
pass
git_info = f"{branch}@{commit}"
except Exception:
git_info = "nogit@unknown"
# Set up a minimal colored console handler for startup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Clear any existing handlers
if logger.hasHandlers():
logger.handlers.clear()
# Create colored formatter for bootstrap phase
class BootstrapColoredFormatter(logging.Formatter):
GREY = "\x1b[90m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
RED = "\x1b[31m"
BOLD_RED = "\x1b[31;1m"
RESET = "\x1b[0m"
format_base = f"[{git_info}] - %(levelname)-8s - %(message)s"
FORMATS = {
logging.DEBUG: GREY + format_base + RESET,
logging.INFO: GREEN + format_base + RESET,
logging.WARNING: YELLOW + format_base + RESET,
logging.ERROR: RED + format_base + RESET,
logging.CRITICAL: BOLD_RED + format_base + RESET
}
# Pre-build formatters to avoid creating new objects on every log call
_FORMATTERS = {level: logging.Formatter(fmt) for level, fmt in FORMATS.items()}
def format(self, record):
formatter = self._FORMATTERS.get(record.levelno)
if formatter:
return formatter.format(record)
return super().format(record)
# Add console handler with bootstrap formatter
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(BootstrapColoredFormatter())
logger.addHandler(console_handler)
def run_gui(video_path=None):
"""Initializes and runs the graphical user interface."""
from application.logic.app_logic import ApplicationLogic
from application.gui_components import GUI, show_splash_during_init
# Show splash screen during ApplicationLogic initialization
def init_app_logic():
return ApplicationLogic(is_cli=False)
core_app = show_splash_during_init(init_app_logic)
gui = GUI(app_logic=core_app)
core_app.gui_instance = gui
# Open video after GUI init if path was provided via CLI
if video_path:
core_app.file_manager.open_video_from_path(video_path)
gui.run()
def run_cli(args):
"""Runs the application in command-line interface mode."""
import warnings
warnings.filterwarnings("ignore", message=".*GLFW.*not initialized.*")
from application.logic.app_logic import ApplicationLogic
logger = logging.getLogger(__name__)
logger.info("--- FunGen CLI Mode ---")
core_app = ApplicationLogic(is_cli=True)
if args.hwaccel:
core_app.hardware_acceleration_method = args.hwaccel
logger.info(f"CLI hwaccel override: {args.hwaccel}")
# This new method in ApplicationLogic will handle the CLI workflow
core_app.run_cli(args)
logger.info("--- CLI Task Finished ---")
def main():
"""
Main function to run the application.
This function handles dependency checking, argument parsing, and starts either the GUI or CLI.
"""
# Step 1: Initialize bootstrap logger for early startup logging
_setup_bootstrap_logger()
logger = logging.getLogger(__name__)
# Step 2: Perform dependency check before importing anything else
# First, try to install the most basic bootstrap dependencies if they're missing
import subprocess
import sys
import importlib
# Check if we have the required bootstrap packages
bootstrap_packages = ['packaging', 'requests', 'tqdm', 'send2trash']
missing_bootstrap = []
for package in bootstrap_packages:
try:
importlib.import_module(package)
except ImportError:
missing_bootstrap.append(package)
if missing_bootstrap:
logger.warning(f"Bootstrap dependencies missing: {', '.join(missing_bootstrap)}")
logger.info("Installing bootstrap dependencies...")
try:
result = subprocess.run([sys.executable, "-m", "pip", "install"] + missing_bootstrap,
capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Failed to install some bootstrap dependencies: {result.stderr}")
logger.info("Attempting to install from requirements/core.requirements.txt...")
result = subprocess.run([sys.executable, "-m", "pip", "install", "-r", "requirements/core.requirements.txt"],
capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Failed to install core requirements: {result.stderr}")
logger.error("Please manually install the requirements using: pip install -r requirements/core.requirements.txt")
sys.exit(1)
except Exception as install_error:
logger.error(f"Failed to install bootstrap dependencies: {install_error}")
logger.error("Please manually install the requirements using: pip install -r requirements/core.requirements.txt")
sys.exit(1)
# Now try to import and run the dependency checker
try:
from application.utils.dependency_checker import check_and_install_dependencies
check_and_install_dependencies()
except ImportError as e:
logger.error(f"Failed to import dependency checker after bootstrap: {e}")
logger.error("Please ensure the file 'application/utils/dependency_checker.py' exists.")
sys.exit(1)
except Exception as e:
logger.error(f"An unexpected error occurred during dependency check: {e}")
sys.exit(1)
# Step 3: Set platform-specific multiprocessing behavior
if platform.system() != "Windows":
multiprocessing.set_start_method('spawn', force=True)
else:
# On Windows, ensure proper console window management for multiprocessing
multiprocessing.set_start_method('spawn', force=True)
# Note: Windows uses 'spawn' by default, but we ensure it's set explicitly
# This helps maintain consistent behavior across different Python versions
# Windows-specific: Suppress ConnectionResetError in asyncio
# This is a known Windows issue where the remote host forcibly closes connections
# https://github.com/python/cpython/issues/83413
import asyncio
def silence_asyncio_windows_errors(loop, context):
"""Suppress ConnectionResetError on Windows (WinError 10054)"""
exception = context.get('exception')
if isinstance(exception, ConnectionResetError):
# This is normal when a client disconnects during streaming
logger.debug(f"Client disconnected (ConnectionResetError suppressed): {context.get('message', '')}")
return
# For other exceptions, use the default handler
loop.default_exception_handler(context)
# Set the custom exception handler for the current event loop
try:
loop = asyncio.get_event_loop()
loop.set_exception_handler(silence_asyncio_windows_errors)
except RuntimeError:
# No event loop yet, it will be created later
pass
# Step 4: Parse command-line arguments
from config.constants import APP_VERSION
parser = argparse.ArgumentParser(description="FunGen - Automatic Funscript Generation and Processing")
parser.add_argument('--version', action='version', version=f'FunGen {APP_VERSION}')
parser.add_argument('input_path', nargs='?', default=None, help='Path to a video file, folder of videos, or funscript file. If omitted, GUI will start.')
parser.add_argument('--open', metavar='VIDEO', default=None, help='Open the GUI with a video file pre-loaded. Example: python main.py --open video.mp4')
# Output control
parser.add_argument('--output', '-o', metavar='DIR', default=None, help='Override output directory for this run.')
verbosity = parser.add_mutually_exclusive_group()
verbosity.add_argument('--quiet', '-q', action='store_true', help='Suppress info messages, show only warnings and errors.')
verbosity.add_argument('--verbose', '-v', action='store_true', help='Show debug-level messages.')
# Funscript filtering mode
parser.add_argument('--funscript-mode', action='store_true', help='Process funscript files instead of videos. Apply filters to existing funscripts.')
parser.add_argument('--filter', choices=['ultimate-autotune', 'rdp-simplify', 'savgol-filter', 'speed-limiter', 'anti-jerk', 'amplify', 'clamp', 'invert', 'keyframe'],
help='Filter to apply to funscript(s). Only works with --funscript-mode.')
# Dynamic mode selection - get available modes from discovery system
try:
from config.tracker_discovery import get_tracker_discovery
discovery = get_tracker_discovery()
available_modes = discovery.get_supported_cli_modes()
batch_modes = [info.cli_aliases[0] for info in discovery.get_batch_compatible_trackers() if info.cli_aliases]
default_mode = batch_modes[0] if batch_modes else '3-stage'
parser.add_argument('--mode', choices=available_modes, default=default_mode,
help='The processing mode to use for analysis. Only works with video processing.')
except Exception:
# Fallback if discovery system fails
parser.add_argument('--mode', default='3-stage', help='Processing mode (discovery system unavailable)')
parser.add_argument('--list-modes', action='store_true', help='List available processing modes and exit.')
parser.add_argument('--od-mode', choices=['current', 'legacy'], default='current', help='Oscillation detector mode to use in Stage 3 (current=experimental, legacy=f5ae40f).')
parser.add_argument('--overwrite', action='store_true', help='Force processing and overwrite existing funscripts. Default is to skip videos with existing funscripts.')
parser.add_argument('--no-autotune', action='store_false', dest='autotune', help='Disable applying the default Ultimate Autotune settings after generation.')
parser.add_argument('--no-copy', action='store_false', dest='copy', help='Do not save a copy of the final funscript next to the video file (will save to output folder only).')
parser.add_argument('--generate-roll', action='store_true', help='Generate secondary axis (.roll.funscript) file for supported multi-axis devices.')
parser.add_argument('--save-preprocessed', action='store_true', help='Keep preprocessed (resized/unwarped) video per file. Uses significant disk space.')
parser.add_argument('--recursive', '-r', action='store_true', help='If input_path is a folder, process it recursively.')
parser.add_argument('--hwaccel', metavar='METHOD', default=None, help='Override hardware acceleration method for this run (e.g. cuda, qsv, auto, none).')
parser.add_argument('--watch', metavar='FOLDER', help='Watch folder for new videos (requires patreon_features add-on).')
parser.add_argument('--pipeline', metavar='PRESET', default=None, help='Apply a plugin pipeline preset after generation (e.g. "Ultimate Autotune", "Light Polish").')
args = parser.parse_args()
# Apply verbosity settings
if args.quiet:
logging.getLogger().setLevel(logging.WARNING)
elif args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# --list-modes: print available modes and exit
if args.list_modes:
try:
from config.tracker_discovery import get_tracker_discovery
disc = get_tracker_discovery()
print("Available processing modes:")
for name, info in disc.get_all_trackers().items():
aliases = ", ".join(info.cli_aliases) if info.cli_aliases else "(no CLI alias)"
batch = "batch" if info.supports_batch else "live-only"
ver = f"v{info.version}" if info.version else ""
print(f" {info.display_name:<35s} {ver:<8s} cli: {aliases:<25s} [{batch}] ({info.folder_name})")
except Exception as e:
logger.error(f"Could not list modes: {e}")
sys.exit(0)
# Step 5: Validate arguments and start the appropriate interface
if args.watch:
# Headless watched-folder mode (supporter feature)
try:
from application.utils.feature_detection import is_feature_available
if not is_feature_available("patreon_features"):
parser.error("--watch requires the patreon_features add-on")
from patreon_features.batch.watched_folder import WatchedFolderProcessor
from patreon_features.batch.batch_queue import BatchQueue
except ImportError as e:
parser.error(f"--watch requires patreon_features: {e}")
queue = BatchQueue()
watcher = WatchedFolderProcessor(on_new_video=lambda p: queue.add(p))
watcher.start_watching(args.watch, recursive=True)
logger.info(f"Watching folder: {args.watch} (Ctrl-C to stop)")
try:
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
watcher.stop_watching()
logger.info("Stopped watching")
elif args.input_path:
# Validate funscript mode arguments
if args.funscript_mode and not args.filter:
parser.error("--funscript-mode requires --filter to be specified")
if args.filter and not args.funscript_mode:
parser.error("--filter can only be used with --funscript-mode")
# Apply output directory override
if args.output:
from application.classes.settings_manager import AppSettings
settings = AppSettings()
settings.set("output_folder_path", args.output)
run_cli(args)
else:
run_gui(video_path=args.open)
if __name__ == "__main__":
main()