Source code for fetchez.registry

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
fetchez.registry
~~~~~~~~~~~~~~~~

A unified, dynamic registry system for discovering and loading
Fetchez Modules, Hooks, Schemas, and other plugins.

:copyright: (c) 2010-2026 Regents of the University of Colorado
:license: MIT, see LICENSE for more details.
"""

import os
import sys
import json
import pkgutil
import importlib
import importlib.util
import inspect
import logging
from typing import Dict, Any, Type, Optional

from fetchez.modules import FetchModule
from fetchez.hooks import FetchHook
from fetchez.recipes.schemas import BaseSchema
from fetchez.streams import BaseReader

logger = logging.getLogger(__name__)


class PluginRegistry:
    """Base class for dynamically discovering and registering plugins.

    The registry is a per-class dict mapping a plugin key (the plugin's
    ``name`` attribute, its lower-cased class name, or one of its aliases)
    to a metadata dict. Subclasses configure discovery through the class
    attributes below.
    """

    # These must be defined by the subclasses
    base_class: Optional[Type] = None  # plugins are subclasses of this type
    builtin_pkg: str = ""  # dotted name of the package scanned for built-ins
    entry_point_group: str = ""  # entry-point group of pip-installed plugins
    user_folder: str = ""  # sub-folder of ``.fetchez`` holding user plugins

    @classmethod
    def get_registry(cls) -> Dict[str, Any]:
        """Return this class's registry dict, creating it on first use."""
        # Check the class's own namespace instead of hasattr(): hasattr()
        # follows the MRO, so a registry created on a parent class would
        # otherwise be silently shared with every subclass.
        if "_registry" not in cls.__dict__:
            setattr(cls, "_registry", {})
        return cls.__dict__["_registry"]

    @classmethod
    def _scan_package(cls, package, label: str):
        """Register plugins found in every non-package submodule of *package*.

        *label* is only used in log messages. An object without ``__path__``
        (a plain module) has no submodules, so it is inspected directly.
        """
        search_path = getattr(package, "__path__", None)
        if search_path is None:
            try:
                cls._register_from_module(package)
            except Exception as e:
                pkg_name = getattr(package, "__name__", package)
                logger.warning(f"Failed to load {label} {pkg_name}: {e}")
            return

        try:
            for _, modname, ispkg in pkgutil.walk_packages(
                path=search_path,
                prefix=package.__name__ + ".",
            ):
                if ispkg:
                    continue
                try:
                    mod = importlib.import_module(modname)
                    cls._register_from_module(mod)
                except Exception as e:
                    logger.warning(f"Failed to load {label} {modname}: {e}")
        except Exception as e:
            # walk_packages imports sub-packages while walking and may raise
            logger.warning(
                f"Failed to scan {label} package {package.__name__}: {e}"
            )

    @classmethod
    def load_builtins(cls):
        """Recursively scan and load all built-in plugins.

        Does nothing when the registry already holds entries.
        """
        registry = cls.get_registry()
        if registry:
            return

        try:
            builtin_module = importlib.import_module(cls.builtin_pkg)
        except ImportError:
            logger.warning(f"Built-in package {cls.builtin_pkg} not found.")
            return
        cls._scan_package(builtin_module, "built-in")

    @classmethod
    def load_user_plugins(cls):
        """Scan local directories for user-provided plugins.

        Looks in ``~/.fetchez/<user_folder>`` and then in
        ``<cwd>/.fetchez/<user_folder>`` for ``*.py`` files whose names do not
        start with an underscore.
        """
        home = os.path.expanduser("~")
        search_dirs = [
            os.path.join(home, ".fetchez", cls.user_folder),
            os.path.join(os.getcwd(), ".fetchez", cls.user_folder),
        ]

        visited = set()
        for p_dir in search_dirs:
            real_dir = os.path.realpath(p_dir)
            # Skip the second entry when cwd is the home directory, and
            # skip anything that is not a directory (listdir would raise).
            if real_dir in visited or not os.path.isdir(p_dir):
                continue
            visited.add(real_dir)

            # sorted() makes the load (and therefore override) order stable
            for f in sorted(os.listdir(p_dir)):
                if not f.endswith(".py") or f.startswith("_"):
                    continue

                filepath = os.path.join(p_dir, f)
                mod_name = f"fetchez_user_{cls.user_folder}_{f[:-3]}"
                try:
                    spec = importlib.util.spec_from_file_location(
                        mod_name, filepath
                    )
                    if not (spec and spec.loader):
                        continue
                    mod = importlib.util.module_from_spec(spec)
                    sys.modules[mod_name] = mod
                    try:
                        spec.loader.exec_module(mod)
                    except BaseException:
                        # Do not leave a half-initialised module importable
                        sys.modules.pop(mod_name, None)
                        raise
                    cls._register_from_module(mod)
                except Exception as e:
                    logger.warning(f"Failed to load user plugin {filepath}: {e}")

    @classmethod
    def load_installed_plugins(cls):
        """Load external pip-installed extensions via entry_points."""
        from importlib.metadata import entry_points

        try:
            try:
                eps = entry_points(group=cls.entry_point_group)
            except TypeError:
                # Older importlib.metadata: entry_points() takes no keyword
                # and returns a mapping of group name -> entry points.
                eps = entry_points().get(cls.entry_point_group, [])
        except Exception as e:
            logger.error(
                f"Error checking entry points for {cls.entry_point_group}: {e}"
            )
            return

        for ep in eps:
            # One broken extension must not prevent the others from loading
            try:
                plugin_module = ep.load()
            except Exception as e:
                logger.warning(f"Failed to load external plugin {ep.name}: {e}")
                continue
            # Scan the loaded extension for submodules
            cls._scan_package(plugin_module, "external plugin")

    @classmethod
    def load_all(cls):
        """Load all plugins: builtins, user plugins, and pip extensions."""
        cls.load_builtins()
        cls.load_user_plugins()
        cls.load_installed_plugins()

    @classmethod
    def _get_cache_path(cls):
        """Path to the JSON registry cache (creates ``~/.fetchez`` if needed)."""
        cache_dir = os.path.expanduser("~/.fetchez")
        os.makedirs(cache_dir, exist_ok=True)
        return os.path.join(cache_dir, f"{cls.__name__}_cache.json")

    @classmethod
    def load_fast(cls):
        """Loads from the JSON cache for instant CLI menus.

        If the cache is missing, unreadable or empty, falls back to the slow
        load_all() and then tries to write a fresh cache. Cache problems are
        logged at debug level and never raised.

        NOTE(review): nothing in this class invalidates an existing cache, so
        plugins added after the cache was written are not picked up here.
        """
        registry = cls.get_registry()
        if registry:
            return

        try:
            cache_path = cls._get_cache_path()
            if os.path.exists(cache_path):
                with open(cache_path, "r", encoding="utf-8") as f:
                    cached = json.load(f)
                # An empty or malformed cache must not block real discovery
                if isinstance(cached, dict) and cached:
                    registry.update(cached)
                    return
        except Exception as e:
            logger.debug(f"Cache read failed: {e}")

        cls.load_all()
        try:
            cls.save_cache()
        except (OSError, TypeError, ValueError) as e:
            # The cache is an optimisation; failing to write it is not fatal
            logger.debug(f"Cache write failed: {e}")

    @staticmethod
    def _is_jsonable(value) -> bool:
        """Return True when *value* can be encoded by ``json.dumps``."""
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            return False
        return True

    @classmethod
    def save_cache(cls):
        """Dumps the discovered registry to JSON.

        Only ``str``/``int``/``float``/``list``/``dict`` metadata values that
        are JSON-encodable are written; class objects and the like are
        dropped.
        """
        clean_registry = {}
        for k, meta in cls.get_registry().items():
            clean_registry[k] = {
                key: val
                for key, val in meta.items()
                if isinstance(val, (str, int, float, list, dict))
                and cls._is_jsonable(val)
            }

        # Encode fully before opening the file so an encoding error cannot
        # leave a truncated cache behind.
        payload = json.dumps(clean_registry, indent=2)
        with open(cls._get_cache_path(), "w", encoding="utf-8") as f:
            f.write(payload)

    @classmethod
    def _register_from_module(cls, module):
        """Inspect a module and dynamically extract its metadata.

        Every class in *module* that is a strict subclass of ``base_class`` is
        registered under its ``name`` attribute (or lower-cased class name)
        and under each of the aliases it declares itself.
        """
        registry = cls.get_registry()
        prefix = "meta_"

        for name, obj in inspect.getmembers(module, inspect.isclass):
            if not issubclass(obj, cls.base_class) or obj is cls.base_class:
                continue

            mod_key = getattr(obj, "name", name.lower())

            # Aliases are read from the class's own namespace so that a
            # subclass does not take over the aliases of its parent.
            aliases = obj.__dict__.get("meta_aliases", [])
            aliases = [aliases] if isinstance(aliases, str) else list(aliases)

            meta = {
                "mod": module.__name__,
                "cls": name,
                "_class_obj": obj,
                "aliases": aliases,
            }

            # METADATA EXTRACTION
            # Modules must define `meta_` attributes
            for attr_name in dir(obj):
                if not attr_name.startswith(prefix):
                    continue
                if attr_name == "meta_aliases":
                    # Already handled above; dir() would bring in the
                    # inherited value and undo the own-class restriction.
                    continue
                # Strip the leading prefix only
                meta[attr_name[len(prefix):]] = getattr(obj, attr_name)

            # Fallbacks for the CLI
            meta.setdefault("category", "Generic")
            meta.setdefault("desc", "No description provided.")
            meta.setdefault("domain", "Universal (Files)")
            meta.setdefault("requires", "any")
            meta["import_path"] = f"{obj.__module__}.{obj.__name__}"

            registry[mod_key] = meta
            for alias in aliases:
                registry[alias] = meta

    @classmethod
    def get_info(cls, mod_key: str) -> Dict[str, Any]:
        """Return the metadata dict for *mod_key*, or ``{}`` if unknown."""
        return cls.get_registry().get(mod_key, {})

    @classmethod
    def _get_class(cls, mod_key: str):
        """Return the stored class object for *mod_key* without importing."""
        meta = cls.get_registry().get(mod_key)
        return meta.get("_class_obj") if meta else None

    @classmethod
    def get_class(cls, name: str):
        """Returns the class if cached, or lazily imports it on demand.

        Returns ``None`` when *name* is not registered or its metadata has no
        ``import_path``. Import errors from the lazy import propagate.
        """
        meta = cls.get_registry().get(name)
        if not meta:
            return None

        class_obj = meta.get("_class_obj")
        if class_obj is not None:
            return class_obj

        import_path = meta.get("import_path")
        if not import_path:
            return None

        mod_path, class_name = import_path.rsplit(".", 1)
        module = importlib.import_module(mod_path)
        actual_cls = getattr(module, class_name)
        # Remember the class; save_cache() drops this key when writing JSON
        meta["_class_obj"] = actual_cls
        return actual_cls

    load_module = get_class  # alias for backward compatibility

    @classmethod
    def list_all(cls) -> Dict[str, Any]:
        """Return the whole registry (the live dict, not a copy)."""
        return cls.get_registry()

    @staticmethod
    def _as_text_list(value) -> list:
        """Normalise a metadata value to a list of strings for searching."""
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        try:
            return [str(item) for item in value]
        except TypeError:
            return [str(value)]

    @classmethod
    def search_modules(cls, term: str):
        """Search modules by name, description, agency, or tags.

        The match is a case-insensitive substring test. Returns the matching
        registry keys (alias keys included) in registry order.
        """
        term = term.lower()
        results = []

        for key, meta in cls.get_registry().items():
            haystack = [str(key)]
            for field in ("desc", "agency", "tags", "aliases"):
                haystack.extend(cls._as_text_list(meta.get(field)))
            if any(term in text.lower() for text in haystack):
                results.append(key)

        return results
