The error occurs when trying to process images with Topaz Photo AI (tpai.exe). The key issues are:
The error code 4294967295 (0xFFFFFFFF in hex) typically indicates a general failure in Windows applications.
The error message "No valid image file passed" suggests the program isn't receiving the input file correctly.
The code is trying to process files in a directory but may not be handling paths correctly.
Here's the fixed version of the code:
import os
import subprocess
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def process_single_image(
input_filepath: str,
output_dir: str,
topaz_photo_ai_path: str
) -> tuple[str, bool, str]:
"""
Process a single image file.
Returns: (filename, success status, message)
"""
filename = os.path.basename(input_filepath)
print(f"Processing: {filename}")
# Ensure the input file exists
if not os.path.exists(input_filepath):
return filename, False, f"Error: Input file not found: {input_filepath}"
command = [
topaz_photo_ai_path,
input_filepath,
"--output",
output_dir,
"--autopilot" # Added autopilot mode for better handling
]
try:
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
errors='replace'
)
success_message = f"Successfully processed: {filename}"
if process.stdout:
success_message += f"\nTopaz output: {process.stdout}"
if process.stderr:
success_message += f"\nTopaz warnings: {process.stderr}"
return filename, True, success_message
except subprocess.CalledError as e:
error_message = f"Error processing {filename}: {e}\nCommand: {' '.join(e.cmd)}\nOutput: {e.stdout}\nError: {e.stderr}"
return filename, False, error_message
except Exception as e:
return filename, False, f"Unexpected error processing {filename}: {e}"
def optimize_images_with_topaz_photo_ai_threaded(
image_dir: str,
topaz_photo_ai_path: str = r"C:\Program Files\Topaz Labs LLC\Topaz Photo AI\tpai.exe",
max_workers: int = 4
):
"""
Optimize JPG images in a directory using Topaz Photo AI.
Args:
image_dir: Directory containing images to optimize
topaz_photo_ai_path: Path to Topaz Photo AI executable
max_workers: Maximum number of concurrent processes
"""
# Validate executable path
if not os.path.exists(topaz_photo_ai_path):
print(f"Error: Topaz Photo AI executable not found at {topaz_photo_ai_path}")
return
# Validate input directory
if not os.path.isdir(image_dir):
print(f"Error: Invalid directory: {image_dir}")
return
# Create temp directory
temp_output_dir = os.path.join(image_dir, "temp_optimized_images")
os.makedirs(temp_output_dir, exist_ok=True)
print(f"Created temp directory: {temp_output_dir}")
# Get JPG files
jpg_files = [
f for f in os.listdir(image_dir)
if f.lower().endswith(('.jpg', '.jpeg')) and
os.path.isfile(os.path.join(image_dir, f))
]
if not jpg_files:
print(f"No JPG images found in {image_dir}")
shutil.rmtree(temp_output_dir)
return
print(f"Found {len(jpg_files)} JPG images to process")
start_time = time.time()
# Process images
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
process_single_image,
os.path.join(image_dir, filename),
temp_output_dir,
topaz_photo_ai_path
): filename for filename in jpg_files
}
for future in as_completed(futures):
filename = futures[future]
try:
_, success, message = future.result()
if success:
print(f"\n--- Success: {filename} ---\n{message}")
else:
print(f"\n--- Error: {filename} ---\n{message}")
except Exception as e:
print(f"Error processing {filename}: {e}")
# Move optimized files
print("\nMoving optimized files...")
for filename in os.listdir(temp_output_dir):
src = os.path.join(temp_output_dir, filename)
dst = os.path.join(image_dir, filename)
if os.path.isfile(src):
try:
shutil.move(src, dst)
print(f"Replaced: {dst}")
except Exception as e:
print(f"Error moving {src}: {e}")
# Clean up
try:
shutil.rmtree(temp_output_dir)
print(f"Removed temp directory: {temp_output_dir}")
except Exception as e:
print(f"Error removing temp directory: {e}")
print(f"\nProcessing completed in {time.time() - start_time:.2f} seconds")
if __name__ == "__main__":
image_dir = r"D:\BaiduNetdiskDownload"
topaz_path = r"C:\Program Files\Topaz Labs LLC\Topaz Photo AI\tpai.exe"
# Use logical CPU count for max workers
cpu_count = os.cpu_count() or 4
max_workers = min(cpu_count * 2, 16) # Cap at 16 to avoid overloading
optimize_images_with_topaz_photo_ai_threaded(
image_dir,
topaz_path,
max_workers
)
import os
import subprocess
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def process_single_image(
input_filepath: str,
output_dir: str,
topaz_photo_ai_path: str
) -> tuple[str, bool, str]:
"""
Process a single image file.
Returns: (filename, success status, message)
"""
filename = os.path.basename(input_filepath)
print(f"Processing: {filename}")
# Ensure the input file exists
if not os.path.exists(input_filepath):
return filename, False, f"Error: Input file not found: {input_filepath}"
command = [
topaz_photo_ai_path,
input_filepath,
"--output",
output_dir,
"--autopilot" # Added autopilot mode for better handling
]
try:
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
errors='replace'
)
success_message = f"Successfully processed: {filename}"
if process.stdout:
success_message += f"\nTopaz output: {process.stdout}"
if process.stderr:
success_message += f"\nTopaz warnings: {process.stderr}"
return filename, True, success_message
except subprocess.CalledError as e:
error_message = f"Error processing {filename}: {e}\nCommand: {' '.join(e.cmd)}\nOutput: {e.stdout}\nError: {e.stderr}"
return filename, False, error_message
except Exception as e:
return filename, False, f"Unexpected error processing {filename}: {e}"
def optimize_images_with_topaz_photo_ai_threaded(
image_dir: str,
topaz_photo_ai_path: str = r"C:\Program Files\Topaz Labs LLC\Topaz Photo AI\tpai.exe",
max_workers: int = 4
):
"""
Optimize JPG images in a directory using Topaz Photo AI.
Args:
image_dir: Directory containing images to optimize
topaz_photo_ai_path: Path to Topaz Photo AI executable
max_workers: Maximum number of concurrent processes
"""
# Validate executable path
if not os.path.exists(topaz_photo_ai_path):
print(f"Error: Topaz Photo AI executable not found at {topaz_photo_ai_path}")
return
# Validate input directory
if not os.path.isdir(image_dir):
print(f"Error: Invalid directory: {image_dir}")
return
# Create temp directory
temp_output_dir = os.path.join(image_dir, "temp_optimized_images")
os.makedirs(temp_output_dir, exist_ok=True)
print(f"Created temp directory: {temp_output_dir}")
# Get JPG files
jpg_files = [
f for f in os.listdir(image_dir)
if f.lower().endswith(('.jpg', '.jpeg')) and
os.path.isfile(os.path.join(image_dir, f))
]
if not jpg_files:
print(f"No JPG images found in {image_dir}")
shutil.rmtree(temp_output_dir)
return
print(f"Found {len(jpg_files)} JPG images to process")
start_time = time.time()
# Process images
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
process_single_image,
os.path.join(image_dir, filename),
temp_output_dir,
topaz_photo_ai_path
): filename for filename in jpg_files
}
for future in as_completed(futures):
filename = futures[future]
try:
_, success, message = future.result()
if success:
print(f"\n--- Success: {filename} ---\n{message}")
else:
print(f"\n--- Error: {filename} ---\n{message}")
except Exception as e:
print(f"Error processing {filename}: {e}")
# Move optimized files
print("\nMoving optimized files...")
for filename in os.listdir(temp_output_dir):
src = os.path.join(temp_output_dir, filename)
dst = os.path.join(image_dir, filename)
if os.path.isfile(src):
try:
shutil.move(src, dst)
print(f"Replaced: {dst}")
except Exception as e:
print(f"Error moving {src}: {e}")
# Clean up
try:
shutil.rmtree(temp_output_dir)
print(f"Removed temp directory: {temp_output_dir}")
except Exception as e:
print(f"Error removing temp directory: {e}")
print(f"\nProcessing completed in {time.time() - start_time:.2f} seconds")
if __name__ == "__main__":
image_dir = r"D:\BaiduNetdiskDownload"
topaz_path = r"C:\Program Files\Topaz Labs LLC\Topaz Photo AI\tpai.exe"
# Use logical CPU count for max workers
cpu_count = os.cpu_count() or 4
max_workers = min(cpu_count * 2, 16) # Cap at 16 to avoid overloading
optimize_images_with_topaz_photo_ai_threaded(
image_dir,
topaz_path,
max_workers
)
Key improvements:
--autopilot flag to Topaz command for better handlingTo use this script:
topaz_image_processor.pyThe script will now handle errors more gracefully and should work with Topaz Photo AI 4.0.1. If you still get errors, you may need to check: