initial commit

2022-12-20 21:26:47 +01:00
commit 2962a6db69
722 changed files with 63886 additions and 0 deletions

@@ -0,0 +1 @@
"""Initialize HACS utils."""

@@ -0,0 +1,137 @@
"""Backup."""
from __future__ import annotations
import os
import shutil
import tempfile
from time import sleep
from typing import TYPE_CHECKING
from .path import is_safe
if TYPE_CHECKING:
from ..base import HacsBase
from ..repositories.base import HacsRepository
DEFAULT_BACKUP_PATH = f"{tempfile.gettempdir()}/hacs_backup/"
class Backup:
"""Backup."""
def __init__(
self,
hacs: HacsBase,
local_path: str | None = None,
backup_path: str = DEFAULT_BACKUP_PATH,
repository: HacsRepository | None = None,
) -> None:
"""initialize."""
self.hacs = hacs
self.repository = repository
self.local_path = local_path or repository.content.path.local
self.backup_path = backup_path
if repository:
self.backup_path = (
tempfile.gettempdir()
+ f"/hacs_persistent_{repository.data.category}/"
+ repository.data.name
)
self.backup_path_full = f"{self.backup_path}{self.local_path.split('/')[-1]}"
def _init_backup_dir(self) -> bool:
"""Init backup dir."""
if not os.path.exists(self.local_path):
return False
if not is_safe(self.hacs, self.local_path):
return False
if os.path.exists(self.backup_path):
shutil.rmtree(self.backup_path)
# Wait for the folder to be removed
while os.path.exists(self.backup_path):
sleep(0.1)
os.makedirs(self.backup_path, exist_ok=True)
return True
def create(self) -> None:
"""Create a backup in /tmp"""
if not self._init_backup_dir():
return
try:
if os.path.isfile(self.local_path):
shutil.copyfile(self.local_path, self.backup_path_full)
os.remove(self.local_path)
else:
shutil.copytree(self.local_path, self.backup_path_full)
shutil.rmtree(self.local_path)
while os.path.exists(self.local_path):
sleep(0.1)
self.hacs.log.debug(
"Backup for %s, created in %s",
self.local_path,
self.backup_path_full,
)
except BaseException as exception: # lgtm [py/catch-base-exception] pylint: disable=broad-except
self.hacs.log.warning("Could not create backup: %s", exception)
def restore(self) -> None:
"""Restore from backup."""
if not os.path.exists(self.backup_path_full):
return
if os.path.isfile(self.backup_path_full):
if os.path.exists(self.local_path):
os.remove(self.local_path)
shutil.copyfile(self.backup_path_full, self.local_path)
else:
if os.path.exists(self.local_path):
shutil.rmtree(self.local_path)
while os.path.exists(self.local_path):
sleep(0.1)
shutil.copytree(self.backup_path_full, self.local_path)
self.hacs.log.debug("Restored %s, from backup %s", self.local_path, self.backup_path_full)
def cleanup(self) -> None:
"""Cleanup backup files."""
if not os.path.exists(self.backup_path):
return
shutil.rmtree(self.backup_path)
# Wait for the folder to be removed
while os.path.exists(self.backup_path):
sleep(0.1)
self.hacs.log.debug("Backup dir %s cleared", self.backup_path)
class BackupNetDaemon(Backup):
"""BackupNetDaemon."""
def create(self) -> None:
"""Create a backup in /tmp"""
if not self._init_backup_dir():
return
for filename in os.listdir(self.repository.content.path.local):
if not filename.endswith(".yaml"):
continue
source_file_name = f"{self.repository.content.path.local}/{filename}"
target_file_name = f"{self.backup_path}/{filename}"
shutil.copyfile(source_file_name, target_file_name)
def restore(self) -> None:
"""Create a backup in /tmp"""
if not os.path.exists(self.backup_path):
return
for filename in os.listdir(self.backup_path):
if not filename.endswith(".yaml"):
continue
source_file_name = f"{self.backup_path}/{filename}"
target_file_name = f"{self.repository.content.path.local}/{filename}"
shutil.copyfile(source_file_name, target_file_name)
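
A minimal standalone sketch (stdlib only, not HACS code) of the copy-then-restore pattern the Backup class implements; the file and directory names below are made up for illustration.

import shutil
import tempfile
from pathlib import Path

work_dir = Path(tempfile.mkdtemp())
target = work_dir / "example.yaml"
target.write_text("original: true\n")

backup_dir = Path(tempfile.gettempdir()) / "hacs_backup_sketch"
backup_dir.mkdir(exist_ok=True)

shutil.copyfile(target, backup_dir / target.name)  # roughly what create() does
target.write_text("broken: true\n")                # simulate a failed update
shutil.copyfile(backup_dir / target.name, target)  # roughly what restore() does
shutil.rmtree(backup_dir)                          # roughly what cleanup() does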

@@ -0,0 +1,74 @@
"""HACS Configuration Schemas."""
# pylint: disable=dangerous-default-value
import voluptuous as vol
from ..const import LOCALE
# Configuration:
TOKEN = "token"
SIDEPANEL_TITLE = "sidepanel_title"
SIDEPANEL_ICON = "sidepanel_icon"
FRONTEND_REPO = "frontend_repo"
FRONTEND_REPO_URL = "frontend_repo_url"
APPDAEMON = "appdaemon"
NETDAEMON = "netdaemon"
# Options:
COUNTRY = "country"
DEBUG = "debug"
RELEASE_LIMIT = "release_limit"
EXPERIMENTAL = "experimental"
# Config group
PATH_OR_URL = "frontend_repo_path_or_url"
def hacs_base_config_schema(config: dict = {}) -> dict:
"""Return a shcema configuration dict for HACS."""
if not config:
config = {
TOKEN: "xxxxxxxxxxxxxxxxxxxxxxxxxxx",
}
return {
vol.Required(TOKEN, default=config.get(TOKEN)): str,
}
def hacs_config_option_schema(options: dict = {}) -> dict:
"""Return a shcema for HACS configuration options."""
if not options:
options = {
APPDAEMON: False,
COUNTRY: "ALL",
DEBUG: False,
EXPERIMENTAL: False,
NETDAEMON: False,
RELEASE_LIMIT: 5,
SIDEPANEL_ICON: "hacs:hacs",
SIDEPANEL_TITLE: "HACS",
FRONTEND_REPO: "",
FRONTEND_REPO_URL: "",
}
return {
vol.Optional(SIDEPANEL_TITLE, default=options.get(SIDEPANEL_TITLE)): str,
vol.Optional(SIDEPANEL_ICON, default=options.get(SIDEPANEL_ICON)): str,
vol.Optional(RELEASE_LIMIT, default=options.get(RELEASE_LIMIT)): int,
vol.Optional(COUNTRY, default=options.get(COUNTRY)): vol.In(LOCALE),
vol.Optional(APPDAEMON, default=options.get(APPDAEMON)): bool,
vol.Optional(NETDAEMON, default=options.get(NETDAEMON)): bool,
vol.Optional(DEBUG, default=options.get(DEBUG)): bool,
vol.Optional(EXPERIMENTAL, default=options.get(EXPERIMENTAL)): bool,
vol.Exclusive(FRONTEND_REPO, PATH_OR_URL): str,
vol.Exclusive(FRONTEND_REPO_URL, PATH_OR_URL): str,
}
def hacs_config_combined() -> dict:
"""Combine the configuration options."""
base = hacs_base_config_schema()
options = hacs_config_option_schema()
for option in options:
base[option] = options[option]
return base
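
A hedged sketch of how such a combined dict is typically wrapped in a voluptuous schema and applied to user input (assumes voluptuous is installed; the token value is a placeholder):

import voluptuous as vol

schema = vol.Schema(
    {
        vol.Required("token"): str,
        vol.Optional("sidepanel_title", default="HACS"): str,
        vol.Optional("release_limit", default=5): int,
    }
)
validated = schema({"token": "xxxxxxxxxxxxxxxxxxxxxxxxxxx", "release_limit": 10})
# validated now also contains sidepanel_title="HACS", filled in from the schema default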

@@ -0,0 +1,253 @@
"""Data handler for HACS."""
import asyncio
from datetime import datetime
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util import json as json_util
from ..base import HacsBase
from ..enums import HacsDisabledReason, HacsDispatchEvent, HacsGitHubRepo
from ..repositories.base import TOPIC_FILTER, HacsManifest, HacsRepository
from .logger import LOGGER
from .path import is_safe
from .store import async_load_from_store, async_save_to_store
DEFAULT_BASE_REPOSITORY_DATA = (
("authors", []),
("category", ""),
("description", ""),
("domain", None),
("downloads", 0),
("etag_repository", None),
("full_name", ""),
("last_updated", 0),
("hide", False),
("new", False),
("stargazers_count", 0),
("topics", []),
)
DEFAULT_EXTENDED_REPOSITORY_DATA = (
("archived", False),
("config_flow", False),
("default_branch", None),
("description", ""),
("first_install", False),
("installed_commit", None),
("installed", False),
("last_commit", None),
("last_version", None),
("manifest_name", None),
("open_issues", 0),
("published_tags", []),
("pushed_at", ""),
("releases", False),
("selected_tag", None),
("show_beta", False),
("stargazers_count", 0),
("topics", []),
)
class HacsData:
"""HacsData class."""
def __init__(self, hacs: HacsBase):
"""Initialize."""
self.logger = LOGGER
self.hacs = hacs
self.content = {}
async def async_force_write(self, _=None):
"""Force write."""
await self.async_write(force=True)
async def async_write(self, force: bool = False) -> None:
"""Write content to the store files."""
if not force and self.hacs.system.disabled:
return
self.logger.debug("<HacsData async_write> Saving data")
# Hacs
await async_save_to_store(
self.hacs.hass,
"hacs",
{
"archived_repositories": self.hacs.common.archived_repositories,
"renamed_repositories": self.hacs.common.renamed_repositories,
"ignored_repositories": self.hacs.common.ignored_repositories,
},
)
await self._async_store_content_and_repos()
async def _async_store_content_and_repos(self, _=None): # bb: ignore
"""Store the main repos file and each repo that is out of date."""
# Repositories
self.content = {}
for repository in self.hacs.repositories.list_all:
if repository.data.category in self.hacs.common.categories:
self.async_store_repository_data(repository)
await async_save_to_store(self.hacs.hass, "repositories", self.content)
for event in (HacsDispatchEvent.REPOSITORY, HacsDispatchEvent.CONFIG):
self.hacs.async_dispatch(event, {})
@callback
def async_store_repository_data(self, repository: HacsRepository) -> dict:
"""Store the repository data."""
data = {"repository_manifest": repository.repository_manifest.manifest}
for key, default_value in DEFAULT_BASE_REPOSITORY_DATA:
if (value := repository.data.__getattribute__(key)) != default_value:
data[key] = value
if repository.data.installed:
for key, default_value in DEFAULT_EXTENDED_REPOSITORY_DATA:
if (value := repository.data.__getattribute__(key)) != default_value:
data[key] = value
data["version_installed"] = repository.data.installed_version
if repository.data.last_fetched:
data["last_fetched"] = repository.data.last_fetched.timestamp()
self.content[str(repository.data.id)] = data
async def restore(self):
"""Restore saved data."""
self.hacs.status.new = False
try:
hacs = await async_load_from_store(self.hacs.hass, "hacs") or {}
except HomeAssistantError:
hacs = {}
try:
repositories = await async_load_from_store(self.hacs.hass, "repositories") or {}
except HomeAssistantError as exception:
self.hacs.log.error(
"Could not read %s, restore the file from a backup - %s",
self.hacs.hass.config.path(".storage/hacs.repositories"),
exception,
)
self.hacs.disable_hacs(HacsDisabledReason.RESTORE)
return False
if not hacs and not repositories:
# Assume new install
self.hacs.status.new = True
self.logger.info("<HacsData restore> Loading base repository information")
repositories = await self.hacs.hass.async_add_executor_job(
json_util.load_json,
f"{self.hacs.core.config_path}/custom_components/hacs/utils/default.repositories",
)
self.logger.info("<HacsData restore> Restore started")
# Hacs
self.hacs.common.archived_repositories = []
self.hacs.common.ignored_repositories = []
self.hacs.common.renamed_repositories = {}
# Clear out duplicate renamed values
renamed = hacs.get("renamed_repositories", {})
for entry in renamed:
value = renamed.get(entry)
if value not in renamed:
self.hacs.common.renamed_repositories[entry] = value
# Clear out duplicate archived values
for entry in hacs.get("archived_repositories", []):
if entry not in self.hacs.common.archived_repositories:
self.hacs.common.archived_repositories.append(entry)
# Clear out duplicate ignored values
for entry in hacs.get("ignored_repositories", []):
if entry not in self.hacs.common.ignored_repositories:
self.hacs.common.ignored_repositories.append(entry)
try:
await self.register_unknown_repositories(repositories)
for entry, repo_data in repositories.items():
if entry == "0":
# Ignore repositories with ID 0
self.logger.debug(
"<HacsData restore> Found repository with ID %s - %s", entry, repo_data
)
continue
self.async_restore_repository(entry, repo_data)
self.logger.info("<HacsData restore> Restore done")
except BaseException as exception: # lgtm [py/catch-base-exception] pylint: disable=broad-except
self.logger.critical(
"<HacsData restore> [%s] Restore Failed!", exception, exc_info=exception
)
return False
return True
async def register_unknown_repositories(self, repositories):
"""Registry any unknown repositories."""
register_tasks = [
self.hacs.async_register_repository(
repository_full_name=repo_data["full_name"],
category=repo_data["category"],
check=False,
repository_id=entry,
)
for entry, repo_data in repositories.items()
if entry != "0" and not self.hacs.repositories.is_registered(repository_id=entry)
]
if register_tasks:
await asyncio.gather(*register_tasks)
@callback
def async_restore_repository(self, entry, repository_data):
"""Restore repository."""
full_name = repository_data["full_name"]
if not (repository := self.hacs.repositories.get_by_full_name(full_name)):
self.logger.error("<HacsData restore> Did not find %s (%s)", full_name, entry)
return
# Restore repository attributes
self.hacs.repositories.set_repository_id(repository, entry)
repository.data.authors = repository_data.get("authors", [])
repository.data.description = repository_data.get("description", "")
repository.data.downloads = repository_data.get("downloads", 0)
repository.data.last_updated = repository_data.get("last_updated", 0)
repository.data.etag_repository = repository_data.get("etag_repository")
repository.data.topics = [
topic for topic in repository_data.get("topics", []) if topic not in TOPIC_FILTER
]
repository.data.domain = repository_data.get("domain")
repository.data.stargazers_count = repository_data.get(
"stargazers_count"
) or repository_data.get("stars", 0)
repository.releases.last_release = repository_data.get("last_release_tag")
repository.data.releases = repository_data.get("releases", False)
repository.data.installed = repository_data.get("installed", False)
repository.data.new = repository_data.get("new", False)
repository.data.selected_tag = repository_data.get("selected_tag")
repository.data.show_beta = repository_data.get("show_beta", False)
repository.data.last_version = repository_data.get("last_version")
repository.data.last_commit = repository_data.get("last_commit")
repository.data.installed_version = repository_data.get("version_installed")
repository.data.installed_commit = repository_data.get("installed_commit")
repository.data.manifest_name = repository_data.get("manifest_name")
if last_fetched := repository_data.get("last_fetched"):
repository.data.last_fetched = datetime.fromtimestamp(last_fetched)
repository.repository_manifest = HacsManifest.from_dict(
repository_data.get("repository_manifest", {})
)
if repository.localpath is not None and is_safe(self.hacs, repository.localpath):
# Set local path
repository.content.path.local = repository.localpath
if repository.data.installed:
repository.data.first_install = False
if full_name == HacsGitHubRepo.INTEGRATION:
repository.data.installed_version = self.hacs.version
repository.data.installed = True
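
async_store_repository_data above persists only attributes that differ from their defaults; a minimal standalone sketch of that diffing idea (not HACS code, names are illustrative):

defaults = (("description", ""), ("stargazers_count", 0), ("topics", []))

class RepoData:  # hypothetical stand-in for repository.data
    description = "Example integration"
    stargazers_count = 0
    topics: list = []

stored = {
    key: value
    for key, default in defaults
    if (value := getattr(RepoData, key)) != default
}
# stored == {"description": "Example integration"}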

@@ -0,0 +1,7 @@
"""Util to decode content from the github API."""
from base64 import b64decode
def decode_content(content: str) -> str:
"""Decode content."""
return b64decode(bytearray(content, "utf-8")).decode()
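
A round-trip sketch of the helper above (assumes decode_content is in scope):

from base64 import b64encode

payload = b64encode(b'{"name": "Example"}').decode()
assert decode_content(payload) == '{"name": "Example"}'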

@@ -0,0 +1,41 @@
"""HACS Decorators."""
from __future__ import annotations
import asyncio
from functools import wraps
from typing import TYPE_CHECKING, Any, Coroutine
from ..const import DEFAULT_CONCURRENT_BACKOFF_TIME, DEFAULT_CONCURRENT_TASKS
if TYPE_CHECKING:
from ..base import HacsBase
def concurrent(
concurrenttasks: int = DEFAULT_CONCURRENT_TASKS,
backoff_time: int = DEFAULT_CONCURRENT_BACKOFF_TIME,
) -> Coroutine[Any, Any, None]:
"""Return a modified function."""
max_concurrent = asyncio.Semaphore(concurrenttasks)
def inner_function(function) -> Coroutine[Any, Any, None]:
@wraps(function)
async def wrapper(*args, **kwargs) -> None:
hacs: HacsBase = getattr(args[0], "hacs", None)
async with max_concurrent:
result = await function(*args, **kwargs)
if (
hacs is None
or hacs.queue is None
or hacs.queue.has_pending_tasks
or "update" not in function.__name__
):
await asyncio.sleep(backoff_time)
return result
return wrapper
return inner_function
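
A hedged usage sketch of the decorator above; the class and method names are hypothetical, and because the instance carries no HacsBase the wrapper simply sleeps backoff_time after each call while the semaphore caps concurrency at two.

import asyncio

class Downloader:
    hacs = None  # the wrapper tolerates a missing HacsBase

    @concurrent(concurrenttasks=2, backoff_time=0)
    async def async_fetch(self, index: int) -> None:
        await asyncio.sleep(0.01)

async def main() -> None:
    downloader = Downloader()
    await asyncio.gather(*(downloader.async_fetch(i) for i in range(6)))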

File diff suppressed because one or more lines are too long

@@ -0,0 +1,46 @@
"""Filter functions."""
from __future__ import annotations
from typing import Any
def filter_content_return_one_of_type(
content: list[str | Any],
namestartswith: str,
filterfiltype: str,
attr: str = "name",
) -> list[str]:
"""Only match 1 of the filter."""
contents = []
filetypefound = False
for filename in content:
if isinstance(filename, str):
if filename.startswith(namestartswith):
if filename.endswith(f".{filterfiltype}"):
if not filetypefound:
contents.append(filename)
filetypefound = True
continue
else:
contents.append(filename)
else:
if getattr(filename, attr).startswith(namestartswith):
if getattr(filename, attr).endswith(f".{filterfiltype}"):
if not filetypefound:
contents.append(filename)
filetypefound = True
continue
else:
contents.append(filename)
return contents
def get_first_directory_in_directory(content: list[str | Any], dirname: str) -> str | None:
"""Return the first directory in dirname or None."""
directory = None
for path in content:
if path.full_path.startswith(dirname) and path.full_path != dirname:
if path.is_directory:
directory = path.filename
break
return directory
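
A hedged sketch of filter_content_return_one_of_type with plain strings: among files under the given prefix only the first ".js" match is kept, other prefixed files pass through, and paths outside the prefix are dropped.

files = ["dist/card.js", "dist/other.js", "dist/card.js.map", "readme.md"]
assert filter_content_return_one_of_type(files, "dist", "js") == [
    "dist/card.js",
    "dist/card.js.map",
]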

@@ -0,0 +1,10 @@
"""JSON utils."""
try:
# Could be removed after 2022.06 is the min version
# But in case Home Assistant changes, keep this try/except here...
from homeassistant.helpers.json import json_loads
except ImportError:
from json import loads as json_loads
__all__ = ["json_loads"]

@@ -0,0 +1,6 @@
"""Custom logger for HACS."""
import logging
from ..const import PACKAGE_NAME
LOGGER: logging.Logger = logging.getLogger(PACKAGE_NAME)

@@ -0,0 +1,20 @@
"""Path utils"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..base import HacsBase
def is_safe(hacs: HacsBase, path: str | Path) -> bool:
"""Helper to check if path is safe to remove."""
return Path(path).as_posix() not in (
Path(f"{hacs.core.config_path}/{hacs.configuration.appdaemon_path}").as_posix(),
Path(f"{hacs.core.config_path}/{hacs.configuration.netdaemon_path}").as_posix(),
Path(f"{hacs.core.config_path}/{hacs.configuration.plugin_path}").as_posix(),
Path(f"{hacs.core.config_path}/{hacs.configuration.python_script_path}").as_posix(),
Path(f"{hacs.core.config_path}/{hacs.configuration.theme_path}").as_posix(),
Path(f"{hacs.core.config_path}/custom_components/").as_posix(),
)

@@ -0,0 +1,81 @@
"""The QueueManager class."""
from __future__ import annotations
import asyncio
import time
from typing import Coroutine
from homeassistant.core import HomeAssistant
from ..exceptions import HacsExecutionStillInProgress
from .logger import LOGGER
_LOGGER = LOGGER
class QueueManager:
"""The QueueManager class."""
def __init__(self, hass: HomeAssistant) -> None:
self.hass = hass
self.queue: list[Coroutine] = []
self.running = False
@property
def pending_tasks(self) -> int:
"""Return a count of pending tasks in the queue."""
return len(self.queue)
@property
def has_pending_tasks(self) -> bool:
"""Return a count of pending tasks in the queue."""
return self.pending_tasks != 0
def clear(self) -> None:
"""Clear the queue."""
self.queue = []
def add(self, task: Coroutine) -> None:
"""Add a task to the queue."""
self.queue.append(task)
async def execute(self, number_of_tasks: int | None = None) -> None:
"""Execute the tasks in the queue."""
if self.running:
_LOGGER.debug("<QueueManager> Execution is already running")
raise HacsExecutionStillInProgress
if len(self.queue) == 0:
_LOGGER.debug("<QueueManager> The queue is empty")
return
self.running = True
_LOGGER.debug("<QueueManager> Checking out tasks to execute")
local_queue = []
if number_of_tasks:
for task in self.queue[:number_of_tasks]:
local_queue.append(task)
else:
for task in self.queue:
local_queue.append(task)
for task in local_queue:
self.queue.remove(task)
_LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
start = time.time()
result = await asyncio.gather(*local_queue, return_exceptions=True)
for entry in result:
if isinstance(entry, Exception):
_LOGGER.error("<QueueManager> %s", entry)
end = time.time() - start
_LOGGER.debug(
"<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
len(local_queue),
end,
)
if self.has_pending_tasks:
_LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
self.running = False
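
A hedged usage sketch of the queue; hass is only stored by the constructor, so any placeholder works here, and the task coroutine is made up for illustration.

async def demo() -> None:
    queue = QueueManager(hass=None)

    async def work(index: int) -> None:
        await asyncio.sleep(0)

    for index in range(5):
        queue.add(work(index))

    await queue.execute(number_of_tasks=3)  # runs three tasks, two stay pending
    await queue.execute()                   # drains the rest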

@@ -0,0 +1,16 @@
"""Regex utils"""
from __future__ import annotations
import re
RE_REPOSITORY = re.compile(
r"(?:(?:.*github.com.)|^)([A-Za-z0-9-]+\/[\w.-]+?)(?:(?:\.git)?|(?:[^\w.-].*)?)$"
)
def extract_repository_from_url(url: str) -> str | None:
"""Extract the owner/repo part form a URL."""
match = re.match(RE_REPOSITORY, url)
if not match:
return None
return match.group(1).lower()
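
Expected behaviour of the pattern above (an assumption based on reading the regex, not documented output):

assert extract_repository_from_url("https://github.com/hacs/integration") == "hacs/integration"
assert extract_repository_from_url("git@github.com:hacs/integration.git") == "hacs/integration"
assert extract_repository_from_url("not a repository url") is None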

@@ -0,0 +1,76 @@
"""Storage handers."""
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import Store
from homeassistant.util import json as json_util
from ..const import VERSION_STORAGE
from ..exceptions import HacsException
from .logger import LOGGER
_LOGGER = LOGGER
class HACSStore(Store):
"""A subclass of Store that allows multiple loads in the executor."""
def load(self):
"""Load the data from disk if version matches."""
try:
data = json_util.load_json(self.path)
except BaseException as exception: # lgtm [py/catch-base-exception] pylint: disable=broad-except
_LOGGER.critical(
"Could not load '%s', restore it from a backup or delete the file: %s",
self.path,
exception,
)
raise HacsException(exception) from exception
if data == {} or data["version"] != self.version:
return None
return data["data"]
def get_store_key(key):
"""Return the key to use with homeassistant.helpers.storage.Storage."""
return key if "/" in key else f"hacs.{key}"
def _get_store_for_key(hass, key, encoder):
"""Create a Store object for the key."""
return HACSStore(hass, VERSION_STORAGE, get_store_key(key), encoder=encoder, atomic_writes=True)
def get_store_for_key(hass, key):
"""Create a Store object for the key."""
return _get_store_for_key(hass, key, JSONEncoder)
async def async_load_from_store(hass, key):
"""Load the retained data from store and return de-serialized data."""
return await get_store_for_key(hass, key).async_load() or {}
async def async_save_to_store(hass, key, data):
"""Generate dynamic data to store and save it to the filesystem.
The data is only written if the content on disk has changed,
which is checked by reading the existing content and comparing it.
If the data has changed this will generate two executor jobs;
if the data has not changed this will generate one executor job.
"""
current = await async_load_from_store(hass, key)
if current is None or current != data:
await get_store_for_key(hass, key).async_save(data)
return
_LOGGER.debug(
"<HACSStore async_save_to_store> Did not store data for '%s'. Content did not change",
get_store_key(key),
)
async def async_remove_store(hass, key):
"""Remove a store element that should no longer be used."""
if "/" not in key:
return
await get_store_for_key(hass, key).async_remove()
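
A hedged usage sketch of the store helpers; it assumes a running Home Assistant instance (hass), and the key and payload are chosen for illustration.

async def demo(hass) -> None:
    await async_save_to_store(hass, "demo", {"example": True})
    data = await async_load_from_store(hass, "demo")  # -> {"example": True}
    await async_remove_store(hass, "demo")  # keys without "/" are kept, so this is a no-op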

@@ -0,0 +1,36 @@
"""Custom template support."""
from __future__ import annotations
from typing import TYPE_CHECKING
from jinja2 import Template
if TYPE_CHECKING:
from ..base import HacsBase
from ..repositories.base import HacsRepository
def render_template(hacs: HacsBase, content: str, context: HacsRepository) -> str:
"""Render templates in content."""
if hacs.configuration.experimental:
# Do not render for experimental
return content
# Fix None issues
if context.releases.last_release_object is not None:
prerelease = context.releases.last_release_object.prerelease
else:
prerelease = False
# Render the template
try:
return Template(content).render(
installed=context.data.installed,
pending_update=context.pending_update,
prerelease=prerelease,
selected_tag=context.data.selected_tag,
version_available=context.releases.last_release,
version_installed=context.display_installed_version,
)
except BaseException as exception: # lgtm [py/catch-base-exception] pylint: disable=broad-except
context.logger.debug(exception)
return content
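
A standalone sketch of the underlying Jinja rendering (not HACS code); the template string is made up, and the keys mirror the context passed above.

from jinja2 import Template

readme = "Installed: {{ installed }} ({{ version_installed }})"
print(Template(readme).render(installed=True, version_installed="1.2.3"))
# Installed: True (1.2.3)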

@@ -0,0 +1,69 @@
"""Validation utilities."""
from __future__ import annotations
from dataclasses import dataclass, field
from awesomeversion import AwesomeVersion
from homeassistant.helpers.config_validation import url as url_validator
import voluptuous as vol
from ..const import LOCALE
@dataclass
class Validate:
"""Validate."""
errors: list[str] = field(default_factory=list)
@property
def success(self) -> bool:
"""Return bool if the validation was a success."""
return len(self.errors) == 0
def _country_validator(values) -> list[str]:
"""Custom country validator."""
countries = []
if isinstance(values, str):
countries.append(values.upper())
elif isinstance(values, list):
for value in values:
countries.append(value.upper())
else:
raise vol.Invalid(f"Value '{values}' is not a string or list.", path=["country"])
for country in countries:
if country not in LOCALE:
raise vol.Invalid(f"Value '{country}' is not in {LOCALE}.", path=["country"])
return countries
HACS_MANIFEST_JSON_SCHEMA = vol.Schema(
{
vol.Optional("content_in_root"): bool,
vol.Optional("country"): _country_validator,
vol.Optional("filename"): str,
vol.Optional("hacs"): vol.Coerce(AwesomeVersion),
vol.Optional("hide_default_branch"): bool,
vol.Optional("homeassistant"): vol.Coerce(AwesomeVersion),
vol.Optional("persistent_directory"): str,
vol.Optional("render_readme"): bool,
vol.Optional("zip_release"): bool,
vol.Required("name"): str,
},
extra=vol.PREVENT_EXTRA,
)
INTEGRATION_MANIFEST_JSON_SCHEMA = vol.Schema(
{
vol.Required("codeowners"): list,
vol.Required("documentation"): url_validator,
vol.Required("domain"): str,
vol.Required("issue_tracker"): url_validator,
vol.Required("name"): str,
vol.Required("version"): vol.Coerce(AwesomeVersion),
},
extra=vol.ALLOW_EXTRA,
)
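
A hedged sketch validating a minimal hacs.json payload against the schema above; the payload itself is made up.

try:
    manifest = HACS_MANIFEST_JSON_SCHEMA({"name": "Example", "content_in_root": True})
except vol.Invalid as error:
    print(f"Invalid hacs.json: {error}")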

@@ -0,0 +1,35 @@
"""Version utils."""
from __future__ import annotations
from functools import lru_cache
from awesomeversion import (
AwesomeVersion,
AwesomeVersionException,
AwesomeVersionStrategy,
)
@lru_cache(maxsize=1024)
def version_left_higher_then_right(left: str, right: str) -> bool | None:
"""Return a bool if source is newer than target, will also be true if identical."""
try:
left_version = AwesomeVersion(left)
right_version = AwesomeVersion(right)
if (
left_version.strategy != AwesomeVersionStrategy.UNKNOWN
and right_version.strategy != AwesomeVersionStrategy.UNKNOWN
):
return left_version > right_version
except (AwesomeVersionException, AttributeError, KeyError):
pass
return None
def version_left_higher_or_equal_then_right(left: str, right: str) -> bool:
"""Return a bool if source is newer than target, will also be true if identical."""
if left == right:
return True
return version_left_higher_then_right(left, right)
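
Expected behaviour of the comparison helpers with ordinary semantic versions (an assumption based on the code above):

assert version_left_higher_then_right("1.2.0", "1.1.0")
assert not version_left_higher_then_right("1.1.0", "1.2.0")
assert version_left_higher_or_equal_then_right("1.1.0", "1.1.0")
assert version_left_higher_then_right("not-a-version", "1.0.0") is None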

@@ -0,0 +1,7 @@
"""Workarounds for issues that should not be fixed."""
DOMAIN_OVERRIDES = {
# https://github.com/hacs/integration/issues/2465
"custom-components/sensor.custom_aftership": "custom_aftership"
}