class YamlRegistry:
    """A registry for discovering and loading yaml configuration files
    (recipes and hook presets).

    The registry is a per-class dict keyed by the ``name`` field of each
    YAML document. Subclasses may override ``_register_yaml`` to apply a
    different registration rule.
    """

    # These must be defined by the subclasses
    base_class: Optional[Type] = None
    builtin_pkg: str = ""  # dotted name of the package shipping built-in YAMLs
    entry_point_group: str = ""  # entry-point group of external YAML packages
    user_folder: str = ""  # sub-folder of ``~/.fetchez`` holding user YAMLs

    @classmethod
    def get_registry(cls) -> Dict[str, Any]:
        """Return this class's registry dict, creating it on first use."""
        # Check the class's own namespace instead of hasattr(): hasattr()
        # follows the MRO, so a registry created on a parent class would
        # otherwise be silently shared with every subclass.
        if "_registry" not in cls.__dict__:
            setattr(cls, "_registry", {})
        return cls.__dict__["_registry"]

    @classmethod
    def load_all(cls):
        """Register every ``.yaml``/``.yml`` file from all known locations.

        Sources are read in this order, so later ones override earlier ones
        that use the same name: packages advertised in ``entry_point_group``,
        the directories of ``builtin_pkg``, then
        ``~/.fetchez/<user_folder>``.
        """
        cls.get_registry()

        import importlib.metadata
        import importlib.resources

        try:
            eps = importlib.metadata.entry_points(group=cls.entry_point_group)
        except TypeError:
            # Older importlib.metadata: no keyword, returns a mapping
            eps = importlib.metadata.entry_points().get(cls.entry_point_group, [])

        for ep in eps:
            # The entry point's value is used as the package name to scan
            pkg_name = ep.value
            try:
                for file_path in importlib.resources.files(pkg_name).iterdir():
                    if file_path.name.endswith((".yaml", ".yml")):
                        cls._register_yaml(
                            file_path.read_text(encoding="utf-8"), str(file_path)
                        )
            except Exception as e:
                logger.warning(f"Failed to load yamls from package {pkg_name}: {e}")

        # Build a fresh list of directories. Appending to the package's own
        # ``__path__`` would change where Python looks for its submodules
        # and would grow on every call.
        search_dirs = []
        try:
            builtin_module = importlib.import_module(cls.builtin_pkg)
            search_dirs.extend(builtin_module.__path__)
        except (ImportError, AttributeError, ValueError) as e:
            logger.warning(
                f"Built-in package {cls.builtin_pkg} could not be loaded: {e}"
            )
        search_dirs.append(os.path.expanduser(f"~/.fetchez/{cls.user_folder}"))

        for fdir in search_dirs:
            if not os.path.isdir(fdir):
                continue
            # sorted() makes the load (and therefore override) order stable
            for fn in sorted(os.listdir(fdir)):
                if not fn.endswith((".yaml", ".yml")):
                    continue
                full_path = os.path.join(fdir, fn)
                try:
                    with open(full_path, "r", encoding="utf-8") as f:
                        cls._register_yaml(f.read(), full_path)
                except Exception as e:
                    logger.warning(f"Failed to load yaml {fn}: {e}")

    load_fast = load_all

    @classmethod
    def _register_yaml(cls, yaml_content: str, file_path: str):
        """Parse *yaml_content* and register it under its ``name`` key.

        Documents that are empty, are not a mapping, or have no ``name`` are
        ignored. *file_path* is only used in the debug log message.
        """
        import yaml

        registry = cls.get_registry()
        try:
            config = yaml.safe_load(yaml_content)
            # A YAML file may hold a list or scalar; only mappings qualify
            if not isinstance(config, dict):
                return
            if "name" in config:
                registry[config["name"]] = config
        except Exception as e:
            logger.debug(f"Failed to parse YAML {file_path}: {e}")

    @classmethod
    def get_yaml(cls, name: str) -> Optional[Dict[str, Any]]:
        """Return the registered entry for *name*, or ``None``."""
        return cls.get_registry().get(name)

    # Temporary for backwards compatibility
    get_preset = get_yaml
    get_recipe = get_yaml
