import ast import copy import html import random import re import time import traceback import numpy as np import torch import transformers from transformers import LogitsProcessorList import modules.shared as shared from modules.callbacks import ( Iteratorize, Stream, _StopEverythingStoppingCriteria ) from modules.extensions import apply_extensions from modules.html_generator import generate_4chan_html, generate_basic_html from modules.logging_colors import logger from modules.models import clear_torch_cache, local_rank def generate_reply(*args, **kwargs): shared.generation_lock.acquire() try: for result in _generate_reply(*args, **kwargs): yield result finally: shared.generation_lock.release() def _generate_reply(question, state, stopping_strings=None, is_chat=False, escape_html=False): # Find the appropriate generation function generate_func = apply_extensions('custom_generate_reply') if generate_func is None: if shared.model_name == 'None' or shared.model is None: logger.error("No model is loaded! Select one in the Model tab.") yield '' return if shared.model.__class__.__name__ in ['LlamaCppModel', 'RWKVModel', 'ExllamaModel', 'Exllamav2Model', 'CtransformersModel']: generate_func = generate_reply_custom else: generate_func = generate_reply_HF # Prepare the input original_question = question if not is_chat: state = apply_extensions('state', state) question = apply_extensions('input', question, state) # Find the stopping strings all_stop_strings = [] for st in (stopping_strings, ast.literal_eval(f"[{state['custom_stopping_strings']}]")): if type(st) is list and len(st) > 0: all_stop_strings += st if shared.args.verbose: print(f'\n\n{question}\n--------------------\n') shared.stop_everything = False clear_torch_cache() seed = set_manual_seed(state['seed']) last_update = -1 reply = '' is_stream = state['stream'] if len(all_stop_strings) > 0 and not state['stream']: state = copy.deepcopy(state) state['stream'] = True # Generate for reply in generate_func(question, original_question, seed, state, stopping_strings, is_chat=is_chat): if escape_html: reply = html.escape(reply) reply, stop_found = apply_stopping_strings(reply, all_stop_strings) if is_stream: cur_time = time.time() # Maximum number of tokens/second if state['max_tokens_second'] > 0: diff = 1 / state['max_tokens_second'] - (cur_time - last_update) if diff > 0: time.sleep(diff) last_update = time.time() yield reply # Limit updates to 24 per second to not stress low latency networks else: if cur_time - last_update > 0.041666666666666664: last_update = cur_time yield reply if stop_found: break if not is_chat: reply = apply_extensions('output', reply, state) yield reply def encode(prompt, add_special_tokens=True, add_bos_token=True, truncation_length=None): if shared.tokenizer is None: raise ValueError('No tokenizer is loaded') if shared.model.__class__.__name__ in ['LlamaCppModel', 'RWKVModel', 'CtransformersModel', 'Exllamav2Model']: input_ids = shared.tokenizer.encode(str(prompt)) if shared.model.__class__.__name__ not in ['Exllamav2Model']: input_ids = np.array(input_ids).reshape(1, len(input_ids)) else: input_ids = shared.tokenizer.encode(str(prompt), return_tensors='pt', add_special_tokens=add_special_tokens) # This is a hack for making replies more creative. if not add_bos_token and input_ids[0][0] == shared.tokenizer.bos_token_id: input_ids = input_ids[:, 1:] # Handling truncation if truncation_length is not None: input_ids = input_ids[:, -truncation_length:] if shared.model.__class__.__name__ in ['LlamaCppModel', 'RWKVModel', 'ExllamaModel', 'Exllamav2Model', 'CtransformersModel'] or shared.args.cpu: return input_ids elif shared.args.deepspeed: return input_ids.to(device=local_rank) elif torch.backends.mps.is_available(): device = torch.device('mps') return input_ids.to(device) else: return input_ids.cuda() def decode(output_ids, skip_special_tokens=True): if shared.tokenizer is None: raise ValueError('No tokenizer is loaded') return shared.tokenizer.decode(output_ids, skip_special_tokens) def get_encoded_length(prompt): length_after_extensions = apply_extensions('tokenized_length', prompt) if length_after_extensions is not None: return length_after_extensions return len(encode(prompt)[0]) def get_token_ids(prompt): tokens = encode(prompt)[0] decoded_tokens = [shared.tokenizer.decode([i]) for i in tokens] output = '' for row in list(zip(tokens, decoded_tokens)): output += f"{str(int(row[0])).ljust(5)} - {repr(row[1])}\n" return output def get_max_prompt_length(state): return state['truncation_length'] - state['max_new_tokens'] def generate_reply_wrapper(question, state, stopping_strings=None): """ Returns formatted outputs for the UI """ reply = question if not shared.is_seq2seq else '' yield formatted_outputs(reply, shared.model_name) for reply in generate_reply(question, state, stopping_strings, is_chat=False, escape_html=True): if not shared.is_seq2seq: reply = question + reply yield formatted_outputs(reply, shared.model_name) def formatted_outputs(reply, model_name): if any(s in model_name for s in ['gpt-4chan', 'gpt4chan']): reply = fix_gpt4chan(reply) return html.unescape(reply), generate_4chan_html(reply) else: return html.unescape(reply), generate_basic_html(reply) def fix_gpt4chan(s): """ Removes empty replies from gpt4chan outputs """ for i in range(10): s = re.sub("--- [0-9]*\n>>[0-9]*\n---", "---", s) s = re.sub("--- [0-9]*\n *\n---", "---", s) s = re.sub("--- [0-9]*\n\n\n---", "---", s) return s def fix_galactica(s): """ Fix the LaTeX equations in GALACTICA """ s = s.replace(r'\[', r'$') s = s.replace(r'\]', r'$') s = s.replace(r'\(', r'$') s = s.replace(r'\)', r'$') s = s.replace(r'$$', r'$') s = re.sub(r'\n', r'\n\n', s) s = re.sub(r"\n{3,}", "\n\n", s) return s def get_reply_from_output_ids(output_ids, input_ids, original_question, state, is_chat=False): if shared.is_seq2seq: reply = decode(output_ids, state['skip_special_tokens']) else: new_tokens = len(output_ids) - len(input_ids[0]) reply = decode(output_ids[-new_tokens:], state['skip_special_tokens']) # Prevent LlamaTokenizer from skipping a space if type(shared.tokenizer) in [transformers.LlamaTokenizer, transformers.LlamaTokenizerFast] and len(output_ids) > 0: if shared.tokenizer.convert_ids_to_tokens(int(output_ids[-new_tokens])).startswith('▁'): reply = ' ' + reply return reply def set_manual_seed(seed): seed = int(seed) if seed == -1: seed = random.randint(1, 2**31) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) return seed def stop_everything_event(): shared.stop_everything = True def apply_stopping_strings(reply, all_stop_strings): stop_found = False for string in all_stop_strings: idx = reply.find(string) if idx != -1: reply = reply[:idx] stop_found = True break if not stop_found: # If something like "\nYo" is generated just before "\nYou:" # is completed, trim it for string in all_stop_strings: for j in range(len(string) - 1, 0, -1): if reply[-j:] == string[:j]: reply = reply[:-j] break else: continue break return reply, stop_found def generate_reply_HF(question, original_question, seed, state, stopping_strings=None, is_chat=False): generate_params = {} for k in ['max_new_tokens', 'do_sample', 'temperature', 'top_p', 'typical_p', 'repetition_penalty', 'repetition_penalty_range', 'encoder_repetition_penalty', 'top_k', 'min_length', 'no_repeat_ngram_size', 'num_beams', 'penalty_alpha', 'length_penalty', 'early_stopping', 'tfs', 'top_a', 'mirostat_mode', 'mirostat_tau', 'mirostat_eta', 'guidance_scale']: generate_params[k] = state[k] if state['negative_prompt'] != '': generate_params['negative_prompt_ids'] = encode(state['negative_prompt']) for k in ['epsilon_cutoff', 'eta_cutoff']: if state[k] > 0: generate_params[k] = state[k] * 1e-4 if state['ban_eos_token']: generate_params['suppress_tokens'] = [shared.tokenizer.eos_token_id] if state['custom_token_bans']: to_ban = [int(x) for x in state['custom_token_bans'].split(',')] if len(to_ban) > 0: if generate_params.get('suppress_tokens', None): generate_params['suppress_tokens'] += to_ban else: generate_params['suppress_tokens'] = to_ban generate_params.update({'use_cache': not shared.args.no_cache}) if shared.args.deepspeed: generate_params.update({'synced_gpus': True}) # Encode the input input_ids = encode(question, add_bos_token=state['add_bos_token'], truncation_length=get_max_prompt_length(state)) output = input_ids[0] cuda = not any((shared.args.cpu, shared.args.deepspeed)) if state['auto_max_new_tokens']: generate_params['max_new_tokens'] = state['truncation_length'] - input_ids.shape[-1] # Add the encoded tokens to generate_params question, input_ids, inputs_embeds = apply_extensions('tokenizer', state, question, input_ids, None) original_input_ids = input_ids generate_params.update({'inputs': input_ids}) if inputs_embeds is not None: generate_params.update({'inputs_embeds': inputs_embeds}) # Stopping criteria / eos token eos_token_ids = [shared.tokenizer.eos_token_id] if shared.tokenizer.eos_token_id is not None else [] generate_params['eos_token_id'] = eos_token_ids generate_params['stopping_criteria'] = transformers.StoppingCriteriaList() generate_params['stopping_criteria'].append(_StopEverythingStoppingCriteria()) processor = state.get('logits_processor', LogitsProcessorList([])) # In case folks just pass in a processor by itself. if type(processor) != LogitsProcessorList: processor = LogitsProcessorList([processor]) apply_extensions('logits_processor', processor, input_ids) generate_params['logits_processor'] = processor t0 = time.time() try: if not is_chat and not shared.is_seq2seq: yield '' # Generate the entire reply at once. if not state['stream']: with torch.no_grad(): output = shared.model.generate(**generate_params)[0] if cuda: output = output.cuda() yield get_reply_from_output_ids(output, input_ids, original_question, state, is_chat=is_chat) # Stream the reply 1 token at a time. # This is based on the trick of using 'stopping_criteria' to create an iterator. else: def generate_with_callback(callback=None, *args, **kwargs): kwargs['stopping_criteria'].append(Stream(callback_func=callback)) clear_torch_cache() with torch.no_grad(): shared.model.generate(**kwargs) def generate_with_streaming(**kwargs): return Iteratorize(generate_with_callback, [], kwargs, callback=None) with generate_with_streaming(**generate_params) as generator: for output in generator: yield get_reply_from_output_ids(output, input_ids, original_question, state, is_chat=is_chat) if output[-1] in eos_token_ids: break except Exception: traceback.print_exc() finally: t1 = time.time() original_tokens = len(original_input_ids[0]) new_tokens = len(output) - (original_tokens if not shared.is_seq2seq else 0) print(f'Output generated in {(t1-t0):.2f} seconds ({new_tokens/(t1-t0):.2f} tokens/s, {new_tokens} tokens, context {original_tokens}, seed {seed})') return def generate_reply_custom(question, original_question, seed, state, stopping_strings=None, is_chat=False): """ For models that do not use the transformers library for sampling """ seed = set_manual_seed(state['seed']) t0 = time.time() reply = '' try: if not is_chat: yield '' if not state['stream']: reply = shared.model.generate(question, state) yield reply else: for reply in shared.model.generate_with_streaming(question, state): yield reply except Exception: traceback.print_exc() finally: t1 = time.time() original_tokens = len(encode(original_question)[0]) new_tokens = len(encode(original_question + reply)[0]) - original_tokens print(f'Output generated in {(t1-t0):.2f} seconds ({new_tokens/(t1-t0):.2f} tokens/s, {new_tokens} tokens, context {original_tokens}, seed {seed})') return