Merge pull request #873

Move httpx requirement, Fix Ails and ChatgptAi Provider, Improve scripts
Tekky 2023-09-07 18:45:04 +01:00 committed by GitHub
commit 7ca1a59d95
21 changed files with 501 additions and 574 deletions
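The thrust of the change set below is moving providers off blocking requests calls and onto aiohttp or curl_cffi coroutines, split across three base classes: BaseProvider (sync create_completion), AsyncProvider (awaitable create_async), and AsyncGeneratorProvider (streaming create_async_generator). A minimal sketch of how the reworked entry points are driven, with names taken from the testing scripts in this diff; anything beyond those call sites is an assumption:

    import asyncio
    import g4f
    from g4f.Provider import Aichat, Ails

    # Sync, token-by-token: AsyncGeneratorProvider subclasses (e.g. Ails)
    # keep create_completion, which bridges the async generator for
    # blocking callers (see the base_provider hunk near the end).
    for token in Ails.create_completion(
        model=g4f.models.gpt_35_turbo.name,
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ):
        print(token, end="", flush=True)

    # Async, whole answer at once: AsyncProvider subclasses (e.g. Aichat)
    # expose create_async and return a single string, as exercised by
    # testing/test_async.py below.
    answer = asyncio.run(Aichat.create_async(
        model=g4f.models.gpt_35_turbo.name,
        messages=[{"role": "user", "content": "Hello"}],
    ))
    print(answer)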

View File

@@ -1,43 +1,62 @@
 from __future__ import annotations

 import json
-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+from aiohttp import ClientSession, http

+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider, format_prompt


-class AItianhu(BaseProvider):
-    url = "https://www.aitianhu.com/"
-    working = False
+class AItianhu(AsyncGeneratorProvider):
+    url = "https://www.aitianhu.com"
+    working = True
     supports_gpt_35_turbo = True

-    @staticmethod
-    def create_completion(
+    @classmethod
+    async def create_async_generator(
+        cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        base = "\n".join(f"{message['role']}: {message['content']}" for message in messages)
-        base += "\nassistant: "
-
+        proxy: str = None,
+        **kwargs
+    ) -> AsyncGenerator:
         headers = {
-            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
+            "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0",
+            "Accept": "application/json, text/plain, */*",
+            "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
+            "Content-Type": "application/json",
+            "Origin": cls.url,
+            "Connection": "keep-alive",
+            "Referer": cls.url + "/",
+            "Sec-Fetch-Dest": "empty",
+            "Sec-Fetch-Mode": "cors",
+            "Sec-Fetch-Site": "same-origin",
         }
-        data: dict[str, Any] = {
-            "prompt": base,
-            "options": {},
-            "systemMessage": "You are ChatGPT, a large language model trained by OpenAI. Follow the user's instructions carefully. Respond using markdown.",
-            "temperature": kwargs.get("temperature", 0.8),
-            "top_p": kwargs.get("top_p", 1),
-        }
-        url = "https://www.aitianhu.com/api/chat-process"
-        response = requests.post(url, headers=headers, json=data)
-        response.raise_for_status()
-        lines = response.text.strip().split("\n")
-        res = json.loads(lines[-1])
-        yield res["text"]
+        async with ClientSession(
+            headers=headers,
+            version=http.HttpVersion10
+        ) as session:
+            data = {
+                "prompt": format_prompt(messages),
+                "options": {},
+                "systemMessage": "You are ChatGPT, a large language model trained by OpenAI. Follow the user's instructions carefully.",
+                "temperature": 0.8,
+                "top_p": 1,
+                **kwargs
+            }
+            async with session.post(
+                cls.url + "/api/chat-process",
+                proxy=proxy,
+                json=data,
+                ssl=False,
+            ) as response:
+                response.raise_for_status()
+                async for line in response.content:
+                    line = json.loads(line.decode('utf-8'))
+                    token = line["detail"]["choices"][0]["delta"].get("content")
+                    if token:
+                        yield token

     @classmethod
     @property
@@ -46,6 +65,7 @@ class AItianhu(BaseProvider):
             ("model", "str"),
             ("messages", "list[dict[str, str]]"),
             ("stream", "bool"),
+            ("proxy", "str"),
             ("temperature", "float"),
             ("top_p", "int"),
         ]
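Several hunks in this commit replace an inline prompt-flattening snippet with the shared format_prompt helper. A sketch of its presumed behavior, assuming it matches the inline code it replaces above (the helper itself is not shown in this diff):

    # Presumed behavior of format_prompt, mirroring the removed inline code:
    # role-prefixed lines plus a trailing "assistant: " cue for the model.
    def format_prompt(messages: list[dict[str, str]]) -> str:
        formatted = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
        return f"{formatted}\nassistant: "

    print(format_prompt([{"role": "user", "content": "Hi"}]))
    # user: Hi
    # assistant: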

View File

@@ -1,32 +1,37 @@
 from __future__ import annotations

-import time
-import requests
+from aiohttp import ClientSession

-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider


-class Acytoo(BaseProvider):
-    url = 'https://chat.acytoo.com/'
+class Acytoo(AsyncGeneratorProvider):
+    url = 'https://chat.acytoo.com'
     working = True
     supports_gpt_35_turbo = True

     @classmethod
-    def create_completion(
+    async def create_async_generator(
         cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        response = requests.post(f'{cls.url}api/completions',
-            headers=_create_header(), json=_create_payload(messages, kwargs.get('temperature', 0.5)))
-
-        response.raise_for_status()
-        response.encoding = 'utf-8'
-
-        yield response.text
+        proxy: str = None,
+        **kwargs
+    ) -> AsyncGenerator:
+        async with ClientSession(
+            headers=_create_header()
+        ) as session:
+            async with session.post(
+                cls.url + '/api/completions',
+                proxy=proxy,
+                json=_create_payload(messages, **kwargs)
+            ) as response:
+                response.raise_for_status()
+                async for stream in response.content.iter_any():
+                    if stream:
+                        yield stream.decode()


 def _create_header():
@@ -36,15 +41,11 @@ def _create_header():
     }


-def _create_payload(messages: list[dict[str, str]], temperature):
-    payload_messages = [
-        message | {'createdAt': int(time.time()) * 1000} for message in messages
-    ]
+def _create_payload(messages: list[dict[str, str]], temperature: float = 0.5, **kwargs):
     return {
         'key' : '',
         'model' : 'gpt-3.5-turbo',
-        'messages' : payload_messages,
+        'messages' : messages,
         'temperature' : temperature,
         'password' : ''
     }

View File

@@ -1,25 +1,22 @@
 from __future__ import annotations

-import requests
+from aiohttp import ClientSession

-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+from .base_provider import AsyncProvider, format_prompt


-class Aichat(BaseProvider):
+class Aichat(AsyncProvider):
     url = "https://chat-gpt.org/chat"
     working = True
     supports_gpt_35_turbo = True

     @staticmethod
-    def create_completion(
+    async def create_async(
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        chat = "\n".join(f"{message['role']}: {message['content']}" for message in messages)
-        chat += "\nassistant: "
-
+        proxy: str = None,
+        **kwargs
+    ) -> str:
         headers = {
             "authority": "chat-gpt.org",
             "accept": "*/*",
@@ -35,21 +32,23 @@ class Aichat(BaseProvider):
             "sec-fetch-site": "same-origin",
             "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
         }
-
-        json_data = {
-            "message": base,
-            "temperature": kwargs.get('temperature', 0.5),
-            "presence_penalty": 0,
-            "top_p": kwargs.get('top_p', 1),
-            "frequency_penalty": 0,
-        }
-
-        response = requests.post(
-            "https://chat-gpt.org/api/text",
-            headers=headers,
-            json=json_data,
-        )
-        response.raise_for_status()
-        if not response.json()['response']:
-            raise Exception("Error Response: " + response.json())
-        yield response.json()["message"]
+        async with ClientSession(
+            headers=headers
+        ) as session:
+            json_data = {
+                "message": format_prompt(messages),
+                "temperature": kwargs.get('temperature', 0.5),
+                "presence_penalty": 0,
+                "top_p": kwargs.get('top_p', 1),
+                "frequency_penalty": 0,
+            }
+            async with session.post(
+                "https://chat-gpt.org/api/text",
+                proxy=proxy,
+                json=json_data
+            ) as response:
+                response.raise_for_status()
+                result = await response.json()
+                if not result['response']:
+                    raise Exception(f"Error Response: {result}")
+                return result["message"]

View File

@@ -1,36 +1,36 @@
 from __future__ import annotations

 import hashlib
-import json
 import time
 import uuid
+import json
 from datetime import datetime
+from aiohttp import ClientSession

-import requests
-
-from ..typing import SHA256, Any, CreateResult
-from .base_provider import BaseProvider
+from ..typing import SHA256, AsyncGenerator
+from .base_provider import AsyncGeneratorProvider


-class Ails(BaseProvider):
+class Ails(AsyncGeneratorProvider):
     url: str = "https://ai.ls"
     working = True
-    supports_stream = True
     supports_gpt_35_turbo = True

     @staticmethod
-    def create_completion(
+    async def create_async_generator(
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
+        stream: bool,
+        proxy: str = None,
+        **kwargs
+    ) -> AsyncGenerator:
         headers = {
             "authority": "api.caipacity.com",
             "accept": "*/*",
             "accept-language": "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3",
             "authorization": "Bearer free",
             "client-id": str(uuid.uuid4()),
-            "client-v": _get_client_v(),
+            "client-v": "0.1.278",
             "content-type": "application/json",
             "origin": "https://ai.ls",
             "referer": "https://ai.ls/",
@@ -41,42 +41,39 @@ class Ails(BaseProvider):
             "sec-fetch-mode": "cors",
             "sec-fetch-site": "cross-site",
             "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
+            "from-url": "https://ai.ls/?chat=1"
         }

-        timestamp = _format_timestamp(int(time.time() * 1000))
-        sig = {
-            "d": datetime.now().strftime("%Y-%m-%d"),
-            "t": timestamp,
-            "s": _hash({"t": timestamp, "m": messages[-1]["content"]}),
-        }
-
-        json_data = json.dumps(
-            separators=(",", ":"),
-            obj={
-                "model": "gpt-3.5-turbo",
-                "temperature": kwargs.get("temperature", 0.6),
-                "stream": True,
-                "messages": messages,
-            }
-            | sig,
-        )
-
-        response = requests.post(
-            "https://api.caipacity.com/v1/chat/completions",
-            headers=headers,
-            data=json_data,
-            stream=True,
-        )
-        response.raise_for_status()
-        for token in response.iter_lines():
-            if b"content" in token:
-                completion_chunk = json.loads(token.decode().replace("data: ", ""))
-                token = completion_chunk["choices"][0]["delta"].get("content")
-                if "ai.ls" in token.lower() or "ai.ci" in token.lower():
-                    raise Exception("Response Error: " + token)
-                if token != None:
-                    yield token
+        async with ClientSession(
+            headers=headers
+        ) as session:
+            timestamp = _format_timestamp(int(time.time() * 1000))
+            json_data = {
+                "model": "gpt-3.5-turbo",
+                "temperature": kwargs.get("temperature", 0.6),
+                "stream": True,
+                "messages": messages,
+                "d": datetime.now().strftime("%Y-%m-%d"),
+                "t": timestamp,
+                "s": _hash({"t": timestamp, "m": messages[-1]["content"]}),
+            }
+            async with session.post(
+                "https://api.caipacity.com/v1/chat/completions",
+                proxy=proxy,
+                json=json_data
+            ) as response:
+                response.raise_for_status()
+                start = "data: "
+                async for line in response.content:
+                    line = line.decode('utf-8')
+                    if line.startswith(start) and line != "data: [DONE]":
+                        line = line[len(start):-1]
+                        line = json.loads(line)
+                        token = line["choices"][0]["delta"].get("content")
+                        if token:
+                            if "ai.ls" in token or "ai.ci" in token:
+                                raise Exception("Response Error: " + token)
+                            yield token

     @classmethod
     @property
@@ -106,14 +103,4 @@ def _format_timestamp(timestamp: int) -> str:
     e = timestamp
     n = e % 10
     r = n + 1 if n % 2 == 0 else n
     return str(e - n + r)
-
-
-def _get_client_v():
-    response = requests.get("https://ai.ls/?chat=1")
-    response.raise_for_status()
-
-    js_path = response.text.split('crossorigin href="')[1].split('"')[0]
-
-    response = requests.get("https://ai.ls" + js_path)
-    response.raise_for_status()
-
-    return response.text.split('G4="')[1].split('"')[0]
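_format_timestamp, kept unchanged above, nudges the millisecond timestamp so its last digit is always odd before it is hashed into the ai.ls request signature. A worked check:

    # Worked check of _format_timestamp as defined above: an even trailing
    # digit is bumped to the next odd digit, an odd one passes through.
    def _format_timestamp(timestamp: int) -> str:
        e = timestamp
        n = e % 10
        r = n + 1 if n % 2 == 0 else n
        return str(e - n + r)

    assert _format_timestamp(1694000000000) == "1694000000001"  # 0 -> 1
    assert _format_timestamp(1694000000007) == "1694000000007"  # 7 stays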

View File

@@ -1,23 +1,17 @@
 from __future__ import annotations

-import asyncio
+import random
 import json
 import os
-import random
-
-import aiohttp
-from aiohttp import ClientSession
-
-from ..typing import Any, AsyncGenerator, CreateResult, Union
+from aiohttp import ClientSession, ClientTimeout
+from ..typing import AsyncGenerator
 from .base_provider import AsyncGeneratorProvider, get_cookies


 class Bing(AsyncGeneratorProvider):
     url = "https://bing.com/chat"
-    needs_auth = True
     working = True
     supports_gpt_4 = True
-    supports_stream = True

     @staticmethod
     def create_async_generator(
@@ -34,18 +28,16 @@ class Bing(AsyncGeneratorProvider):
         prompt = messages[-1]["content"]
         context = create_context(messages[:-1])

-        if cookies and "SRCHD" in cookies:
-            #TODO: Will implement proper cookie retrieval later and use a try-except mechanism in 'stream_generate' instead of defaulting the cookie value like this
-            cookies_dict = {
-                'SRCHD' : cookies["SRCHD"],
+        if not cookies or "SRCHD" not in cookies:
+            cookies = {
+                'SRCHD' : 'AF=NOFORM',
                 'PPLState' : '1',
                 'KievRPSSecAuth': '',
                 'SUID' : '',
                 'SRCHUSR' : '',
                 'SRCHHPGUSR' : '',
             }
-
-        return stream_generate(prompt, context, cookies_dict)
+        return stream_generate(prompt, context, cookies)

 def create_context(messages: list[dict[str, str]]):
     context = "".join(f"[{message['role']}](#message)\n{message['content']}\n\n" for message in messages)
@@ -236,7 +228,7 @@ async def stream_generate(
         cookies: dict=None
     ):
     async with ClientSession(
-        timeout=aiohttp.ClientTimeout(total=900),
+        timeout=ClientTimeout(total=900),
         cookies=cookies,
         headers=Defaults.headers,
     ) as session:
@@ -288,16 +280,4 @@ async def stream_generate(
                     final = True
                     break
         finally:
             await delete_conversation(session, conversation)
-
-def run(generator: AsyncGenerator[Union[Any, str], Any]):
-    loop = asyncio.get_event_loop()
-    gen = generator.__aiter__()
-    while True:
-        try:
-            yield loop.run_until_complete(gen.__anext__())
-        except StopAsyncIteration:
-            break

View File

@@ -1,32 +1,28 @@
 from __future__ import annotations

 import re
-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+import html
+import json
+from aiohttp import ClientSession

+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider


-class ChatgptAi(BaseProvider):
-    url: str = "https://chatgpt.ai/gpt-4/"
+class ChatgptAi(AsyncGeneratorProvider):
+    url: str = "https://chatgpt.ai/"
     working = True
-    supports_gpt_4 = True
+    supports_gpt_35_turbo = True
+    _system_data = None

-    @staticmethod
-    def create_completion(
+    @classmethod
+    async def create_async_generator(
+        cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        chat = "\n".join(f"{message['role']}: {message['content']}" for message in messages)
-        chat += "\nassistant: "
-
-        response = requests.get("https://chatgpt.ai/")
-        nonce, post_id, _, bot_id = re.findall(
-            r'data-nonce="(.*)"\n data-post-id="(.*)"\n data-url="(.*)"\n data-bot-id="(.*)"\n data-width',
-            response.text)[0]
-
+        proxy: str = None,
+        **kwargs
+    ) -> AsyncGenerator:
         headers = {
             "authority" : "chatgpt.ai",
             "accept" : "*/*",
@@ -34,7 +30,7 @@ class ChatgptAi(BaseProvider):
             "cache-control" : "no-cache",
             "origin" : "https://chatgpt.ai",
             "pragma" : "no-cache",
-            "referer" : "https://chatgpt.ai/gpt-4/",
+            "referer" : cls.url,
             "sec-ch-ua" : '"Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"',
             "sec-ch-ua-mobile" : "?0",
             "sec-ch-ua-platform" : '"Windows"',
@@ -43,17 +39,37 @@ class ChatgptAi(BaseProvider):
             "sec-fetch-site" : "same-origin",
             "user-agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
         }
-        data = {
-            "_wpnonce" : nonce,
-            "post_id" : post_id,
-            "url" : "https://chatgpt.ai/gpt-4",
-            "action" : "wpaicg_chat_shortcode_message",
-            "message" : chat,
-            "bot_id" : bot_id,
-        }
-
-        response = requests.post(
-            "https://chatgpt.ai/wp-admin/admin-ajax.php", headers=headers, data=data)
-
-        response.raise_for_status()
-        yield response.json()["data"]
+        async with ClientSession(
+            headers=headers
+        ) as session:
+            if not cls._system_data:
+                async with session.get(cls.url, proxy=proxy) as response:
+                    response.raise_for_status()
+                    match = re.findall(r"data-system='([^']+)'", await response.text())
+                    if not match:
+                        raise RuntimeError("No system data")
+                    cls._system_data = json.loads(html.unescape(match[0]))
+            data = {
+                "botId": cls._system_data["botId"],
+                "clientId": "",
+                "contextId": cls._system_data["contextId"],
+                "id": cls._system_data["id"],
+                "messages": messages[:-1],
+                "newMessage": messages[-1]["content"],
+                "session": cls._system_data["sessionId"],
+                "stream": True
+            }
+            async with session.post(
+                "https://chatgpt.ai/wp-json/mwai-ui/v1/chats/submit",
+                proxy=proxy,
+                json=data
+            ) as response:
+                response.raise_for_status()
+                start = "data: "
+                async for line in response.content:
+                    line = line.decode('utf-8')
+                    if line.startswith(start):
+                        line = json.loads(line[len(start):-1])
+                        if line["type"] == "live":
+                            yield line["data"]

View File

@@ -1,70 +1,58 @@
 from __future__ import annotations

-import base64
-import os
-import re
-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+import os, re
+from aiohttp import ClientSession

+from .base_provider import AsyncProvider, format_prompt


-class ChatgptLogin(BaseProvider):
+class ChatgptLogin(AsyncProvider):
     url = "https://opchatgpts.net"
     supports_gpt_35_turbo = True
     working = True
+    _nonce = None

-    @staticmethod
-    def create_completion(
+    @classmethod
+    async def create_async(
+        cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
+        **kwargs
+    ) -> str:
         headers = {
-            "authority" : "chatgptlogin.ac",
-            "accept" : "*/*",
-            "accept-language" : "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3",
-            "content-type" : "application/json",
-            "origin" : "https://opchatgpts.net",
-            "referer" : "https://opchatgpts.net/chatgpt-free-use/",
-            "sec-ch-ua" : '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
-            "sec-ch-ua-mobile" : "?0",
-            "sec-ch-ua-platform" : '"Windows"',
-            "sec-fetch-dest" : "empty",
-            "sec-fetch-mode" : "cors",
-            "sec-fetch-site" : "same-origin",
-            "user-agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
-            "x-wp-nonce" : _get_nonce(),
+            "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
+            "Accept" : "*/*",
+            "Accept-language" : "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3",
+            "Origin" : "https://opchatgpts.net",
+            "Alt-Used" : "opchatgpts.net",
+            "Referer" : "https://opchatgpts.net/chatgpt-free-use/",
+            "Sec-Fetch-Dest" : "empty",
+            "Sec-Fetch-Mode" : "cors",
+            "Sec-Fetch-Site" : "same-origin",
         }
-
-        conversation = _transform(messages)
-
-        json_data = {
-            "env" : "chatbot",
-            "session" : "N/A",
-            "prompt" : "Converse as if you were an AI assistant. Be friendly, creative.",
-            "context" : "Converse as if you were an AI assistant. Be friendly, creative.",
-            "messages" : conversation,
-            "newMessage" : messages[-1]["content"],
-            "userName" : '<div class="mwai-name-text">User:</div>',
-            "aiName" : '<div class="mwai-name-text">AI:</div>',
-            "model" : "gpt-3.5-turbo",
-            "temperature" : kwargs.get("temperature", 0.8),
-            "maxTokens" : 1024,
-            "maxResults" : 1,
-            "apiKey" : "",
-            "service" : "openai",
-            "embeddingsIndex": "",
-            "stop" : "",
-            "clientId" : os.urandom(6).hex()
-        }
-
-        response = requests.post("https://opchatgpts.net/wp-json/ai-chatbot/v1/chat",
-            headers=headers, json=json_data)
-
-        response.raise_for_status()
-        yield response.json()["reply"]
+        async with ClientSession(
+            headers=headers
+        ) as session:
+            if not cls._nonce:
+                async with session.get(
+                    "https://opchatgpts.net/chatgpt-free-use/",
+                    params={"id": os.urandom(6).hex()},
+                ) as response:
+                    result = re.search(r'data-nonce="(.*?)"', await response.text())
+                    if not result:
+                        raise RuntimeError("No nonce value")
+                    cls._nonce = result.group(1)
+            data = {
+                "_wpnonce": cls._nonce,
+                "post_id": 28,
+                "url": "https://opchatgpts.net/chatgpt-free-use",
+                "action": "wpaicg_chat_shortcode_message",
+                "message": format_prompt(messages),
+                "bot_id": 0
+            }
+            async with session.post("https://opchatgpts.net/wp-admin/admin-ajax.php", data=data) as response:
+                response.raise_for_status()
+                return (await response.json())["data"]

     @classmethod
     @property
@@ -76,55 +64,4 @@ class ChatgptLogin(BaseProvider):
             ("temperature", "float"),
         ]
         param = ", ".join([": ".join(p) for p in params])
         return f"g4f.provider.{cls.__name__} supports: ({param})"
-
-
-def _get_nonce() -> str:
-    res = requests.get("https://opchatgpts.net/chatgpt-free-use/",
-        headers = {
-            "Referer" : "https://opchatgpts.net/chatgpt-free-use/",
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"})
-
-    result = re.search(
-        r'class="mwai-chat mwai-chatgpt">.*<span>Send</span></button></div></div></div> <script defer src="(.*?)">',
-        res.text)
-    if result is None:
-        return ""
-
-    src = result.group(1)
-    decoded_string = base64.b64decode(src.split(",")[-1]).decode("utf-8")
-    result = re.search(r"let restNonce = '(.*?)';", decoded_string)
-
-    return "" if result is None else result.group(1)
-
-
-def _transform(messages: list[dict[str, str]]) -> list[dict[str, Any]]:
-    return [
-        {
-            "id" : os.urandom(6).hex(),
-            "role" : message["role"],
-            "content": message["content"],
-            "who" : "AI: " if message["role"] == "assistant" else "User: ",
-            "html" : _html_encode(message["content"]),
-        }
-        for message in messages
-    ]
-
-
-def _html_encode(string: str) -> str:
-    table = {
-        '"' : "&quot;",
-        "'" : "&#39;",
-        "&" : "&amp;",
-        ">" : "&gt;",
-        "<" : "&lt;",
-        "\n": "<br>",
-        "\t": "&nbsp;&nbsp;&nbsp;&nbsp;",
-        " " : "&nbsp;",
-    }
-    for key in table:
-        string = string.replace(key, table[key])
-    return string

View File

@@ -1,25 +1,25 @@
 from __future__ import annotations

 import json
 import js2py
-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+from aiohttp import ClientSession

+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider


-class DeepAi(BaseProvider):
+class DeepAi(AsyncGeneratorProvider):
     url: str = "https://deepai.org"
     working = True
-    supports_stream = True
     supports_gpt_35_turbo = True

     @staticmethod
-    def create_completion(
+    async def create_async_generator(
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
+        proxy: str = None,
+        **kwargs
+    ) -> AsyncGenerator:
         token_js = """
 var agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
@@ -53,10 +53,11 @@ f = function () {
         "api-key": api_key,
         "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
     }
-
-    response = requests.post("https://api.deepai.org/make_me_a_pizza",
-        headers=headers, data=payload, stream=True)
-
-    for chunk in response.iter_content(chunk_size=None):
-        response.raise_for_status()
-        yield chunk.decode()
+    async with ClientSession(
+        headers=headers
+    ) as session:
+        async with session.post("https://api.deepai.org/make_me_a_pizza", proxy=proxy, data=payload) as response:
+            response.raise_for_status()
+            async for stream in response.content.iter_any():
+                if stream:
+                    yield stream.decode()

View File

@@ -12,8 +12,7 @@ from .base_provider import AsyncGeneratorProvider, format_prompt

 class H2o(AsyncGeneratorProvider):
     url = "https://gpt-gm.h2o.ai"
     working = True
-    supports_stream = True
     model = "h2oai/h2ogpt-gm-oasst1-en-2048-falcon-40b-v1"

     @classmethod
     async def create_async_generator(

View File

@@ -24,9 +24,9 @@ class HuggingChat(AsyncGeneratorProvider):
         cookies: dict = None,
         **kwargs
     ) -> AsyncGenerator:
+        model = model if model else cls.model
         if not cookies:
             cookies = get_cookies(".huggingface.co")
-        model = model if model else cls.model
         if proxy and "://" not in proxy:
             proxy = f"http://{proxy}"
@@ -62,36 +62,32 @@ class HuggingChat(AsyncGeneratorProvider):
                 "web_search_id": ""
             }
         }
-        start = "data:"
-        first = True
         async with session.post(f"https://huggingface.co/chat/conversation/{conversation_id}", proxy=proxy, json=send) as response:
-            async for line in response.content:
-                line = line.decode("utf-8")
-                if not line:
-                    continue
-                if not stream:
-                    try:
-                        data = json.loads(line)
-                    except json.decoder.JSONDecodeError:
-                        raise RuntimeError(f"No json: {line}")
-                    if "error" in data:
-                        raise RuntimeError(data["error"])
-                    elif isinstance(data, list):
-                        yield data[0]["generated_text"]
-                    else:
-                        raise RuntimeError(f"Response: {line}")
-                elif line.startswith(start):
-                    line = json.loads(line[len(start):-1])
-                    if "token" not in line:
-                        raise RuntimeError(f"Response: {line}")
-                    if not line["token"]["special"]:
-                        if first:
-                            yield line["token"]["text"].lstrip()
-                            first = False
-                        else:
-                            yield line["token"]["text"]
+            if not stream:
+                data = await response.json()
+                if "error" in data:
+                    raise RuntimeError(data["error"])
+                elif isinstance(data, list):
+                    yield data[0]["generated_text"]
+                else:
+                    raise RuntimeError(f"Response: {data}")
+            else:
+                start = "data:"
+                first = True
+                async for line in response.content:
+                    line = line.decode("utf-8")
+                    if not line:
+                        continue
+                    if line.startswith(start):
+                        line = json.loads(line[len(start):-1])
+                        if "token" not in line:
+                            raise RuntimeError(f"Response: {line}")
+                        if not line["token"]["special"]:
+                            if first:
+                                yield line["token"]["text"].lstrip()
+                                first = False
+                            else:
+                                yield line["token"]["text"]

         async with session.delete(f"https://huggingface.co/chat/conversation/{conversation_id}", proxy=proxy) as response:
             response.raise_for_status()
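The line[len(start):-1] slicing above is the recurring server-sent-events idiom in this commit (Ails and ChatgptAi use the same shape with a "data: " prefix): drop the prefix and the trailing newline, then parse the remainder as JSON. A worked check of the slicing:

    import json

    # Mirrors the HuggingChat parsing above: strip the "data:" prefix and
    # the trailing newline byte before json.loads.
    start = "data:"
    line = 'data:{"token": {"text": "Hi", "special": false}}\n'
    payload = json.loads(line[len(start):-1])
    assert payload["token"]["text"] == "Hi"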

View File

@@ -32,7 +32,6 @@ models = {
 class Liaobots(AsyncGeneratorProvider):
     url = "https://liaobots.com"
     working = True
-    supports_stream = True
     supports_gpt_35_turbo = True
     supports_gpt_4 = True
     _auth_code = None
@@ -46,24 +45,24 @@ class Liaobots(AsyncGeneratorProvider):
         proxy: str = None,
         **kwargs
     ) -> AsyncGenerator:
-        model = model if model in models else "gpt-3.5-turbo"
         if proxy and "://" not in proxy:
             proxy = f"http://{proxy}"
         headers = {
             "authority": "liaobots.com",
             "content-type": "application/json",
-            "origin": "https://liaobots.com",
-            "referer": "https://liaobots.com/",
+            "origin": cls.url,
+            "referer": cls.url + "/",
             "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
         }
         async with ClientSession(
             headers=headers
         ) as session:
+            model = model if model in models else "gpt-3.5-turbo"
             auth_code = auth if isinstance(auth, str) else cls._auth_code
             if not auth_code:
-                async with session.post("https://liaobots.com/api/user", proxy=proxy, json={"authcode": ""}) as response:
+                async with session.post(cls.url + "/api/user", proxy=proxy, json={"authcode": ""}) as response:
                     response.raise_for_status()
-                    auth_code = cls._auth_code = json.loads((await response.text()))["authCode"]
+                    auth_code = cls._auth_code = json.loads(await response.text())["authCode"]
             data = {
                 "conversationId": str(uuid.uuid4()),
                 "model": models[model],
@@ -71,10 +70,11 @@ class Liaobots(AsyncGeneratorProvider):
                 "key": "",
                 "prompt": "You are ChatGPT, a large language model trained by OpenAI. Follow the user's instructions carefully.",
             }
-            async with session.post("https://liaobots.com/api/chat", proxy=proxy, json=data, headers={"x-auth-code": auth_code}) as response:
+            async with session.post(cls.url + "/api/chat", proxy=proxy, json=data, headers={"x-auth-code": auth_code}) as response:
                 response.raise_for_status()
-                async for line in response.content:
-                    yield line.decode("utf-8")
+                async for stream in response.content.iter_any():
+                    if stream:
+                        yield stream.decode()

     @classmethod

View File

@@ -1,60 +1,8 @@
 from __future__ import annotations

-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+from .ChatgptLogin import ChatgptLogin


-class Opchatgpts(BaseProvider):
+class Opchatgpts(ChatgptLogin):
     url = "https://opchatgpts.net"
     working = True
-    supports_gpt_35_turbo = True
-
-    @staticmethod
-    def create_completion(
-        model: str,
-        messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        temperature = kwargs.get("temperature", 0.8)
-        max_tokens = kwargs.get("max_tokens", 1024)
-        system_prompt = kwargs.get(
-            "system_prompt",
-            "Converse as if you were an AI assistant. Be friendly, creative.")
-        payload = _create_payload(
-            messages = messages,
-            temperature = temperature,
-            max_tokens = max_tokens,
-            system_prompt = system_prompt)
-
-        response = requests.post("https://opchatgpts.net/wp-json/ai-chatbot/v1/chat", json=payload)
-        response.raise_for_status()
-        yield response.json()["reply"]
-
-
-def _create_payload(
-    messages: list[dict[str, str]],
-    temperature: float,
-    max_tokens: int, system_prompt: str) -> dict:
-
-    return {
-        "env" : "chatbot",
-        "session" : "N/A",
-        "prompt" : "\n",
-        "context" : system_prompt,
-        "messages" : messages,
-        "newMessage" : messages[::-1][0]["content"],
-        "userName" : '<div class="mwai-name-text">User:</div>',
-        "aiName" : '<div class="mwai-name-text">AI:</div>',
-        "model" : "gpt-3.5-turbo",
-        "temperature" : temperature,
-        "maxTokens" : max_tokens,
-        "maxResults" : 1,
-        "apiKey" : "",
-        "service" : "openai",
-        "embeddingsIndex" : "",
-        "stop" : "",
-    }

View File

@@ -1,67 +1,82 @@
 from __future__ import annotations

-has_module = True
-try:
-    from revChatGPT.V1 import AsyncChatbot
-except ImportError:
-    has_module = False
-
+from curl_cffi.requests import AsyncSession
+import uuid
 import json
-from httpx import AsyncClient

+from .base_provider import AsyncProvider, get_cookies, format_prompt
 from ..typing import AsyncGenerator
-from .base_provider import AsyncGeneratorProvider, format_prompt, get_cookies


-class OpenaiChat(AsyncGeneratorProvider):
+class OpenaiChat(AsyncProvider):
     url = "https://chat.openai.com"
     needs_auth = True
-    working = has_module
+    working = True
     supports_gpt_35_turbo = True
-    supports_gpt_4 = True
-    supports_stream = True
     _access_token = None

     @classmethod
-    async def create_async_generator(
+    async def create_async(
         cls,
         model: str,
         messages: list[dict[str, str]],
         proxy: str = None,
-        access_token: str = _access_token,
+        access_token: str = None,
         cookies: dict = None,
         **kwargs: dict
     ) -> AsyncGenerator:
-        config = {"access_token": access_token, "model": model}
+        proxies = None
         if proxy:
             if "://" not in proxy:
                 proxy = f"http://{proxy}"
-            config["proxy"] = proxy
-
-        bot = AsyncChatbot(
-            config=config
-        )
-
+            proxies = {
+                "http": proxy,
+                "https": proxy
+            }
         if not access_token:
-            cookies = cookies if cookies else get_cookies("chat.openai.com")
-            cls._access_token = await get_access_token(bot.session, cookies)
-            bot.set_access_token(cls._access_token)
-
-        returned = None
-        async for message in bot.ask(format_prompt(messages)):
-            message = message["message"]
-            if returned:
-                if message.startswith(returned):
-                    new = message[len(returned):]
-                    if new:
-                        yield new
-            else:
-                yield message
-            returned = message
-
-        await bot.delete_conversation(bot.conversation_id)
+            access_token = await cls.get_access_token(cookies)
+        headers = {
+            "Accept": "text/event-stream",
+            "Authorization": f"Bearer {access_token}",
+        }
+        async with AsyncSession(proxies=proxies, headers=headers, impersonate="chrome107") as session:
+            messages = [
+                {
+                    "id": str(uuid.uuid4()),
+                    "author": {"role": "user"},
+                    "content": {"content_type": "text", "parts": [format_prompt(messages)]},
+                },
+            ]
+            data = {
+                "action": "next",
+                "messages": messages,
+                "conversation_id": None,
+                "parent_message_id": str(uuid.uuid4()),
+                "model": "text-davinci-002-render-sha",
+                "history_and_training_disabled": True,
+            }
+            response = await session.post("https://chat.openai.com/backend-api/conversation", json=data)
+            response.raise_for_status()
+            last_message = None
+            for line in response.content.decode().splitlines():
+                if line.startswith("data: "):
+                    line = line[6:]
+                    if line != "[DONE]":
+                        line = json.loads(line)
+                        if "message" in line:
+                            last_message = line["message"]["content"]["parts"][0]
+            return last_message

+    @classmethod
+    async def get_access_token(cls, cookies: dict = None, proxies: dict = None):
+        if not cls._access_token:
+            cookies = cookies if cookies else get_cookies("chat.openai.com")
+            async with AsyncSession(proxies=proxies, cookies=cookies, impersonate="chrome107") as session:
+                response = await session.get("https://chat.openai.com/api/auth/session")
+                response.raise_for_status()
+                cls._access_token = response.json()["accessToken"]
+        return cls._access_token

     @classmethod
@@ -72,15 +87,8 @@ class OpenaiChat(AsyncGeneratorProvider):
             ("messages", "list[dict[str, str]]"),
             ("stream", "bool"),
             ("proxy", "str"),
-            ("access_token", "str"),
-            ("cookies", "dict[str, str]")
         ]
         param = ", ".join([": ".join(p) for p in params])
         return f"g4f.provider.{cls.__name__} supports: ({param})"
-
-
-async def get_access_token(session: AsyncClient, cookies: dict):
-    response = await session.get("https://chat.openai.com/api/auth/session", cookies=cookies)
-    response.raise_for_status()
-    try:
-        return response.json()["accessToken"]
-    except json.decoder.JSONDecodeError:
-        raise RuntimeError(f"Response: {response.text}")

View File

@@ -1,63 +1,72 @@
 from __future__ import annotations

-import base64
-import json
-import uuid
-import quickjs
-from curl_cffi import requests
+import base64, json, uuid, quickjs, random
+from curl_cffi.requests import AsyncSession

-from ..typing import Any, CreateResult, TypedDict
-from .base_provider import BaseProvider
+from ..typing import Any, TypedDict
+from .base_provider import AsyncProvider


-class Vercel(BaseProvider):
-    url = "https://play.vercel.ai"
+class Vercel(AsyncProvider):
+    url = "https://sdk.vercel.ai"
     working = True
     supports_gpt_35_turbo = True
+    model = "replicate:replicate/llama-2-70b-chat"

-    @staticmethod
-    def create_completion(
+    @classmethod
+    async def create_async(
+        cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
+        proxy: str = None,
+        **kwargs
+    ) -> str:
         if model in ["gpt-3.5-turbo", "gpt-4"]:
             model = "openai:" + model
-        yield _chat(model_id=model, messages=messages)
-
-
-def _chat(model_id: str, messages: list[dict[str, str]]) -> str:
-    session = requests.Session(impersonate="chrome107")
-
-    url = "https://sdk.vercel.ai/api/generate"
-    header = _create_header(session)
-    payload = _create_payload(model_id, messages)
-
-    response = session.post(url=url, headers=header, json=payload)
-    response.raise_for_status()
-    return response.text
+        model = model if model else cls.model
+        proxies = None
+        if proxy:
+            if "://" not in proxy:
+                proxy = "http://" + proxy
+            proxies = {"http": proxy, "https": proxy}
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.{rand1}.{rand2} Safari/537.36".format(
+                rand1=random.randint(0,9999),
+                rand2=random.randint(0,9999)
+            ),
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Accept-Language": "en-US,en;q=0.5",
+            "TE": "trailers",
+        }
+        async with AsyncSession(headers=headers, proxies=proxies, impersonate="chrome107") as session:
+            response = await session.get(cls.url + "/openai.jpeg")
+            response.raise_for_status()
+            custom_encoding = _get_custom_encoding(response.text)
+            headers = {
+                "Content-Type": "application/json",
+                "Custom-Encoding": custom_encoding,
+            }
+            data = _create_payload(model, messages)
+            response = await session.post(cls.url + "/api/generate", json=data, headers=headers)
+            response.raise_for_status()
+            return response.text


-def _create_payload(model_id: str, messages: list[dict[str, str]]) -> dict[str, Any]:
-    default_params = model_info[model_id]["default_params"]
+def _create_payload(model: str, messages: list[dict[str, str]]) -> dict[str, Any]:
+    if model not in model_info:
+        raise RuntimeError(f'Model "{model}" are not supported')
+    default_params = model_info[model]["default_params"]
     return {
         "messages": messages,
         "playgroundId": str(uuid.uuid4()),
         "chatIndex": 0,
-        "model": model_id} | default_params
-
-
-def _create_header(session: requests.Session):
-    custom_encoding = _get_custom_encoding(session)
-    return {"custom-encoding": custom_encoding}
+        "model": model
+    } | default_params


 # based on https://github.com/ading2210/vercel-llm-api
-def _get_custom_encoding(session: requests.Session):
-    url = "https://sdk.vercel.ai/openai.jpeg"
-    response = session.get(url=url)
-
-    data = json.loads(base64.b64decode(response.text, validate=True))
+def _get_custom_encoding(text: str) -> str:
+    data = json.loads(base64.b64decode(text, validate=True))
     script = """
       String.prototype.fontcolor = function() {{
         return `<font>${{this}}</font>`
@@ -67,7 +76,6 @@ def _get_custom_encoding(session: requests.Session):
     """.format(
         script=data["c"], key=data["a"]
     )
-
     context = quickjs.Context()  # type: ignore
     token_data = json.loads(context.eval(script).json())  # type: ignore
     token_data[2] = "mark"
@@ -136,6 +144,15 @@ model_info: dict[str, ModelInfo] = {
             "repetitionPenalty": 1,
         },
     },
+    "replicate:replicate/llama-2-70b-chat": {
+        "id": "replicate:replicate/llama-2-70b-chat",
+        "default_params": {
+            "temperature": 0.75,
+            "maxTokens": 1000,
+            "topP": 1,
+            "repetitionPenalty": 1,
+        },
+    },
     "huggingface:bigscience/bloom": {
         "id": "huggingface:bigscience/bloom",
         "default_params": {

View File

@@ -1,47 +1,36 @@
 from __future__ import annotations

-import json
-import random
-import string
-import time
-import requests
-
-from ..typing import Any, CreateResult
-from .base_provider import BaseProvider
+import random, string, time
+from aiohttp import ClientSession

+from .base_provider import AsyncProvider


-class Wewordle(BaseProvider):
-    url = "https://wewordle.org/"
+class Wewordle(AsyncProvider):
+    url = "https://wewordle.org"
     working = True
     supports_gpt_35_turbo = True

     @classmethod
-    def create_completion(
+    async def create_async(
         cls,
         model: str,
         messages: list[dict[str, str]],
-        stream: bool, **kwargs: Any) -> CreateResult:
-
-        # randomize user id and app id
-        _user_id = "".join(
-            random.choices(f"{string.ascii_lowercase}{string.digits}", k=16))
-        _app_id = "".join(
-            random.choices(f"{string.ascii_lowercase}{string.digits}", k=31))
-        # make current date with format utc
-        _request_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
+        proxy: str = None,
+        **kwargs
+    ) -> str:
         headers = {
             "accept" : "*/*",
             "pragma" : "no-cache",
             "Content-Type" : "application/json",
             "Connection" : "keep-alive"
-            # user agent android client
-            # 'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 10; SM-G975F Build/QP1A.190711.020)',
         }
-        data: dict[str, Any] = {
+        _user_id = "".join(random.choices(f"{string.ascii_lowercase}{string.digits}", k=16))
+        _app_id = "".join(random.choices(f"{string.ascii_lowercase}{string.digits}", k=31))
+        _request_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
+        data = {
             "user" : _user_id,
             "messages" : messages,
             "subscriber": {
@@ -65,10 +54,12 @@ class Wewordle(BaseProvider):
             }
         }

-        response = requests.post(f"{cls.url}gptapi/v1/android/turbo",
-            headers=headers, data=json.dumps(data))
-
-        response.raise_for_status()
-        _json = response.json()
-        if "message" in _json:
-            yield _json["message"]["content"]
+        async with ClientSession(
+            headers=headers
+        ) as session:
+            async with session.post(f"{cls.url}/gptapi/v1/android/turbo", proxy=proxy, json=data) as response:
+                response.raise_for_status()
+                content = (await response.json())["message"]["content"]
+                if content:
+                    return content

View File

@@ -2,28 +2,31 @@ from __future__ import annotations

 from aiohttp import ClientSession

-from .base_provider import AsyncProvider, format_prompt
+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider, format_prompt


-class Yqcloud(AsyncProvider):
+class Yqcloud(AsyncGeneratorProvider):
     url = "https://chat9.yqcloud.top/"
     working = True
     supports_gpt_35_turbo = True

     @staticmethod
-    async def create_async(
+    async def create_async_generator(
         model: str,
         messages: list[dict[str, str]],
         proxy: str = None,
         **kwargs,
-    ) -> str:
+    ) -> AsyncGenerator:
         async with ClientSession(
             headers=_create_header()
         ) as session:
             payload = _create_payload(messages)
             async with session.post("https://api.aichatos.cloud/api/generateStream", proxy=proxy, json=payload) as response:
                 response.raise_for_status()
-                return await response.text()
+                async for stream in response.content.iter_any():
+                    if stream:
+                        yield stream.decode()


 def _create_header():
@@ -40,6 +43,6 @@ def _create_payload(messages: list[dict[str, str]]):
         "network": True,
         "system": "",
         "withoutContext": False,
-        "stream": False,
+        "stream": True,
         "userId": "#/chat/1693025544336"
     }

View File

@@ -42,10 +42,11 @@ _cookies = {}

 def get_cookies(cookie_domain: str) -> dict:
     if cookie_domain not in _cookies:
         _cookies[cookie_domain] = {}
-        for cookie in browser_cookie3.load(cookie_domain):
-            _cookies[cookie_domain][cookie.name] = cookie.value
+        try:
+            for cookie in browser_cookie3.load(cookie_domain):
+                _cookies[cookie_domain][cookie.name] = cookie.value
+        except:
+            pass
     return _cookies[cookie_domain]

@@ -79,6 +80,8 @@ class AsyncProvider(BaseProvider):

 class AsyncGeneratorProvider(AsyncProvider):
+    supports_stream = True
+
     @classmethod
     def create_completion(
         cls,
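With supports_stream now declared once on AsyncGeneratorProvider, the per-provider copies deleted earlier in this commit (Bing, DeepAi, H2o, Liaobots, Ails) become redundant. A condensed illustration of the inheritance:

    class AsyncGeneratorProvider(AsyncProvider):
        supports_stream = True   # declared once for every streaming provider

    class Liaobots(AsyncGeneratorProvider):
        url = "https://liaobots.com"
        working = True           # supports_stream is inherited, not repeated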

View File

@@ -9,4 +9,3 @@ js2py
 quickjs
 flask
 flask-cors
-httpx

testing/test_async.py (new file, 37 additions)
View File

@@ -0,0 +1,37 @@
+import sys
+from pathlib import Path
+import asyncio
+
+sys.path.append(str(Path(__file__).parent.parent))
+
+import g4f
+from g4f.Provider import AsyncProvider
+from testing.test_providers import get_providers
+from testing.log_time import log_time_async
+
+async def create_async(provider: AsyncProvider):
+    model = g4f.models.gpt_35_turbo.name if provider.supports_gpt_35_turbo else g4f.models.default.name
+    try:
+        response = await log_time_async(
+            provider.create_async,
+            model=model,
+            messages=[{"role": "user", "content": "Hello Assistant!"}]
+        )
+        assert type(response) is str
+        assert len(response) > 0
+        return response
+    except Exception as e:
+        return e
+
+async def run_async():
+    _providers: list[AsyncProvider] = [
+        _provider
+        for _provider in get_providers()
+        if _provider.working and hasattr(_provider, "create_async")
+    ]
+    responses = [create_async(_provider) for _provider in _providers]
+    responses = await asyncio.gather(*responses)
+    for idx, provider in enumerate(_providers):
+        print(f"{provider.__name__}:", responses[idx])
+
+print("Total:", asyncio.run(log_time_async(run_async)))

View File

@@ -8,6 +8,11 @@ from g4f import BaseProvider, models, Provider
 logging = False

+class Styles:
+    ENDC = "\033[0m"
+    BOLD = "\033[1m"
+    UNDERLINE = "\033[4m"
+
 def main():
     providers = get_providers()
     failed_providers = []
@@ -24,39 +29,40 @@ def main():
     print()

     if failed_providers:
-        print(f"{Fore.RED}Failed providers:\n")
+        print(f"{Fore.RED + Styles.BOLD}Failed providers:{Styles.ENDC}")
         for _provider in failed_providers:
             print(f"{Fore.RED}{_provider.__name__}")
     else:
-        print(f"{Fore.GREEN}All providers are working")
+        print(f"{Fore.GREEN + Styles.BOLD}All providers are working")


 def get_providers() -> list[type[BaseProvider]]:
     provider_names = dir(Provider)
     ignore_names = [
+        "annotations",
         "base_provider",
-        "BaseProvider"
+        "BaseProvider",
+        "AsyncProvider",
+        "AsyncGeneratorProvider"
     ]
     provider_names = [
         provider_name
         for provider_name in provider_names
         if not provider_name.startswith("__") and provider_name not in ignore_names
     ]
-    return [getattr(Provider, provider_name) for provider_name in sorted(provider_names)]
+    return [getattr(Provider, provider_name) for provider_name in provider_names]


 def create_response(_provider: type[BaseProvider]) -> str:
     if _provider.supports_gpt_35_turbo:
         model = models.gpt_35_turbo.name
     elif _provider.supports_gpt_4:
-        model = models.gpt_4
+        model = models.gpt_4.name
+    elif hasattr(_provider, "model"):
+        model = _provider.model
     else:
-        model = None
+        model = models.default.name
     response = _provider.create_completion(
         model=model,
-        messages=[{"role": "user", "content": "Hello"}],
+        messages=[{"role": "user", "content": "Hello, who are you? Answer in detail much as possible."}],
         stream=False,
     )
     return "".join(response)

View File

@@ -5,10 +5,9 @@ from urllib.parse import urlparse

 sys.path.append(str(Path(__file__).parent.parent))

-from g4f import models, Provider
-from g4f.Provider.base_provider import BaseProvider, AsyncProvider
-from testing.test_providers import test
+from g4f import models
+from g4f.Provider.base_provider import AsyncProvider
+from testing.test_providers import test, get_providers


 def print_imports():
     print("##### Providers:")
@@ -68,26 +67,6 @@ def print_providers():
     )
     print("\n".join(lines))

-
-def get_provider_names() -> list[str]:
-    provider_names = dir(Provider)
-    ignore_names = [
-        "base_provider",
-        "BaseProvider",
-        "AsyncProvider",
-        "AsyncGeneratorProvider"
-    ]
-    return [
-        provider_name
-        for provider_name in provider_names
-        if not provider_name.startswith("__") and provider_name not in ignore_names
-    ]
-
-
-def get_providers() -> list[type[BaseProvider]]:
-    return [getattr(Provider, provider_name) for provider_name in get_provider_names()]
-

 def print_models():
     base_provider_names = {
         "cohere": "Cohere",