# ============================================================================= # The Registries # =============================================================================
class ModuleRegistry(PluginRegistry):
    """Plugin registry configured for ``FetchModule`` subclasses."""

    # Discovery locations: entry-point group, built-in package, user folder
    entry_point_group = "fetchez.modules"
    builtin_pkg = "fetchez.modules"
    user_folder = "modules"

    # Type that registered plugins must derive from
    base_class = FetchModule
class HookRegistry(PluginRegistry):
    """Plugin registry configured for ``FetchHook`` subclasses."""

    # Discovery locations: entry-point group, built-in package, user folder
    entry_point_group = "fetchez.hooks"
    builtin_pkg = "fetchez.hooks"
    user_folder = "hooks"

    # Type that registered plugins must derive from
    base_class = FetchHook
# Schemas extend Recipes
class SchemaRegistry(PluginRegistry):
    """Plugin registry configured for ``BaseSchema`` subclasses."""

    # Discovery locations: entry-point group, built-in package, user folder
    entry_point_group = "fetchez.recipes.schemas"
    builtin_pkg = "fetchez.recipes.schemas"
    user_folder = "recipes/schemas"

    # Type that registered plugins must derive from
    base_class = BaseSchema

    @classmethod
    def apply_schema(cls, config):
        """Apply the schema named by ``config["schema"]``, if any.

        Returns *config* unchanged when no schema is requested or when the
        requested schema is not registered (a warning is logged in the latter
        case). Otherwise returns whatever the schema class's ``apply``
        returns. The schema name is matched in lower case.
        """
        requested = config.get("schema")
        if not requested:
            return config

        schema_name = requested.lower()
        if schema_name not in cls.get_registry():
            logger.warning(
                f"Schema '{schema_name}' requested but not registered. Ignoring."
            )
            return config

        logger.info(f"Applying '{schema_name}' schema rules to recipe...")
        SchemaCls = cls.get_class(schema_name)
        return SchemaCls.apply(config)
