feat(watcher): Add periodic scanning fallback mechanism
- Added scan_interval parameter to FolderWatcher class - Implemented _periodic_scan background thread - Fixed on_modified event handler to process files - Improved logging for debugging filesystem events - Optimized _process_existing to avoid redundant processing
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import time
|
||||
import threading
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from pathlib import Path
|
||||
@@ -9,11 +10,13 @@ class DropFolderHandler(FileSystemEventHandler):
|
||||
self.processing = set() # Prevent duplicate processing
|
||||
|
||||
def on_created(self, event):
|
||||
self._handle_path(event.src_path)
|
||||
if not event.is_directory:
|
||||
self._handle_path(event.src_path)
|
||||
|
||||
def on_modified(self, event):
|
||||
# Some systems trigger created, some modified
|
||||
pass
|
||||
# Handle modified events for files (some systems trigger modified instead of created)
|
||||
if not event.is_directory:
|
||||
self._handle_path(event.src_path)
|
||||
|
||||
def _handle_path(self, path):
|
||||
if path in self.processing:
|
||||
@@ -22,6 +25,7 @@ class DropFolderHandler(FileSystemEventHandler):
|
||||
self.processing.add(path)
|
||||
try:
|
||||
file_path = Path(path)
|
||||
print(f"File system event detected: {path}")
|
||||
|
||||
# Wait a moment for write to complete
|
||||
time.sleep(1)
|
||||
@@ -29,20 +33,28 @@ class DropFolderHandler(FileSystemEventHandler):
|
||||
if file_path.is_dir():
|
||||
# Process all videos in subfolder
|
||||
videos = self.processor.find_video_files(file_path)
|
||||
print(f"Processing directory: {path}, found {len(videos)} video(s)")
|
||||
for video in videos:
|
||||
self.processor.process_file(video)
|
||||
else:
|
||||
print(f"Processing file: {path}")
|
||||
self.processor.process_file(file_path)
|
||||
except Exception as e:
|
||||
print(f"Error processing {path}: {e}")
|
||||
finally:
|
||||
self.processing.discard(path)
|
||||
|
||||
class FolderWatcher:
|
||||
def __init__(self, drop_folder, processor):
|
||||
def __init__(self, drop_folder, processor, scan_interval=30):
|
||||
self.drop_folder = Path(drop_folder)
|
||||
self.processor = processor
|
||||
self.scan_interval = scan_interval
|
||||
self.observer = Observer()
|
||||
self._stop_event = threading.Event()
|
||||
self._scan_thread = None
|
||||
|
||||
def start(self):
|
||||
# Start watchdog observer
|
||||
handler = DropFolderHandler(self.processor)
|
||||
self.observer.schedule(handler, str(self.drop_folder), recursive=True)
|
||||
self.observer.start()
|
||||
@@ -51,18 +63,42 @@ class FolderWatcher:
|
||||
# Process existing files on startup
|
||||
self._process_existing()
|
||||
|
||||
# Start periodic scanning thread
|
||||
self._stop_event.clear()
|
||||
self._scan_thread = threading.Thread(target=self._periodic_scan, daemon=True)
|
||||
self._scan_thread.start()
|
||||
print(f"Started periodic scanning every {self.scan_interval} seconds")
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
|
||||
def _periodic_scan(self):
|
||||
"""Periodically scan drop folder as fallback for missed filesystem events"""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
self._process_existing()
|
||||
except Exception as e:
|
||||
print(f"Error during periodic scan: {e}")
|
||||
self._stop_event.wait(self.scan_interval)
|
||||
|
||||
def _process_existing(self):
|
||||
"""Process files already in drop folder on startup"""
|
||||
"""Process files already in drop folder"""
|
||||
videos = self.processor.find_video_files(self.drop_folder)
|
||||
for video in videos:
|
||||
self.processor.process_file(video)
|
||||
if videos:
|
||||
print(f"Periodic scan found {len(videos)} video(s) in drop folder")
|
||||
# Process all videos at once using process_drop_folder for efficient duplicate handling
|
||||
processed = self.processor.process_drop_folder(self.drop_folder)
|
||||
if processed > 0:
|
||||
print(f"Periodic scan processed {processed} file(s)")
|
||||
else:
|
||||
print("Periodic scan: no videos found in drop folder")
|
||||
|
||||
def stop(self):
|
||||
self._stop_event.set()
|
||||
if self._scan_thread:
|
||||
self._scan_thread.join(timeout=5)
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
2
main.py
2
main.py
@@ -32,7 +32,7 @@ def main():
|
||||
print("Web UI started at http://localhost:5000")
|
||||
|
||||
# Start file watcher (blocking)
|
||||
watcher = FolderWatcher(config['general']['drop_folder'], processor)
|
||||
watcher = FolderWatcher(config['general']['drop_folder'], processor, scan_interval=config['general']['scan_interval'])
|
||||
watcher.start()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user