Release on 09.11.24
This commit is contained in:
8
tests-unit/README.md
Normal file
8
tests-unit/README.md
Normal file
@@ -0,0 +1,8 @@
|
||||
# Pytest Unit Tests
|
||||
|
||||
## Install test dependencies
|
||||
|
||||
`pip install -r tests-unit/requirements.txt`
|
||||
|
||||
## Run tests
|
||||
`pytest tests-unit/`
|
||||
0
tests-unit/app_test/__init__.py
Normal file
0
tests-unit/app_test/__init__.py
Normal file
130
tests-unit/app_test/frontend_manager_test.py
Normal file
130
tests-unit/app_test/frontend_manager_test.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import argparse
|
||||
import pytest
|
||||
from requests.exceptions import HTTPError
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.frontend_management import (
|
||||
FrontendManager,
|
||||
FrontEndProvider,
|
||||
Release,
|
||||
)
|
||||
from comfy.cli_args import DEFAULT_VERSION_STRING
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_releases():
|
||||
return [
|
||||
Release(
|
||||
id=1,
|
||||
tag_name="1.0.0",
|
||||
name="Release 1.0.0",
|
||||
prerelease=False,
|
||||
created_at="2022-01-01T00:00:00Z",
|
||||
published_at="2022-01-01T00:00:00Z",
|
||||
body="Release notes for 1.0.0",
|
||||
assets=[{"name": "dist.zip", "url": "https://example.com/dist.zip"}],
|
||||
),
|
||||
Release(
|
||||
id=2,
|
||||
tag_name="2.0.0",
|
||||
name="Release 2.0.0",
|
||||
prerelease=False,
|
||||
created_at="2022-02-01T00:00:00Z",
|
||||
published_at="2022-02-01T00:00:00Z",
|
||||
body="Release notes for 2.0.0",
|
||||
assets=[{"name": "dist.zip", "url": "https://example.com/dist.zip"}],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_provider(mock_releases):
|
||||
provider = FrontEndProvider(
|
||||
owner="test-owner",
|
||||
repo="test-repo",
|
||||
)
|
||||
provider.all_releases = mock_releases
|
||||
provider.latest_release = mock_releases[1]
|
||||
FrontendManager.PROVIDERS = [provider]
|
||||
return provider
|
||||
|
||||
|
||||
def test_get_release(mock_provider, mock_releases):
|
||||
version = "1.0.0"
|
||||
release = mock_provider.get_release(version)
|
||||
assert release == mock_releases[0]
|
||||
|
||||
|
||||
def test_get_release_latest(mock_provider, mock_releases):
|
||||
version = "latest"
|
||||
release = mock_provider.get_release(version)
|
||||
assert release == mock_releases[1]
|
||||
|
||||
|
||||
def test_get_release_invalid_version(mock_provider):
|
||||
version = "invalid"
|
||||
with pytest.raises(ValueError):
|
||||
mock_provider.get_release(version)
|
||||
|
||||
|
||||
def test_init_frontend_default():
|
||||
version_string = DEFAULT_VERSION_STRING
|
||||
frontend_path = FrontendManager.init_frontend(version_string)
|
||||
assert frontend_path == FrontendManager.DEFAULT_FRONTEND_PATH
|
||||
|
||||
|
||||
def test_init_frontend_invalid_version():
|
||||
version_string = "test-owner/test-repo@1.100.99"
|
||||
with pytest.raises(HTTPError):
|
||||
FrontendManager.init_frontend_unsafe(version_string)
|
||||
|
||||
|
||||
def test_init_frontend_invalid_provider():
|
||||
version_string = "invalid/invalid@latest"
|
||||
with pytest.raises(HTTPError):
|
||||
FrontendManager.init_frontend_unsafe(version_string)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_os_functions():
|
||||
with patch('app.frontend_management.os.makedirs') as mock_makedirs, \
|
||||
patch('app.frontend_management.os.listdir') as mock_listdir, \
|
||||
patch('app.frontend_management.os.rmdir') as mock_rmdir:
|
||||
mock_listdir.return_value = [] # Simulate empty directory
|
||||
yield mock_makedirs, mock_listdir, mock_rmdir
|
||||
|
||||
@pytest.fixture
|
||||
def mock_download():
|
||||
with patch('app.frontend_management.download_release_asset_zip') as mock:
|
||||
mock.side_effect = Exception("Download failed") # Simulate download failure
|
||||
yield mock
|
||||
|
||||
def test_finally_block(mock_os_functions, mock_download, mock_provider):
|
||||
# Arrange
|
||||
mock_makedirs, mock_listdir, mock_rmdir = mock_os_functions
|
||||
version_string = 'test-owner/test-repo@1.0.0'
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(Exception):
|
||||
FrontendManager.init_frontend_unsafe(version_string, mock_provider)
|
||||
|
||||
# Assert
|
||||
mock_makedirs.assert_called_once()
|
||||
mock_download.assert_called_once()
|
||||
mock_listdir.assert_called_once()
|
||||
mock_rmdir.assert_called_once()
|
||||
|
||||
|
||||
def test_parse_version_string():
|
||||
version_string = "owner/repo@1.0.0"
|
||||
repo_owner, repo_name, version = FrontendManager.parse_version_string(
|
||||
version_string
|
||||
)
|
||||
assert repo_owner == "owner"
|
||||
assert repo_name == "repo"
|
||||
assert version == "1.0.0"
|
||||
|
||||
|
||||
def test_parse_version_string_invalid():
|
||||
version_string = "invalid"
|
||||
with pytest.raises(argparse.ArgumentTypeError):
|
||||
FrontendManager.parse_version_string(version_string)
|
||||
66
tests-unit/comfy_test/folder_path_test.py
Normal file
66
tests-unit/comfy_test/folder_path_test.py
Normal file
@@ -0,0 +1,66 @@
|
||||
### 🗻 This file is created through the spirit of Mount Fuji at its peak
|
||||
# TODO(yoland): clean up this after I get back down
|
||||
import pytest
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch
|
||||
|
||||
import folder_paths
|
||||
|
||||
@pytest.fixture
|
||||
def temp_dir():
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
yield tmpdirname
|
||||
|
||||
|
||||
def test_get_directory_by_type():
|
||||
test_dir = "/test/dir"
|
||||
folder_paths.set_output_directory(test_dir)
|
||||
assert folder_paths.get_directory_by_type("output") == test_dir
|
||||
assert folder_paths.get_directory_by_type("invalid") is None
|
||||
|
||||
def test_annotated_filepath():
|
||||
assert folder_paths.annotated_filepath("test.txt") == ("test.txt", None)
|
||||
assert folder_paths.annotated_filepath("test.txt [output]") == ("test.txt", folder_paths.get_output_directory())
|
||||
assert folder_paths.annotated_filepath("test.txt [input]") == ("test.txt", folder_paths.get_input_directory())
|
||||
assert folder_paths.annotated_filepath("test.txt [temp]") == ("test.txt", folder_paths.get_temp_directory())
|
||||
|
||||
def test_get_annotated_filepath():
|
||||
default_dir = "/default/dir"
|
||||
assert folder_paths.get_annotated_filepath("test.txt", default_dir) == os.path.join(default_dir, "test.txt")
|
||||
assert folder_paths.get_annotated_filepath("test.txt [output]") == os.path.join(folder_paths.get_output_directory(), "test.txt")
|
||||
|
||||
def test_add_model_folder_path():
|
||||
folder_paths.add_model_folder_path("test_folder", "/test/path")
|
||||
assert "/test/path" in folder_paths.get_folder_paths("test_folder")
|
||||
|
||||
def test_recursive_search(temp_dir):
|
||||
os.makedirs(os.path.join(temp_dir, "subdir"))
|
||||
open(os.path.join(temp_dir, "file1.txt"), "w").close()
|
||||
open(os.path.join(temp_dir, "subdir", "file2.txt"), "w").close()
|
||||
|
||||
files, dirs = folder_paths.recursive_search(temp_dir)
|
||||
assert set(files) == {"file1.txt", os.path.join("subdir", "file2.txt")}
|
||||
assert len(dirs) == 2 # temp_dir and subdir
|
||||
|
||||
def test_filter_files_extensions():
|
||||
files = ["file1.txt", "file2.jpg", "file3.png", "file4.txt"]
|
||||
assert folder_paths.filter_files_extensions(files, [".txt"]) == ["file1.txt", "file4.txt"]
|
||||
assert folder_paths.filter_files_extensions(files, [".jpg", ".png"]) == ["file2.jpg", "file3.png"]
|
||||
assert folder_paths.filter_files_extensions(files, []) == files
|
||||
|
||||
@patch("folder_paths.recursive_search")
|
||||
@patch("folder_paths.folder_names_and_paths")
|
||||
def test_get_filename_list(mock_folder_names_and_paths, mock_recursive_search):
|
||||
mock_folder_names_and_paths.__getitem__.return_value = (["/test/path"], {".txt"})
|
||||
mock_recursive_search.return_value = (["file1.txt", "file2.jpg"], {})
|
||||
assert folder_paths.get_filename_list("test_folder") == ["file1.txt"]
|
||||
|
||||
def test_get_save_image_path(temp_dir):
|
||||
with patch("folder_paths.output_directory", temp_dir):
|
||||
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path("test", temp_dir, 100, 100)
|
||||
assert os.path.samefile(full_output_folder, temp_dir)
|
||||
assert filename == "test"
|
||||
assert counter == 1
|
||||
assert subfolder == ""
|
||||
assert filename_prefix == "test"
|
||||
0
tests-unit/folder_paths_test/__init__.py
Normal file
0
tests-unit/folder_paths_test/__init__.py
Normal file
52
tests-unit/folder_paths_test/filter_by_content_types_test.py
Normal file
52
tests-unit/folder_paths_test/filter_by_content_types_test.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import pytest
|
||||
import os
|
||||
import tempfile
|
||||
from folder_paths import filter_files_content_types
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def file_extensions():
|
||||
return {
|
||||
'image': ['gif', 'heif', 'ico', 'jpeg', 'jpg', 'png', 'pnm', 'ppm', 'svg', 'tiff', 'webp', 'xbm', 'xpm'],
|
||||
'audio': ['aif', 'aifc', 'aiff', 'au', 'flac', 'm4a', 'mp2', 'mp3', 'ogg', 'snd', 'wav'],
|
||||
'video': ['avi', 'm2v', 'm4v', 'mkv', 'mov', 'mp4', 'mpeg', 'mpg', 'ogv', 'qt', 'webm', 'wmv']
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mock_dir(file_extensions):
|
||||
with tempfile.TemporaryDirectory() as directory:
|
||||
for content_type, extensions in file_extensions.items():
|
||||
for extension in extensions:
|
||||
with open(f"{directory}/sample_{content_type}.{extension}", "w") as f:
|
||||
f.write(f"Sample {content_type} file in {extension} format")
|
||||
yield directory
|
||||
|
||||
|
||||
def test_categorizes_all_correctly(mock_dir, file_extensions):
|
||||
files = os.listdir(mock_dir)
|
||||
for content_type, extensions in file_extensions.items():
|
||||
filtered_files = filter_files_content_types(files, [content_type])
|
||||
for extension in extensions:
|
||||
assert f"sample_{content_type}.{extension}" in filtered_files
|
||||
|
||||
|
||||
def test_categorizes_all_uniquely(mock_dir, file_extensions):
|
||||
files = os.listdir(mock_dir)
|
||||
for content_type, extensions in file_extensions.items():
|
||||
filtered_files = filter_files_content_types(files, [content_type])
|
||||
assert len(filtered_files) == len(extensions)
|
||||
|
||||
|
||||
def test_handles_bad_extensions():
|
||||
files = ["file1.txt", "file2.py", "file3.example", "file4.pdf", "file5.ini", "file6.doc", "file7.md"]
|
||||
assert filter_files_content_types(files, ["image", "audio", "video"]) == []
|
||||
|
||||
|
||||
def test_handles_no_extension():
|
||||
files = ["file1", "file2", "file3", "file4", "file5", "file6", "file7"]
|
||||
assert filter_files_content_types(files, ["image", "audio", "video"]) == []
|
||||
|
||||
|
||||
def test_handles_no_files():
|
||||
files = []
|
||||
assert filter_files_content_types(files, ["image", "audio", "video"]) == []
|
||||
0
tests-unit/prompt_server_test/__init__.py
Normal file
0
tests-unit/prompt_server_test/__init__.py
Normal file
337
tests-unit/prompt_server_test/download_models_test.py
Normal file
337
tests-unit/prompt_server_test/download_models_test.py
Normal file
@@ -0,0 +1,337 @@
|
||||
import pytest
|
||||
import tempfile
|
||||
import aiohttp
|
||||
from aiohttp import ClientResponse
|
||||
import itertools
|
||||
import os
|
||||
from unittest.mock import AsyncMock, patch, MagicMock
|
||||
from model_filemanager import download_model, track_download_progress, create_model_path, check_file_exists, DownloadStatusType, DownloadModelStatus, validate_filename
|
||||
import folder_paths
|
||||
|
||||
@pytest.fixture
|
||||
def temp_dir():
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
yield tmpdirname
|
||||
|
||||
class AsyncIteratorMock:
|
||||
"""
|
||||
A mock class that simulates an asynchronous iterator.
|
||||
This is used to mimic the behavior of aiohttp's content iterator.
|
||||
"""
|
||||
def __init__(self, seq):
|
||||
# Convert the input sequence into an iterator
|
||||
self.iter = iter(seq)
|
||||
|
||||
def __aiter__(self):
|
||||
# This method is called when 'async for' is used
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
# This method is called for each iteration in an 'async for' loop
|
||||
try:
|
||||
return next(self.iter)
|
||||
except StopIteration:
|
||||
# This is the asynchronous equivalent of StopIteration
|
||||
raise StopAsyncIteration
|
||||
|
||||
class ContentMock:
|
||||
"""
|
||||
A mock class that simulates the content attribute of an aiohttp ClientResponse.
|
||||
This class provides the iter_chunked method which returns an async iterator of chunks.
|
||||
"""
|
||||
def __init__(self, chunks):
|
||||
# Store the chunks that will be returned by the iterator
|
||||
self.chunks = chunks
|
||||
|
||||
def iter_chunked(self, chunk_size):
|
||||
# This method mimics aiohttp's content.iter_chunked()
|
||||
# For simplicity in testing, we ignore chunk_size and just return our predefined chunks
|
||||
return AsyncIteratorMock(self.chunks)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_model_success(temp_dir):
|
||||
mock_response = AsyncMock(spec=aiohttp.ClientResponse)
|
||||
mock_response.status = 200
|
||||
mock_response.headers = {'Content-Length': '1000'}
|
||||
# Create a mock for content that returns an async iterator directly
|
||||
chunks = [b'a' * 500, b'b' * 300, b'c' * 200]
|
||||
mock_response.content = ContentMock(chunks)
|
||||
|
||||
mock_make_request = AsyncMock(return_value=mock_response)
|
||||
mock_progress_callback = AsyncMock()
|
||||
|
||||
time_values = itertools.count(0, 0.1)
|
||||
|
||||
fake_paths = {'checkpoints': ([temp_dir], folder_paths.supported_pt_extensions)}
|
||||
|
||||
with patch('model_filemanager.create_model_path', return_value=('models/checkpoints/model.sft', 'model.sft')), \
|
||||
patch('model_filemanager.check_file_exists', return_value=None), \
|
||||
patch('folder_paths.folder_names_and_paths', fake_paths), \
|
||||
patch('time.time', side_effect=time_values): # Simulate time passing
|
||||
|
||||
result = await download_model(
|
||||
mock_make_request,
|
||||
'model.sft',
|
||||
'http://example.com/model.sft',
|
||||
'checkpoints',
|
||||
temp_dir,
|
||||
mock_progress_callback
|
||||
)
|
||||
|
||||
# Assert the result
|
||||
assert isinstance(result, DownloadModelStatus)
|
||||
assert result.message == 'Successfully downloaded model.sft'
|
||||
assert result.status == 'completed'
|
||||
assert result.already_existed is False
|
||||
|
||||
# Check progress callback calls
|
||||
assert mock_progress_callback.call_count >= 3 # At least start, one progress update, and completion
|
||||
|
||||
# Check initial call
|
||||
mock_progress_callback.assert_any_call(
|
||||
'model.sft',
|
||||
DownloadModelStatus(DownloadStatusType.PENDING, 0, "Starting download of model.sft", False)
|
||||
)
|
||||
|
||||
# Check final call
|
||||
mock_progress_callback.assert_any_call(
|
||||
'model.sft',
|
||||
DownloadModelStatus(DownloadStatusType.COMPLETED, 100, "Successfully downloaded model.sft", False)
|
||||
)
|
||||
|
||||
mock_file_path = os.path.join(temp_dir, 'model.sft')
|
||||
assert os.path.exists(mock_file_path)
|
||||
with open(mock_file_path, 'rb') as mock_file:
|
||||
assert mock_file.read() == b''.join(chunks)
|
||||
os.remove(mock_file_path)
|
||||
|
||||
# Verify request was made
|
||||
mock_make_request.assert_called_once_with('http://example.com/model.sft')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_model_url_request_failure(temp_dir):
|
||||
# Mock dependencies
|
||||
mock_response = AsyncMock(spec=ClientResponse)
|
||||
mock_response.status = 404 # Simulate a "Not Found" error
|
||||
mock_get = AsyncMock(return_value=mock_response)
|
||||
mock_progress_callback = AsyncMock()
|
||||
|
||||
fake_paths = {'checkpoints': ([temp_dir], folder_paths.supported_pt_extensions)}
|
||||
|
||||
# Mock the create_model_path function
|
||||
with patch('model_filemanager.create_model_path', return_value='/mock/path/model.safetensors'), \
|
||||
patch('model_filemanager.check_file_exists', return_value=None), \
|
||||
patch('folder_paths.folder_names_and_paths', fake_paths):
|
||||
# Call the function
|
||||
result = await download_model(
|
||||
mock_get,
|
||||
'model.safetensors',
|
||||
'http://example.com/model.safetensors',
|
||||
'checkpoints',
|
||||
temp_dir,
|
||||
mock_progress_callback
|
||||
)
|
||||
|
||||
# Assert the expected behavior
|
||||
assert isinstance(result, DownloadModelStatus)
|
||||
assert result.status == 'error'
|
||||
assert result.message == 'Failed to download model.safetensors. Status code: 404'
|
||||
assert result.already_existed is False
|
||||
|
||||
# Check that progress_callback was called with the correct arguments
|
||||
mock_progress_callback.assert_any_call(
|
||||
'model.safetensors',
|
||||
DownloadModelStatus(
|
||||
status=DownloadStatusType.PENDING,
|
||||
progress_percentage=0,
|
||||
message='Starting download of model.safetensors',
|
||||
already_existed=False
|
||||
)
|
||||
)
|
||||
mock_progress_callback.assert_called_with(
|
||||
'model.safetensors',
|
||||
DownloadModelStatus(
|
||||
status=DownloadStatusType.ERROR,
|
||||
progress_percentage=0,
|
||||
message='Failed to download model.safetensors. Status code: 404',
|
||||
already_existed=False
|
||||
)
|
||||
)
|
||||
|
||||
# Verify that the get method was called with the correct URL
|
||||
mock_get.assert_called_once_with('http://example.com/model.safetensors')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_model_invalid_model_subdirectory():
|
||||
mock_make_request = AsyncMock()
|
||||
mock_progress_callback = AsyncMock()
|
||||
|
||||
result = await download_model(
|
||||
mock_make_request,
|
||||
'model.sft',
|
||||
'http://example.com/model.sft',
|
||||
'../bad_path',
|
||||
'../bad_path',
|
||||
mock_progress_callback
|
||||
)
|
||||
|
||||
# Assert the result
|
||||
assert isinstance(result, DownloadModelStatus)
|
||||
assert result.message.startswith('Invalid or unrecognized model directory')
|
||||
assert result.status == 'error'
|
||||
assert result.already_existed is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_model_invalid_folder_path():
|
||||
mock_make_request = AsyncMock()
|
||||
mock_progress_callback = AsyncMock()
|
||||
|
||||
result = await download_model(
|
||||
mock_make_request,
|
||||
'model.sft',
|
||||
'http://example.com/model.sft',
|
||||
'checkpoints',
|
||||
'invalid_path',
|
||||
mock_progress_callback
|
||||
)
|
||||
|
||||
# Assert the result
|
||||
assert isinstance(result, DownloadModelStatus)
|
||||
assert result.message.startswith("Invalid folder path")
|
||||
assert result.status == 'error'
|
||||
assert result.already_existed is False
|
||||
|
||||
def test_create_model_path(tmp_path, monkeypatch):
|
||||
model_name = "model.safetensors"
|
||||
folder_path = os.path.join(tmp_path, "mock_dir")
|
||||
|
||||
file_path = create_model_path(model_name, folder_path)
|
||||
|
||||
assert file_path == os.path.join(folder_path, "model.safetensors")
|
||||
assert os.path.exists(os.path.dirname(file_path))
|
||||
|
||||
with pytest.raises(Exception, match="Invalid model directory"):
|
||||
create_model_path("../path_traversal.safetensors", folder_path)
|
||||
|
||||
with pytest.raises(Exception, match="Invalid model directory"):
|
||||
create_model_path("/etc/some_root_path", folder_path)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_file_exists_when_file_exists(tmp_path):
|
||||
file_path = tmp_path / "existing_model.sft"
|
||||
file_path.touch() # Create an empty file
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
|
||||
result = await check_file_exists(str(file_path), "existing_model.sft", mock_callback)
|
||||
|
||||
assert result is not None
|
||||
assert result.status == "completed"
|
||||
assert result.message == "existing_model.sft already exists"
|
||||
assert result.already_existed is True
|
||||
|
||||
mock_callback.assert_called_once_with(
|
||||
"existing_model.sft",
|
||||
DownloadModelStatus(DownloadStatusType.COMPLETED, 100, "existing_model.sft already exists", already_existed=True)
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_file_exists_when_file_does_not_exist(tmp_path):
|
||||
file_path = tmp_path / "non_existing_model.sft"
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
|
||||
result = await check_file_exists(str(file_path), "non_existing_model.sft", mock_callback)
|
||||
|
||||
assert result is None
|
||||
mock_callback.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_track_download_progress_no_content_length(temp_dir):
|
||||
mock_response = AsyncMock(spec=aiohttp.ClientResponse)
|
||||
mock_response.headers = {} # No Content-Length header
|
||||
chunks = [b'a' * 500, b'b' * 500]
|
||||
mock_response.content.iter_chunked.return_value = AsyncIteratorMock(chunks)
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
|
||||
full_path = os.path.join(temp_dir, 'model.sft')
|
||||
|
||||
result = await track_download_progress(
|
||||
mock_response, full_path, 'model.sft',
|
||||
mock_callback, interval=0.1
|
||||
)
|
||||
|
||||
assert result.status == "completed"
|
||||
|
||||
assert os.path.exists(full_path)
|
||||
with open(full_path, 'rb') as f:
|
||||
assert f.read() == b''.join(chunks)
|
||||
os.remove(full_path)
|
||||
|
||||
# Check that progress was reported even without knowing the total size
|
||||
mock_callback.assert_any_call(
|
||||
'model.sft',
|
||||
DownloadModelStatus(DownloadStatusType.IN_PROGRESS, 0, "Downloading model.sft", already_existed=False)
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_track_download_progress_interval(temp_dir):
|
||||
mock_response = AsyncMock(spec=aiohttp.ClientResponse)
|
||||
mock_response.headers = {'Content-Length': '1000'}
|
||||
chunks = [b'a' * 100] * 10
|
||||
mock_response.content.iter_chunked.return_value = AsyncIteratorMock(chunks)
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
mock_open = MagicMock(return_value=MagicMock())
|
||||
|
||||
# Create a mock time function that returns incremental float values
|
||||
mock_time = MagicMock()
|
||||
mock_time.side_effect = [i * 0.5 for i in range(30)] # This should be enough for 10 chunks
|
||||
|
||||
full_path = os.path.join(temp_dir, 'model.sft')
|
||||
|
||||
with patch('time.time', mock_time):
|
||||
await track_download_progress(
|
||||
mock_response, full_path, 'model.sft',
|
||||
mock_callback, interval=1.0
|
||||
)
|
||||
|
||||
assert os.path.exists(full_path)
|
||||
with open(full_path, 'rb') as f:
|
||||
assert f.read() == b''.join(chunks)
|
||||
os.remove(full_path)
|
||||
|
||||
# Assert that progress was updated at least 3 times (start, at least one interval, and end)
|
||||
assert mock_callback.call_count >= 3, f"Expected at least 3 calls, but got {mock_callback.call_count}"
|
||||
|
||||
# Verify the first and last calls
|
||||
first_call = mock_callback.call_args_list[0]
|
||||
assert first_call[0][1].status == "in_progress"
|
||||
# Allow for some initial progress, but it should be less than 50%
|
||||
assert 0 <= first_call[0][1].progress_percentage < 50, f"First call progress was {first_call[0][1].progress_percentage}%"
|
||||
|
||||
last_call = mock_callback.call_args_list[-1]
|
||||
assert last_call[0][1].status == "completed"
|
||||
assert last_call[0][1].progress_percentage == 100
|
||||
|
||||
@pytest.mark.parametrize("filename, expected", [
|
||||
("valid_model.safetensors", True),
|
||||
("valid_model.sft", True),
|
||||
("valid model.safetensors", True), # Test with space
|
||||
("UPPERCASE_MODEL.SAFETENSORS", True),
|
||||
("model_with.multiple.dots.pt", False),
|
||||
("", False), # Empty string
|
||||
("../../../etc/passwd", False), # Path traversal attempt
|
||||
("/etc/passwd", False), # Absolute path
|
||||
("\\windows\\system32\\config\\sam", False), # Windows path
|
||||
(".hidden_file.pt", False), # Hidden file
|
||||
("invalid<char>.ckpt", False), # Invalid character
|
||||
("invalid?.ckpt", False), # Another invalid character
|
||||
("very" * 100 + ".safetensors", False), # Too long filename
|
||||
("\nmodel_with_newline.pt", False), # Newline character
|
||||
("model_with_emoji😊.pt", False), # Emoji in filename
|
||||
])
|
||||
def test_validate_filename(filename, expected):
|
||||
assert validate_filename(filename) == expected
|
||||
231
tests-unit/prompt_server_test/user_manager_test.py
Normal file
231
tests-unit/prompt_server_test/user_manager_test.py
Normal file
@@ -0,0 +1,231 @@
|
||||
import pytest
|
||||
import os
|
||||
from aiohttp import web
|
||||
from app.user_manager import UserManager
|
||||
from unittest.mock import patch
|
||||
|
||||
pytestmark = (
|
||||
pytest.mark.asyncio
|
||||
) # This applies the asyncio mark to all test functions in the module
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_manager(tmp_path):
|
||||
um = UserManager()
|
||||
um.get_request_user_filepath = lambda req, file, **kwargs: os.path.join(
|
||||
tmp_path, file
|
||||
) if file else tmp_path
|
||||
return um
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(user_manager):
|
||||
app = web.Application()
|
||||
routes = web.RouteTableDef()
|
||||
user_manager.add_routes(routes)
|
||||
app.add_routes(routes)
|
||||
return app
|
||||
|
||||
|
||||
async def test_listuserdata_empty_directory(aiohttp_client, app, tmp_path):
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir")
|
||||
assert resp.status == 404
|
||||
|
||||
|
||||
async def test_listuserdata_with_files(aiohttp_client, app, tmp_path):
|
||||
os.makedirs(tmp_path / "test_dir")
|
||||
with open(tmp_path / "test_dir" / "file1.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir")
|
||||
assert resp.status == 200
|
||||
assert await resp.json() == ["file1.txt"]
|
||||
|
||||
|
||||
async def test_listuserdata_recursive(aiohttp_client, app, tmp_path):
|
||||
os.makedirs(tmp_path / "test_dir" / "subdir")
|
||||
with open(tmp_path / "test_dir" / "file1.txt", "w") as f:
|
||||
f.write("test content")
|
||||
with open(tmp_path / "test_dir" / "subdir" / "file2.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir&recurse=true")
|
||||
assert resp.status == 200
|
||||
assert set(await resp.json()) == {"file1.txt", "subdir/file2.txt"}
|
||||
|
||||
|
||||
async def test_listuserdata_full_info(aiohttp_client, app, tmp_path):
|
||||
os.makedirs(tmp_path / "test_dir")
|
||||
with open(tmp_path / "test_dir" / "file1.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir&full_info=true")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert len(result) == 1
|
||||
assert result[0]["path"] == "file1.txt"
|
||||
assert "size" in result[0]
|
||||
assert "modified" in result[0]
|
||||
|
||||
|
||||
async def test_listuserdata_split_path(aiohttp_client, app, tmp_path):
|
||||
os.makedirs(tmp_path / "test_dir" / "subdir")
|
||||
with open(tmp_path / "test_dir" / "subdir" / "file1.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir&recurse=true&split=true")
|
||||
assert resp.status == 200
|
||||
assert await resp.json() == [["subdir/file1.txt", "subdir", "file1.txt"]]
|
||||
|
||||
|
||||
async def test_listuserdata_invalid_directory(aiohttp_client, app):
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=")
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
async def test_listuserdata_normalized_separator(aiohttp_client, app, tmp_path):
|
||||
os_sep = "\\"
|
||||
with patch("os.sep", os_sep):
|
||||
with patch("os.path.sep", os_sep):
|
||||
os.makedirs(tmp_path / "test_dir" / "subdir")
|
||||
with open(tmp_path / "test_dir" / "subdir" / "file1.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata?dir=test_dir&recurse=true")
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert len(result) == 1
|
||||
assert "/" in result[0] # Ensure forward slash is used
|
||||
assert "\\" not in result[0] # Ensure backslash is not present
|
||||
assert result[0] == "subdir/file1.txt"
|
||||
|
||||
# Test with full_info
|
||||
resp = await client.get(
|
||||
"/userdata?dir=test_dir&recurse=true&full_info=true"
|
||||
)
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert len(result) == 1
|
||||
assert "/" in result[0]["path"] # Ensure forward slash is used
|
||||
assert "\\" not in result[0]["path"] # Ensure backslash is not present
|
||||
assert result[0]["path"] == "subdir/file1.txt"
|
||||
|
||||
|
||||
async def test_post_userdata_new_file(aiohttp_client, app, tmp_path):
|
||||
client = await aiohttp_client(app)
|
||||
content = b"test content"
|
||||
resp = await client.post("/userdata/test.txt", data=content)
|
||||
|
||||
assert resp.status == 200
|
||||
assert await resp.text() == '"test.txt"'
|
||||
|
||||
# Verify file was created with correct content
|
||||
with open(tmp_path / "test.txt", "rb") as f:
|
||||
assert f.read() == content
|
||||
|
||||
|
||||
async def test_post_userdata_overwrite_existing(aiohttp_client, app, tmp_path):
|
||||
# Create initial file
|
||||
with open(tmp_path / "test.txt", "w") as f:
|
||||
f.write("initial content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
new_content = b"updated content"
|
||||
resp = await client.post("/userdata/test.txt", data=new_content)
|
||||
|
||||
assert resp.status == 200
|
||||
assert await resp.text() == '"test.txt"'
|
||||
|
||||
# Verify file was overwritten
|
||||
with open(tmp_path / "test.txt", "rb") as f:
|
||||
assert f.read() == new_content
|
||||
|
||||
|
||||
async def test_post_userdata_no_overwrite(aiohttp_client, app, tmp_path):
|
||||
# Create initial file
|
||||
with open(tmp_path / "test.txt", "w") as f:
|
||||
f.write("initial content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.post("/userdata/test.txt?overwrite=false", data=b"new content")
|
||||
|
||||
assert resp.status == 409
|
||||
|
||||
# Verify original content unchanged
|
||||
with open(tmp_path / "test.txt", "r") as f:
|
||||
assert f.read() == "initial content"
|
||||
|
||||
|
||||
async def test_post_userdata_full_info(aiohttp_client, app, tmp_path):
|
||||
client = await aiohttp_client(app)
|
||||
content = b"test content"
|
||||
resp = await client.post("/userdata/test.txt?full_info=true", data=content)
|
||||
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert result["path"] == "test.txt"
|
||||
assert result["size"] == len(content)
|
||||
assert "modified" in result
|
||||
|
||||
|
||||
async def test_move_userdata(aiohttp_client, app, tmp_path):
|
||||
# Create initial file
|
||||
with open(tmp_path / "source.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.post("/userdata/source.txt/move/dest.txt")
|
||||
|
||||
assert resp.status == 200
|
||||
assert await resp.text() == '"dest.txt"'
|
||||
|
||||
# Verify file was moved
|
||||
assert not os.path.exists(tmp_path / "source.txt")
|
||||
with open(tmp_path / "dest.txt", "r") as f:
|
||||
assert f.read() == "test content"
|
||||
|
||||
|
||||
async def test_move_userdata_no_overwrite(aiohttp_client, app, tmp_path):
|
||||
# Create source and destination files
|
||||
with open(tmp_path / "source.txt", "w") as f:
|
||||
f.write("source content")
|
||||
with open(tmp_path / "dest.txt", "w") as f:
|
||||
f.write("destination content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.post("/userdata/source.txt/move/dest.txt?overwrite=false")
|
||||
|
||||
assert resp.status == 409
|
||||
|
||||
# Verify files remain unchanged
|
||||
with open(tmp_path / "source.txt", "r") as f:
|
||||
assert f.read() == "source content"
|
||||
with open(tmp_path / "dest.txt", "r") as f:
|
||||
assert f.read() == "destination content"
|
||||
|
||||
|
||||
async def test_move_userdata_full_info(aiohttp_client, app, tmp_path):
|
||||
# Create initial file
|
||||
with open(tmp_path / "source.txt", "w") as f:
|
||||
f.write("test content")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.post("/userdata/source.txt/move/dest.txt?full_info=true")
|
||||
|
||||
assert resp.status == 200
|
||||
result = await resp.json()
|
||||
assert result["path"] == "dest.txt"
|
||||
assert result["size"] == len("test content")
|
||||
assert "modified" in result
|
||||
|
||||
# Verify file was moved
|
||||
assert not os.path.exists(tmp_path / "source.txt")
|
||||
with open(tmp_path / "dest.txt", "r") as f:
|
||||
assert f.read() == "test content"
|
||||
3
tests-unit/requirements.txt
Normal file
3
tests-unit/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
pytest>=7.8.0
|
||||
pytest-aiohttp
|
||||
pytest-asyncio
|
||||
115
tests-unit/server/routes/internal_routes_test.py
Normal file
115
tests-unit/server/routes/internal_routes_test.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import pytest
|
||||
from aiohttp import web
|
||||
from unittest.mock import MagicMock, patch
|
||||
from api_server.routes.internal.internal_routes import InternalRoutes
|
||||
from api_server.services.file_service import FileService
|
||||
from folder_paths import models_dir, user_directory, output_directory
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def internal_routes():
|
||||
return InternalRoutes(None)
|
||||
|
||||
@pytest.fixture
|
||||
def aiohttp_client_factory(aiohttp_client, internal_routes):
|
||||
async def _get_client():
|
||||
app = internal_routes.get_app()
|
||||
return await aiohttp_client(app)
|
||||
return _get_client
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_files_valid_directory(aiohttp_client_factory, internal_routes):
|
||||
mock_file_list = [
|
||||
{"name": "file1.txt", "path": "file1.txt", "type": "file", "size": 100},
|
||||
{"name": "dir1", "path": "dir1", "type": "directory"}
|
||||
]
|
||||
internal_routes.file_service.list_files = MagicMock(return_value=mock_file_list)
|
||||
client = await aiohttp_client_factory()
|
||||
resp = await client.get('/files?directory=models')
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert 'files' in data
|
||||
assert len(data['files']) == 2
|
||||
assert data['files'] == mock_file_list
|
||||
|
||||
# Check other valid directories
|
||||
resp = await client.get('/files?directory=user')
|
||||
assert resp.status == 200
|
||||
resp = await client.get('/files?directory=output')
|
||||
assert resp.status == 200
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_files_invalid_directory(aiohttp_client_factory, internal_routes):
|
||||
internal_routes.file_service.list_files = MagicMock(side_effect=ValueError("Invalid directory key"))
|
||||
client = await aiohttp_client_factory()
|
||||
resp = await client.get('/files?directory=invalid')
|
||||
assert resp.status == 400
|
||||
data = await resp.json()
|
||||
assert 'error' in data
|
||||
assert data['error'] == "Invalid directory key"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_files_exception(aiohttp_client_factory, internal_routes):
|
||||
internal_routes.file_service.list_files = MagicMock(side_effect=Exception("Unexpected error"))
|
||||
client = await aiohttp_client_factory()
|
||||
resp = await client.get('/files?directory=models')
|
||||
assert resp.status == 500
|
||||
data = await resp.json()
|
||||
assert 'error' in data
|
||||
assert data['error'] == "Unexpected error"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_files_no_directory_param(aiohttp_client_factory, internal_routes):
|
||||
mock_file_list = []
|
||||
internal_routes.file_service.list_files = MagicMock(return_value=mock_file_list)
|
||||
client = await aiohttp_client_factory()
|
||||
resp = await client.get('/files')
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert 'files' in data
|
||||
assert len(data['files']) == 0
|
||||
|
||||
def test_setup_routes(internal_routes):
|
||||
internal_routes.setup_routes()
|
||||
routes = internal_routes.routes
|
||||
assert any(route.method == 'GET' and str(route.path) == '/files' for route in routes)
|
||||
|
||||
def test_get_app(internal_routes):
|
||||
app = internal_routes.get_app()
|
||||
assert isinstance(app, web.Application)
|
||||
assert internal_routes._app is not None
|
||||
|
||||
def test_get_app_reuse(internal_routes):
|
||||
app1 = internal_routes.get_app()
|
||||
app2 = internal_routes.get_app()
|
||||
assert app1 is app2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_routes_added_to_app(aiohttp_client_factory, internal_routes):
|
||||
client = await aiohttp_client_factory()
|
||||
try:
|
||||
resp = await client.get('/files')
|
||||
print(f"Response received: status {resp.status}")
|
||||
except Exception as e:
|
||||
print(f"Exception occurred during GET request: {e}")
|
||||
raise
|
||||
|
||||
assert resp.status != 404, "Route /files does not exist"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_service_initialization():
|
||||
with patch('api_server.routes.internal.internal_routes.FileService') as MockFileService:
|
||||
# Create a mock instance
|
||||
mock_file_service_instance = MagicMock(spec=FileService)
|
||||
MockFileService.return_value = mock_file_service_instance
|
||||
internal_routes = InternalRoutes(None)
|
||||
|
||||
# Check if FileService was initialized with the correct parameters
|
||||
MockFileService.assert_called_once_with({
|
||||
"models": models_dir,
|
||||
"user": user_directory,
|
||||
"output": output_directory
|
||||
})
|
||||
|
||||
# Verify that the file_service attribute of InternalRoutes is set
|
||||
assert internal_routes.file_service == mock_file_service_instance
|
||||
54
tests-unit/server/services/file_service_test.py
Normal file
54
tests-unit/server/services/file_service_test.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from api_server.services.file_service import FileService
|
||||
|
||||
@pytest.fixture
|
||||
def mock_file_system_ops():
|
||||
return MagicMock()
|
||||
|
||||
@pytest.fixture
|
||||
def file_service(mock_file_system_ops):
|
||||
allowed_directories = {
|
||||
"models": "/path/to/models",
|
||||
"user": "/path/to/user",
|
||||
"output": "/path/to/output"
|
||||
}
|
||||
return FileService(allowed_directories, file_system_ops=mock_file_system_ops)
|
||||
|
||||
def test_list_files_valid_directory(file_service, mock_file_system_ops):
|
||||
mock_file_system_ops.walk_directory.return_value = [
|
||||
{"name": "file1.txt", "path": "file1.txt", "type": "file", "size": 100},
|
||||
{"name": "dir1", "path": "dir1", "type": "directory"}
|
||||
]
|
||||
|
||||
result = file_service.list_files("models")
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0]["name"] == "file1.txt"
|
||||
assert result[1]["name"] == "dir1"
|
||||
mock_file_system_ops.walk_directory.assert_called_once_with("/path/to/models")
|
||||
|
||||
def test_list_files_invalid_directory(file_service):
|
||||
# Does not support walking directories outside of the allowed directories
|
||||
with pytest.raises(ValueError, match="Invalid directory key"):
|
||||
file_service.list_files("invalid_key")
|
||||
|
||||
def test_list_files_empty_directory(file_service, mock_file_system_ops):
|
||||
mock_file_system_ops.walk_directory.return_value = []
|
||||
|
||||
result = file_service.list_files("models")
|
||||
|
||||
assert len(result) == 0
|
||||
mock_file_system_ops.walk_directory.assert_called_once_with("/path/to/models")
|
||||
|
||||
@pytest.mark.parametrize("directory_key", ["models", "user", "output"])
|
||||
def test_list_files_all_allowed_directories(file_service, mock_file_system_ops, directory_key):
|
||||
mock_file_system_ops.walk_directory.return_value = [
|
||||
{"name": f"file_{directory_key}.txt", "path": f"file_{directory_key}.txt", "type": "file", "size": 100}
|
||||
]
|
||||
|
||||
result = file_service.list_files(directory_key)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0]["name"] == f"file_{directory_key}.txt"
|
||||
mock_file_system_ops.walk_directory.assert_called_once_with(f"/path/to/{directory_key}")
|
||||
42
tests-unit/server/utils/file_operations_test.py
Normal file
42
tests-unit/server/utils/file_operations_test.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import pytest
|
||||
from typing import List
|
||||
from api_server.utils.file_operations import FileSystemOperations, FileSystemItem, is_file_info
|
||||
|
||||
@pytest.fixture
|
||||
def temp_directory(tmp_path):
|
||||
# Create a temporary directory structure
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
dir1.mkdir()
|
||||
dir2.mkdir()
|
||||
(dir1 / "file1.txt").write_text("content1")
|
||||
(dir2 / "file2.txt").write_text("content2")
|
||||
(tmp_path / "file3.txt").write_text("content3")
|
||||
return tmp_path
|
||||
|
||||
def test_walk_directory(temp_directory):
|
||||
result: List[FileSystemItem] = FileSystemOperations.walk_directory(str(temp_directory))
|
||||
|
||||
assert len(result) == 5 # 2 directories and 3 files
|
||||
|
||||
files = [item for item in result if item['type'] == 'file']
|
||||
dirs = [item for item in result if item['type'] == 'directory']
|
||||
|
||||
assert len(files) == 3
|
||||
assert len(dirs) == 2
|
||||
|
||||
file_names = {file['name'] for file in files}
|
||||
assert file_names == {'file1.txt', 'file2.txt', 'file3.txt'}
|
||||
|
||||
dir_names = {dir['name'] for dir in dirs}
|
||||
assert dir_names == {'dir1', 'dir2'}
|
||||
|
||||
def test_walk_directory_empty(tmp_path):
|
||||
result = FileSystemOperations.walk_directory(str(tmp_path))
|
||||
assert len(result) == 0
|
||||
|
||||
def test_walk_directory_file_size(temp_directory):
|
||||
result: List[FileSystemItem] = FileSystemOperations.walk_directory(str(temp_directory))
|
||||
files = [item for item in result if is_file_info(item)]
|
||||
for file in files:
|
||||
assert file['size'] > 0 # Assuming all files have some content
|
||||
126
tests-unit/utils/extra_config_test.py
Normal file
126
tests-unit/utils/extra_config_test.py
Normal file
@@ -0,0 +1,126 @@
|
||||
import pytest
|
||||
import yaml
|
||||
import os
|
||||
from unittest.mock import Mock, patch, mock_open
|
||||
|
||||
from utils.extra_config import load_extra_path_config
|
||||
import folder_paths
|
||||
|
||||
@pytest.fixture
|
||||
def mock_yaml_content():
|
||||
return {
|
||||
'test_config': {
|
||||
'base_path': '~/App/',
|
||||
'checkpoints': 'subfolder1',
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_expanded_home():
|
||||
return '/home/user'
|
||||
|
||||
@pytest.fixture
|
||||
def yaml_config_with_appdata():
|
||||
return """
|
||||
test_config:
|
||||
base_path: '%APPDATA%/ComfyUI'
|
||||
checkpoints: 'models/checkpoints'
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_yaml_content_appdata(yaml_config_with_appdata):
|
||||
return yaml.safe_load(yaml_config_with_appdata)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_expandvars_appdata():
|
||||
mock = Mock()
|
||||
mock.side_effect = lambda path: path.replace('%APPDATA%', 'C:/Users/TestUser/AppData/Roaming')
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def mock_add_model_folder_path():
|
||||
return Mock()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_expanduser(mock_expanded_home):
|
||||
def _expanduser(path):
|
||||
if path.startswith('~/'):
|
||||
return os.path.join(mock_expanded_home, path[2:])
|
||||
return path
|
||||
return _expanduser
|
||||
|
||||
@pytest.fixture
|
||||
def mock_yaml_safe_load(mock_yaml_content):
|
||||
return Mock(return_value=mock_yaml_content)
|
||||
|
||||
@patch('builtins.open', new_callable=mock_open, read_data="dummy file content")
|
||||
def test_load_extra_model_paths_expands_userpath(
|
||||
mock_file,
|
||||
monkeypatch,
|
||||
mock_add_model_folder_path,
|
||||
mock_expanduser,
|
||||
mock_yaml_safe_load,
|
||||
mock_expanded_home
|
||||
):
|
||||
# Attach mocks used by load_extra_path_config
|
||||
monkeypatch.setattr(folder_paths, 'add_model_folder_path', mock_add_model_folder_path)
|
||||
monkeypatch.setattr(os.path, 'expanduser', mock_expanduser)
|
||||
monkeypatch.setattr(yaml, 'safe_load', mock_yaml_safe_load)
|
||||
|
||||
dummy_yaml_file_name = 'dummy_path.yaml'
|
||||
load_extra_path_config(dummy_yaml_file_name)
|
||||
|
||||
expected_calls = [
|
||||
('checkpoints', os.path.join(mock_expanded_home, 'App', 'subfolder1'), False),
|
||||
]
|
||||
|
||||
assert mock_add_model_folder_path.call_count == len(expected_calls)
|
||||
|
||||
# Check if add_model_folder_path was called with the correct arguments
|
||||
for actual_call, expected_call in zip(mock_add_model_folder_path.call_args_list, expected_calls):
|
||||
assert actual_call.args[0] == expected_call[0]
|
||||
assert os.path.normpath(actual_call.args[1]) == os.path.normpath(expected_call[1]) # Normalize and check the path to check on multiple OS.
|
||||
assert actual_call.args[2] == expected_call[2]
|
||||
|
||||
# Check if yaml.safe_load was called
|
||||
mock_yaml_safe_load.assert_called_once()
|
||||
|
||||
# Check if open was called with the correct file path
|
||||
mock_file.assert_called_once_with(dummy_yaml_file_name, 'r')
|
||||
|
||||
@patch('builtins.open', new_callable=mock_open)
|
||||
def test_load_extra_model_paths_expands_appdata(
|
||||
mock_file,
|
||||
monkeypatch,
|
||||
mock_add_model_folder_path,
|
||||
mock_expandvars_appdata,
|
||||
yaml_config_with_appdata,
|
||||
mock_yaml_content_appdata
|
||||
):
|
||||
# Set the mock_file to return yaml with appdata as a variable
|
||||
mock_file.return_value.read.return_value = yaml_config_with_appdata
|
||||
|
||||
# Attach mocks
|
||||
monkeypatch.setattr(folder_paths, 'add_model_folder_path', mock_add_model_folder_path)
|
||||
monkeypatch.setattr(os.path, 'expandvars', mock_expandvars_appdata)
|
||||
monkeypatch.setattr(yaml, 'safe_load', Mock(return_value=mock_yaml_content_appdata))
|
||||
|
||||
# Mock expanduser to do nothing (since we're not testing it here)
|
||||
monkeypatch.setattr(os.path, 'expanduser', lambda x: x)
|
||||
|
||||
dummy_yaml_file_name = 'dummy_path.yaml'
|
||||
load_extra_path_config(dummy_yaml_file_name)
|
||||
|
||||
expected_base_path = 'C:/Users/TestUser/AppData/Roaming/ComfyUI'
|
||||
expected_calls = [
|
||||
('checkpoints', os.path.join(expected_base_path, 'models/checkpoints'), False),
|
||||
]
|
||||
|
||||
assert mock_add_model_folder_path.call_count == len(expected_calls)
|
||||
|
||||
# Check the base path variable was expanded
|
||||
for actual_call, expected_call in zip(mock_add_model_folder_path.call_args_list, expected_calls):
|
||||
assert actual_call.args == expected_call
|
||||
|
||||
# Verify that expandvars was called
|
||||
assert mock_expandvars_appdata.called
|
||||
Reference in New Issue
Block a user