class ReaderRegistry(PluginRegistry):
    """Plugin registry configured for ``BaseReader`` subclasses."""

    base_class = BaseReader
    builtin_pkg = "fetchez.streams.readers"
    entry_point_group = "fetchez.streams.readers"
    user_folder = "streams/readers"

    @classmethod
    def get_reader(cls, src, term: str, **kwargs):
        """Instantiate a reader for *src*.

        Resolution order:

        1. If *term* names a profile in ``ProfileRegistry``, use the reader
           named by that profile, with the profile's ``args`` as defaults.
        2. Otherwise, if *term* is given, look for a reader by data-type.
        3. Finally, fall back to the text after the last ``.`` in *src*,
           treated as a file extension.

        ``kwargs`` are forwarded to the reader. When a keyword is present in
        both the profile ``args`` and ``kwargs``, the caller's value wins.
        Returns ``None`` when no reader is found.
        """
        if term:
            profile = ProfileRegistry.get_yaml(term)
            if profile:
                logger.debug(f"Using reader-profile {profile}")
                # ``reader:`` / ``args:`` may be present but empty (None)
                profile_reader = profile.get("reader") or {}
                reader_name = profile_reader.get("name", "")
                reader = cls.get_class(reader_name)
                if reader:
                    profile_args = profile_reader.get("args") or {}
                    # Merge first: passing both dicts with ** would raise
                    # TypeError on any key they have in common.
                    merged_args = {**profile_args, **kwargs}
                    return reader(src, **merged_args)
            else:
                logger.debug(f"No reader profile found, checking `{term}` data-type")
                reader = cls.get_reader_for_dtype(term)
                if reader:
                    logger.debug(f"Found `{reader.name}` for data-type: `{term}`")
                    return reader(src, **kwargs)

        _ext = src.split(".")[-1]
        logger.debug(f"No reader dtype found, checking `{_ext}` in extensions")
        reader = cls.get_reader_for_ext(_ext)
        if reader:
            return reader(src, **kwargs)

        return None

    @classmethod
    def get_reader_for_ext(cls, ext: str):
        """Iterate through registered readers to find one that supports this extension."""
        wanted = ext.lower()
        for name, meta in cls.get_registry().items():
            if wanted in meta.get("extensions", []):
                return cls.get_class(name)
        return None

    @classmethod
    def get_reader_for_dtype(cls, dtype: str):
        """Iterate through registered readers to find one that supports this dtype."""
        wanted = dtype.lower()
        for name, meta in cls.get_registry().items():
            # NOTE(review): when the ``dtype`` metadata is a string this is a
            # substring test, not an equality test. Confirm that is intended.
            if wanted in meta.get("dtype", ""):
                return cls.get_class(name)
        return None
