"""
Bing image generation helpers.

Code derived from:
https://github.com/acheong08/EdgeGPT/blob/f940cecd24a4818015a8b42a2443dd97c3c2a8f4/src/ImageGen.py
"""
import asyncio
import contextlib
import os
import random
import time
from typing import Union
from uuid import uuid4

import aiohttp
import regex
import requests

from log import getlogger

logger = getlogger()

BING_URL = "https://www.bing.com"

# Random address inside 13.104.0.0/14, sent as the X-Forwarded-For header.
FORWARDED_IP = "13.{}.{}.{}".format(
    random.randint(104, 107),
    random.randint(0, 255),
    random.randint(0, 255),
)

# Headers attached to every request made by the sync and async clients.
HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "max-age=0",
    "content-type": "application/x-www-form-urlencoded",
    "referrer": "https://www.bing.com/images/create/",
    "origin": "https://www.bing.com",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63",
    "x-forwarded-for": FORWARDED_IP,
}

# Error messages raised to the caller.
error_timeout = "Your request has timed out."
error_redirect = "Redirect failed"
error_blocked_prompt = (
    "Your prompt has been blocked by Bing. Try to change any bad words and try again."
)
error_noresults = "Could not get results"
error_unsupported_lang = "\nthis language is currently not supported by bing"
error_bad_images = "Bad images"
error_no_images = "No images"

# Progress messages shown to the user / written to the debug file.
sending_message = "Sending request..."
wait_message = "Waiting for results..."
download_message = "\nDownloading images..."
# Upper bound, in seconds, for one blocking HTTP request and for the whole
# "poll until the images are ready" loop.
_TIMEOUT_SECONDS = 200

# Image URLs that the async client treats as a failed generation.
_BAD_IMAGES = frozenset(
    {
        "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
        "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
    }
)

_INAPPROPRIATE_CONTENT = (
    "Inappropriate contents found in the generated images. "
    "Please try again or try another prompt."
)


def debug(debug_file, text_var):
    """Append ``str(text_var)`` to *debug_file* as a single line.

    Parameters:
        debug_file: path of the log file; it is opened in append mode.
        text_var: any object; its ``str()`` form is written.
    """
    # One entry per line, otherwise consecutive messages run together.
    with open(str(debug_file), "a", encoding="utf-8") as f:
        f.write(f"{text_var}\n")


def _extract_image_links(html: str) -> list:
    """Return the distinct ``src="..."`` URLs in *html*, first-seen order.

    Everything from ``?w=`` onwards is dropped from each URL (the size limit).
    """
    sources = regex.findall(r'src="([^"]+)"', html)
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return list(dict.fromkeys(src.split("?w=")[0] for src in sources))


