import importlib import traceback from functools import partial from inspect import signature import gradio as gr import extensions import modules.shared as shared from modules.logging_colors import logger state = {} available_extensions = [] setup_called = set() def apply_settings(extension, name): if not hasattr(extension, 'params'): return for param in extension.params: _id = f"{name}-{param}" shared.default_settings[_id] = extension.params[param] if _id in shared.settings: extension.params[param] = shared.settings[_id] def load_extensions(): global state, setup_called state = {} for i, name in enumerate(shared.args.extensions): if name in available_extensions: if name != 'api': logger.info(f'Loading the extension "{name}"') try: try: extension = importlib.import_module(f"extensions.{name}.script") except ModuleNotFoundError: logger.error(f"Could not import the requirements for '{name}'. Make sure to install the requirements for the extension.\n\n* To install requirements for all available extensions, launch the\n update_wizard script for your OS and choose the B option.\n\n* To install the requirements for this extension alone, launch the\n cmd script for your OS and paste the following command in the\n terminal window that appears:\n\nLinux / Mac:\n\npip install -r extensions/{name}/requirements.txt --upgrade\n\nWindows:\n\npip install -r extensions\\{name}\\requirements.txt --upgrade\n") raise # Only run setup() and apply settings from settings.yaml once if extension not in setup_called: apply_settings(extension, name) if hasattr(extension, "setup"): extension.setup() setup_called.add(extension) state[name] = [True, i] except: logger.error(f'Failed to load the extension "{name}".') traceback.print_exc() # This iterator returns the extensions in the order specified in the command-line def iterator(): for name in sorted(state, key=lambda x: state[x][1]): if state[name][0]: yield getattr(extensions, name).script, name # Extension functions that map string -> string def _apply_string_extensions(function_name, text, state, is_chat=False): for extension, _ in iterator(): if hasattr(extension, function_name): func = getattr(extension, function_name) # Handle old extensions without the 'state' arg or # the 'is_chat' kwarg count = 0 has_chat = False for k in signature(func).parameters: if k == 'is_chat': has_chat = True else: count += 1 if count == 2: args = [text, state] else: args = [text] if has_chat: kwargs = {'is_chat': is_chat} else: kwargs = {} text = func(*args, **kwargs) return text # Extension functions that map string -> string def _apply_chat_input_extensions(text, visible_text, state): for extension, _ in iterator(): if hasattr(extension, 'chat_input_modifier'): text, visible_text = extension.chat_input_modifier(text, visible_text, state) return text, visible_text # custom_generate_chat_prompt handling - currently only the first one will work def _apply_custom_generate_chat_prompt(text, state, **kwargs): for extension, _ in iterator(): if hasattr(extension, 'custom_generate_chat_prompt'): return extension.custom_generate_chat_prompt(text, state, **kwargs) return None # Extension that modifies the input parameters before they are used def _apply_state_modifier_extensions(state): for extension, _ in iterator(): if hasattr(extension, "state_modifier"): state = getattr(extension, "state_modifier")(state) return state # Extension that modifies the chat history before it is used def _apply_history_modifier_extensions(history): for extension, _ in iterator(): if hasattr(extension, "history_modifier"): history = getattr(extension, "history_modifier")(history) return history # Extension functions that override the default tokenizer output - The order of execution is not defined def _apply_tokenizer_extensions(function_name, state, prompt, input_ids, input_embeds): for extension, _ in iterator(): if hasattr(extension, function_name): prompt, input_ids, input_embeds = getattr(extension, function_name)(state, prompt, input_ids, input_embeds) return prompt, input_ids, input_embeds # Allow extensions to add their own logits processors to the stack being run. # Each extension would call `processor_list.append({their LogitsProcessor}())`. def _apply_logits_processor_extensions(function_name, processor_list, input_ids): for extension, _ in iterator(): if hasattr(extension, function_name): result = getattr(extension, function_name)(processor_list, input_ids) if type(result) is list: processor_list = result return processor_list # Get prompt length in tokens after applying extension functions which override the default tokenizer output # currently only the first one will work def _apply_custom_tokenized_length(prompt): for extension, _ in iterator(): if hasattr(extension, 'custom_tokenized_length'): return getattr(extension, 'custom_tokenized_length')(prompt) return None # Custom generate reply handling - currently only the first one will work def _apply_custom_generate_reply(): for extension, _ in iterator(): if hasattr(extension, 'custom_generate_reply'): return getattr(extension, 'custom_generate_reply') return None def _apply_custom_css(): all_css = '' for extension, _ in iterator(): if hasattr(extension, 'custom_css'): all_css += getattr(extension, 'custom_css')() return all_css def _apply_custom_js(): all_js = '' for extension, _ in iterator(): if hasattr(extension, 'custom_js'): all_js += getattr(extension, 'custom_js')() return all_js def create_extensions_block(): to_display = [] for extension, name in iterator(): if hasattr(extension, "ui") and not (hasattr(extension, 'params') and extension.params.get('is_tab', False)): to_display.append((extension, name)) # Creating the extension ui elements if len(to_display) > 0: with gr.Column(elem_id="extensions"): for row in to_display: extension, _ = row extension.ui() def create_extensions_tabs(): for extension, name in iterator(): if hasattr(extension, "ui") and (hasattr(extension, 'params') and extension.params.get('is_tab', False)): display_name = getattr(extension, 'params', {}).get('display_name', name) with gr.Tab(display_name, elem_classes="extension-tab"): extension.ui() EXTENSION_MAP = { "input": partial(_apply_string_extensions, "input_modifier"), "output": partial(_apply_string_extensions, "output_modifier"), "chat_input": _apply_chat_input_extensions, "state": _apply_state_modifier_extensions, "history": _apply_history_modifier_extensions, "bot_prefix": partial(_apply_string_extensions, "bot_prefix_modifier"), "tokenizer": partial(_apply_tokenizer_extensions, "tokenizer_modifier"), 'logits_processor': partial(_apply_logits_processor_extensions, 'logits_processor_modifier'), "custom_generate_chat_prompt": _apply_custom_generate_chat_prompt, "custom_generate_reply": _apply_custom_generate_reply, "tokenized_length": _apply_custom_tokenized_length, "css": _apply_custom_css, "js": _apply_custom_js } def apply_extensions(typ, *args, **kwargs): if typ not in EXTENSION_MAP: raise ValueError(f"Invalid extension type {typ}") return EXTENSION_MAP[typ](*args, **kwargs)