class RecipeRegistry(YamlRegistry):
    """A registry for discovering and loading YAML recipes."""

    builtin_pkg = "fetchez.recipes"
    entry_point_group = "fetchez.recipes"
    user_folder = "recipes"

    @classmethod
    def _register_yaml(cls, yaml_content: str, file_path: str):
        """Register a recipe document under its project name.

        Documents that are empty, are not a mapping, or have no ``project``
        section are ignored. Each entry stores the name, description, full
        parsed config and source path.
        """
        import yaml

        registry = cls.get_registry()
        try:
            config = yaml.safe_load(yaml_content)
            if not isinstance(config, dict) or "project" not in config:
                return

            # Use the project name from the YAML, fallback to the filename.
            # Both ``.yaml`` and ``.yml`` are stripped from the fallback.
            base = os.path.basename(file_path)
            stem, ext = os.path.splitext(base)
            fallback = stem if ext in (".yaml", ".yml") else base

            name = config["project"].get("name", fallback)
            desc = config["project"].get("description", "No description available.")

            registry[name] = {
                "name": name,
                "desc": desc,
                "config": config,
                "path": file_path,
            }
        except Exception as e:
            logger.debug(f"Failed to parse recipe YAML {file_path}: {e}")
# Presets extend Hooks
class PresetRegistry(YamlRegistry):
    """A registry for discovering and loading hook presets from YAML."""

    builtin_pkg = "fetchez.hooks.presets"
    entry_point_group = "fetchez.hooks.presets"
    user_folder = "hooks/presets"

    @classmethod
    def _register_yaml(cls, yaml_content: str, file_path: str):
        """Register the preset(s) defined in one YAML document.

        Two layouts are accepted: a top-level ``presets`` mapping of
        ``name -> definition`` (legacy layout), or a single preset carrying
        both ``name`` and ``hooks`` keys. Anything else is ignored.
        """
        import yaml

        registry = cls.get_registry()
        try:
            config = yaml.safe_load(yaml_content)
            if not isinstance(config, dict):
                return

            # Legacy multi-preset layout (e.g. ~/.fetchez/presets.yaml)
            if "presets" in config:
                # ``presets:`` with no value parses as None
                for p_name, p_def in (config.get("presets") or {}).items():
                    registry[p_name] = p_def
            elif "name" in config and "hooks" in config:
                registry[config["name"]] = config

        except Exception as e:
            logger.debug(f"Failed to parse preset YAML {file_path}: {e}")

    @classmethod
    def hook_list_from_preset(cls, preset_def):
        """Convert yaml definition to list of Hook Objects.

        Hooks whose class is not registered, or whose constructor raises, are
        logged and skipped. An empty ``hooks:`` or ``args:`` entry is treated
        as "no hooks" or "no arguments".
        """
        hooks = []
        for h_def in preset_def.get("hooks") or []:
            name = h_def.get("name")
            # ``args:`` with no value parses as None, which ** cannot unpack
            kwargs = h_def.get("args") or {}

            hook_cls = HookRegistry.get_class(name)
            if not hook_cls:
                logger.warning(f"Preset hook '{name}' not found.")
                continue

            try:
                hooks.append(hook_cls(**kwargs))
            except Exception as exception:
                logger.error(f"Failed to init preset hook '{name}': {exception}")
        return hooks