class ImageGen:
    """
    Image generation by Microsoft Bing

    Parameters:
        auth_cookie: str
            value stored as the ``_U`` cookie of the session
        debug_file: str | None
            when set, progress and error messages are appended to this file
        quiet: bool
            when True, progress output on stdout is suppressed
    """

    def __init__(
        self, auth_cookie: str, debug_file: Union[str, None] = None, quiet: bool = False
    ) -> None:
        # functools.partial is not imported at module level in this file.
        from functools import partial

        self.session: requests.Session = requests.Session()
        # Copy so that per-session changes never leak into the shared HEADERS.
        self.session.headers = dict(HEADERS)
        self.session.cookies.set("_U", auth_cookie)
        self.quiet = quiet
        self.debug_file = debug_file
        if self.debug_file:
            self.debug = partial(debug, self.debug_file)

    def _trace(self, text) -> None:
        """Write *text* to the debug file when a debug file was configured."""
        if self.debug_file:
            self.debug(text)

    def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str

        Returns:
            list of image URLs (never empty)

        Raises:
            Exception: the prompt was blocked, the language is unsupported,
                the redirect or the polling failed, the request timed out,
                or no image link was found.
        """
        if not self.quiet:
            print(sending_message)
        self._trace(sending_message)
        url_encoded_prompt = requests.utils.quote(prompt)
        # https://www.bing.com/images/create?q=<PROMPT>&rt=4&FORM=GENCRE
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        response = self.session.post(
            url, allow_redirects=False, timeout=_TIMEOUT_SECONDS
        )
        # check for content warning message
        page = response.text.lower()
        if "this prompt has been blocked" in page:
            self._trace(f"ERROR: {error_blocked_prompt}")
            raise Exception(error_blocked_prompt)
        if "we're working hard to offer image creator in more languages" in page:
            self._trace(f"ERROR: {error_unsupported_lang}")
            raise Exception(error_unsupported_lang)
        if response.status_code != 302:
            # if rt4 fails, try rt3
            url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
            response = self.session.post(
                url, allow_redirects=False, timeout=_TIMEOUT_SECONDS
            )
            if response.status_code != 302:
                self._trace(f"ERROR: {error_redirect}")
                print(f"ERROR: {response.text}")
                raise Exception(error_redirect)
        # Get redirect URL
        redirect_url = response.headers["Location"].replace("&nfy=1", "")
        request_id = redirect_url.split("id=")[-1]
        self.session.get(f"{BING_URL}{redirect_url}", timeout=_TIMEOUT_SECONDS)
        # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
        # Poll for results
        self._trace("Polling and waiting for result")
        if not self.quiet:
            print("Waiting for results...")
        deadline = time.monotonic() + _TIMEOUT_SECONDS
        while True:
            if time.monotonic() > deadline:
                self._trace(f"ERROR: {error_timeout}")
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            # Per-request timeout: a hung request must not bypass the deadline.
            response = self.session.get(polling_url, timeout=_TIMEOUT_SECONDS)
            if response.status_code != 200:
                self._trace(f"ERROR: {error_noresults}")
                raise Exception(error_noresults)
            # An empty body or one containing "errorMessage" means "not ready".
            if response.text and "errorMessage" not in response.text:
                break
            time.sleep(1)
        normal_image_links = _extract_image_links(response.text)
        # No images
        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    def save_images(self, links: list, output_dir: str) -> str:
        """
        Saves one image to the output directory

        Since matrix only supports one media attachment per message, only one
        link is downloaded; it is popped from *links*.

        Parameters:
            links: list of image URLs; the last one is removed and downloaded
            output_dir: directory to write into, created when missing

        Returns:
            path of the written ``<uuid>.jpeg`` file

        Raises:
            Exception: *links* is empty, or the link is not a valid URL.
        """
        if not links:
            raise Exception(error_no_images)
        link = links.pop()
        image_path = os.path.join(output_dir, f"{uuid4()}.jpeg")
        os.makedirs(output_dir, exist_ok=True)
        try:
            with self.session.get(
                link, stream=True, timeout=_TIMEOUT_SECONDS
            ) as response:
                response.raise_for_status()
                # image_path already contains output_dir; do not join it again.
                with open(image_path, "wb") as output_file:
                    for chunk in response.iter_content(chunk_size=8192):
                        output_file.write(chunk)
        except requests.exceptions.MissingSchema as url_exception:
            raise Exception(_INAPPROPRIATE_CONTENT) from url_exception
        return image_path


class ImageGenAsync:
    """
    Image generation by Microsoft Bing (asyncio variant)

    Usable as an async context manager; leaving the context closes the
    underlying aiohttp session.

    Parameters:
        auth_cookie: str
            value sent as the ``_U`` cookie
        quiet: bool
            when True, progress output on stdout is suppressed
    """

    def __init__(self, auth_cookie: str, quiet: bool = True) -> None:
        self.session = aiohttp.ClientSession(
            headers=HEADERS,
            cookies={"_U": auth_cookie},
        )
        self.quiet = quiet

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo) -> None:
        await self.session.close()

    async def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str

        Returns:
            list of image URLs (never empty)

        Raises:
            Exception: the prompt was blocked, the language is unsupported,
                the redirect or the polling failed, the request timed out,
                a known bad image was returned, or no image link was found.
        """
        if not self.quiet:
            print("Sending request...")
        url_encoded_prompt = requests.utils.quote(prompt)
        # https://www.bing.com/images/create?q=<PROMPT>&rt=4&FORM=GENCRE
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        async with self.session.post(url, allow_redirects=False) as response:
            page = (await response.text()).lower()
            if "this prompt has been blocked" in page:
                raise Exception(error_blocked_prompt)
            if "we're working hard to offer image creator in more languages" in page:
                raise Exception(error_unsupported_lang)
            if response.status == 302:
                location = response.headers["Location"]
            else:
                # if rt4 fails, try rt3
                url = (
                    f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
                )
                async with self.session.post(
                    url,
                    allow_redirects=False,
                    timeout=aiohttp.ClientTimeout(total=_TIMEOUT_SECONDS),
                ) as retry:
                    if retry.status != 302:
                        # text() is a coroutine; await it to print the body.
                        print(f"ERROR: {await retry.text()}")
                        raise Exception(error_redirect)
                    location = retry.headers["Location"]
        # Get redirect URL
        redirect_url = location.replace("&nfy=1", "")
        request_id = redirect_url.split("id=")[-1]
        # Only the visit matters; the context manager releases the connection.
        async with self.session.get(f"{BING_URL}{redirect_url}"):
            pass
        # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
        # Poll for results
        if not self.quiet:
            print("Waiting for results...")
        deadline = time.monotonic() + _TIMEOUT_SECONDS
        while True:
            if time.monotonic() > deadline:
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            async with self.session.get(polling_url) as response:
                if response.status != 200:
                    raise Exception(error_noresults)
                content = await response.text()
            # An empty body or one containing "errorMessage" means "not ready".
            if content and "errorMessage" not in content:
                break
            await asyncio.sleep(1)
        normal_image_links = _extract_image_links(content)
        # Bad images
        if any(link in _BAD_IMAGES for link in normal_image_links):
            raise Exception(error_bad_images)
        # No images
        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    async def save_images(self, links: list, output_dir: str) -> str:
        """
        Saves one image to the output directory

        Since matrix only supports one media attachment per message, only one
        link is downloaded; it is popped from *links*.

        Parameters:
            links: list of image URLs; the last one is removed and downloaded
            output_dir: directory to write into, created when missing

        Returns:
            path of the written ``<uuid>.jpeg`` file

        Raises:
            Exception: *links* is empty, or the link is not a valid URL.
        """
        if not self.quiet:
            print("\nDownloading images...")
        if not links:
            raise Exception(error_no_images)
        os.makedirs(output_dir, exist_ok=True)
        link = links.pop()
        image_path = os.path.join(output_dir, f"{uuid4()}.jpeg")
        try:
            async with self.session.get(link, raise_for_status=True) as response:
                # save response to file
                with open(image_path, "wb") as output_file:
                    async for chunk in response.content.iter_chunked(8192):
                        output_file.write(chunk)
        except aiohttp.client_exceptions.InvalidURL as url_exception:
            raise Exception(_INAPPROPRIATE_CONTENT) from url_exception
        return image_path