# Bundles extend Modules
class BundleRegistry(YamlRegistry):
    """YAML registry for Module Bundles (Data Packages)."""

    # Discovery locations: entry-point group, built-in package, user folder
    entry_point_group = "fetchez.modules.bundles"
    builtin_pkg = "fetchez.modules.bundles"
    user_folder = "modules/bundles"
# Profiles extend Streams
class ProfileRegistry(YamlRegistry):
    """YAML registry for Format Profiles."""

    # Discovery locations: entry-point group, built-in package, user folder
    entry_point_group = "fetchez.streams.profiles"
    builtin_pkg = "fetchez.streams.profiles"
    user_folder = "streams/profiles"
# Disabled helper kept for reference:
# @classmethod
# def reader_args_from_profile(cls, profile_def):
#     """Convert yaml definition to list of Hook Objects."""
#     readers = {}
#     profile_id = profile_def.get("profile")
#     for p_def in profile_def.get("reader", []):
#         name = p_def.get("name")
#         kwargs = p_def.get("args", {})
#         readers[name] = kwargs
#     return readers


# =============================================================================
# Old YAML Registries (recipe & preset)
# =============================================================================
class _RecipeRegistry:
    """A registry for discovering and loading YAML recipes."""

    entry_point_group = "fetchez.recipes"
    user_folder = "recipes"

    @classmethod
    def get_registry(cls) -> Dict[str, Any]:
        """Return this class's registry dict, creating it on first use."""
        # Own-namespace check: hasattr() would also find a parent's registry
        if "_registry" not in cls.__dict__:
            setattr(cls, "_registry", {})
        return cls.__dict__["_registry"]

    @classmethod
    def load_all(cls):
        """Register recipes from entry-point packages, then from
        ``~/.fetchez/<user_folder>``."""
        cls.get_registry()

        import importlib.metadata
        import importlib.resources

        try:
            eps = importlib.metadata.entry_points(group=cls.entry_point_group)
        except TypeError:
            # Older importlib.metadata: no keyword, returns a mapping
            eps = importlib.metadata.entry_points().get(cls.entry_point_group, [])

        for ep in eps:
            pkg_name = ep.value
            try:
                for file_path in importlib.resources.files(pkg_name).iterdir():
                    if file_path.name.endswith((".yaml", ".yml")):
                        cls._register_yaml(
                            file_path.read_text(encoding="utf-8"), str(file_path)
                        )
            except Exception as e:
                logger.warning(f"Failed to load recipes from package {pkg_name}: {e}")

        home_dir = os.path.expanduser(f"~/.fetchez/{cls.user_folder}")
        if os.path.isdir(home_dir):
            for fn in sorted(os.listdir(home_dir)):
                if not fn.endswith((".yaml", ".yml")):
                    continue
                full_path = os.path.join(home_dir, fn)
                try:
                    with open(full_path, "r", encoding="utf-8") as f:
                        cls._register_yaml(f.read(), full_path)
                except Exception as e:
                    logger.warning(f"Failed to load local recipe {fn}: {e}")

    @classmethod
    def _register_yaml(cls, yaml_content: str, file_path: str):
        """Register a recipe document under its project name.

        Documents without a ``project`` section are ignored.
        """
        import yaml

        registry = cls.get_registry()
        try:
            config = yaml.safe_load(yaml_content)
            if not isinstance(config, dict) or "project" not in config:
                return

            # Use the project name from the YAML, fallback to the filename.
            # Both ``.yaml`` and ``.yml`` are stripped from the fallback.
            base = os.path.basename(file_path)
            stem, ext = os.path.splitext(base)
            fallback = stem if ext in (".yaml", ".yml") else base

            name = config["project"].get("name", fallback)
            desc = config["project"].get("description", "No description available.")

            registry[name] = {
                "name": name,
                "desc": desc,
                "config": config,
                "path": file_path,
            }
        except Exception as e:
            logger.debug(f"Failed to parse recipe YAML {file_path}: {e}")

    @classmethod
    def get_recipe(cls, name: str) -> Optional[Dict[str, Any]]:
        """Return the registered recipe entry for *name*, or ``None``."""
        registry = cls.get_registry()
        return registry.get(name)


class _PresetRegistry:
    """A registry for discovering and loading hook Presets (Macros)."""

    builtin_pkg = "fetchez.presets"
    entry_point_group = "fetchez.presets"
    user_folder = "presets"

    @classmethod
    def get_registry(cls) -> Dict[str, Any]:
        """Return this class's registry dict, creating it on first use."""
        # Own-namespace check: hasattr() would also find a parent's registry
        if "_registry" not in cls.__dict__:
            setattr(cls, "_registry", {})
        return cls.__dict__["_registry"]

    @classmethod
    def load_all(cls):
        """Register presets from entry-point packages, the built-in package,
        ``~/.fetchez/<user_folder>`` and the legacy ``~/.fetchez/presets.yaml``
        (in that order)."""
        cls.get_registry()

        import importlib.metadata
        import importlib.resources

        try:
            eps = importlib.metadata.entry_points(group=cls.entry_point_group)
        except TypeError:
            # Older importlib.metadata: no keyword, returns a mapping
            eps = importlib.metadata.entry_points().get(cls.entry_point_group, [])

        for ep in eps:
            pkg_name = ep.value
            try:
                for file_path in importlib.resources.files(pkg_name).iterdir():
                    if file_path.name.endswith((".yaml", ".yml")):
                        cls._register_yaml(
                            file_path.read_text(encoding="utf-8"), str(file_path)
                        )
            except Exception as e:
                logger.warning(f"Failed to load presets from package {pkg_name}: {e}")

        # Build a fresh list of directories. Appending to the package's own
        # ``__path__`` would change where Python looks for its submodules
        # and would grow on every call.
        search_dirs = []
        try:
            builtin_module = importlib.import_module(cls.builtin_pkg)
            search_dirs.extend(builtin_module.__path__)
        except (ImportError, AttributeError, ValueError) as e:
            logger.warning(
                f"Built-in package {cls.builtin_pkg} could not be loaded: {e}"
            )
        search_dirs.append(os.path.expanduser(f"~/.fetchez/{cls.user_folder}"))

        for fdir in search_dirs:
            if not os.path.isdir(fdir):
                continue
            for fn in sorted(os.listdir(fdir)):
                if not fn.endswith((".yaml", ".yml")):
                    continue
                full_path = os.path.join(fdir, fn)
                try:
                    with open(full_path, "r", encoding="utf-8") as f:
                        cls._register_yaml(f.read(), full_path)
                except Exception as e:
                    logger.warning(f"Failed to load preset {fn}: {e}")

        legacy_file = os.path.expanduser("~/.fetchez/presets.yaml")
        if os.path.exists(legacy_file):
            try:
                with open(legacy_file, "r", encoding="utf-8") as f:
                    cls._register_yaml(f.read(), legacy_file, is_legacy=True)
            except Exception as e:
                # Still best-effort, but leave a trace instead of hiding it
                logger.debug(f"Failed to load legacy presets {legacy_file}: {e}")

    @classmethod
    def _register_yaml(cls, yaml_content: str, file_path: str, is_legacy=False):
        """Register the preset(s) defined in one YAML document.

        With *is_legacy* set, or when the document has a top-level
        ``presets`` key, every ``name -> definition`` pair under ``presets``
        is registered. Otherwise the document is registered as a single
        preset if it has both ``name`` and ``hooks``.
        """
        import yaml

        registry = cls.get_registry()
        try:
            config = yaml.safe_load(yaml_content)
            if not isinstance(config, dict):
                return

            if is_legacy or "presets" in config:
                # ``presets:`` with no value parses as None
                for p_name, p_def in (config.get("presets") or {}).items():
                    registry[p_name] = p_def
            elif "name" in config and "hooks" in config:
                registry[config["name"]] = config

        except Exception as e:
            logger.debug(f"Failed to parse preset YAML {file_path}: {e}")

    @classmethod
    def get_preset(cls, name: str) -> Optional[Dict[str, Any]]:
        """Return the registered preset for *name*, or ``None``."""
        return cls.get_registry().get(name)

    @classmethod
    def hook_list_from_preset(cls, preset_def):
        """Convert yaml definition to list of Hook Objects.

        Hooks whose class is not registered, or whose constructor raises, are
        logged and skipped. An empty ``hooks:`` or ``args:`` entry is treated
        as "no hooks" or "no arguments".
        """
        hooks = []
        for h_def in preset_def.get("hooks") or []:
            name = h_def.get("name")
            # ``args:`` with no value parses as None, which ** cannot unpack
            kwargs = h_def.get("args") or {}

            hook_cls = HookRegistry.get_class(name)
            if not hook_cls:
                logger.warning(f"Preset hook '{name}' not found.")
                continue

            try:
                hooks.append(hook_cls(**kwargs))
            except Exception as exception:
                logger.error(f"Failed to init preset hook '{name}': {exception}")
        return hooks