Update __init__.py

This commit is contained in:
ThatLukinhasGuy 2023-11-04 18:16:09 -03:00 committed by GitHub
parent 318112c8b9
commit 0af4fc0997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 123 additions and 128 deletions

View File

@ -1,167 +1,162 @@
import json
import random
import string
import time
from typing import Any, AnyStr, Dict, List, Union

import g4f
import nest_asyncio
import uvicorn
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

from ._tokenizer import tokenize
from .. import BaseProvider
class Api:
    """OpenAI-compatible REST API (FastAPI) backed by the g4f engine."""

    def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
                 list_ignored_providers: List[Union[str, BaseProvider]] = None) -> None:
        """Create the FastAPI application and register all routes.

        Args:
            engine: the g4f module used for completions.
                NOTE(review): annotated with the module object itself rather
                than a class/Protocol — harmless at runtime, confirm intent.
            debug: stored on the instance; no route currently reads it.
            sentry: stored on the instance; no route currently reads it.
            list_ignored_providers: providers to skip; stored but not yet
                consulted by any route.
        """
        self.engine = engine
        self.debug = debug
        self.sentry = sentry
        self.list_ignored_providers = list_ignored_providers

        self.app = FastAPI()
        # Lets uvicorn.run() be called from inside an already-running event
        # loop (e.g. notebooks / embedded use).
        nest_asyncio.apply()

        # Loose JSON typing for the request body of /v1/chat/completions.
        JSONObject = Dict[AnyStr, Any]
        JSONArray = List[Any]
        JSONStructure = Union[JSONArray, JSONObject]

        @self.app.get("/")
        async def read_root():
            return Response(content=json.dumps({"info": "g4f API"}, indent=4), media_type="application/json")

        @self.app.get("/v1")
        async def read_root_v1():
            return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json")

        @self.app.get("/v1/models")
        async def models():
            # OpenAI-style model listing; 'created' is not tracked, so 0.
            model_list = [{
                'id': model,
                'object': 'model',
                'created': 0,
                'owned_by': 'g4f'} for model in g4f.Model.__all__()]

            return Response(content=json.dumps({
                'object': 'list',
                'data': model_list}, indent=4), media_type="application/json")

        @self.app.get("/v1/models/{model_name}")
        async def model_info(model_name: str):
            try:
                # Renamed from `model_info` (shadowed the route function).
                model_data = g4f.ModelUtils.convert[model_name]

                return Response(content=json.dumps({
                    'id': model_name,
                    'object': 'model',
                    'created': 0,
                    'owned_by': model_data.base_provider
                }, indent=4), media_type="application/json")
            except KeyError:
                # Narrowed from bare `except:` — only an unknown model name
                # is expected from the dict lookup above.
                return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), media_type="application/json")

        @self.app.post("/v1/chat/completions")
        async def chat_completions(request: Request, item: JSONStructure = None):
            # Defaults merged with whatever the client sent.
            item_data = {
                'model': 'gpt-3.5-turbo',
                'stream': False,
            }

            item_data.update(item or {})
            model = item_data.get('model')
            stream = item_data.get('stream')
            messages = item_data.get('messages')

            try:
                response = g4f.ChatCompletion.create(model=model, stream=stream, messages=messages)
            except Exception:
                # Narrowed from bare `except:` so Ctrl-C / SystemExit still
                # propagate; provider failures become a JSON error payload.
                return Response(content=json.dumps({"error": "An error occurred while generating the response."}, indent=4), media_type="application/json")

            completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
            completion_timestamp = int(time.time())

            if not stream:
                # Non-streaming: one chat.completion object with usage counts.
                prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages]))
                completion_tokens, _ = tokenize(response)

                json_data = {
                    'id': f'chatcmpl-{completion_id}',
                    'object': 'chat.completion',
                    'created': completion_timestamp,
                    'model': model,
                    'choices': [
                        {
                            'index': 0,
                            'message': {
                                'role': 'assistant',
                                'content': response,
                            },
                            'finish_reason': 'stop',
                        }
                    ],
                    'usage': {
                        'prompt_tokens': prompt_tokens,
                        'completion_tokens': completion_tokens,
                        'total_tokens': prompt_tokens + completion_tokens,
                    },
                }

                return Response(content=json.dumps(json_data, indent=4), media_type="application/json")

            def streaming():
                # Server-Sent-Events generator yielding chat.completion.chunk
                # objects, terminated by a finish_reason='stop' chunk.
                try:
                    for chunk in response:
                        completion_data = {
                            'id': f'chatcmpl-{completion_id}',
                            'object': 'chat.completion.chunk',
                            'created': completion_timestamp,
                            'model': model,
                            'choices': [
                                {
                                    'index': 0,
                                    'delta': {
                                        'content': chunk,
                                    },
                                    'finish_reason': None,
                                }
                            ],
                        }
                        content = json.dumps(completion_data, separators=(',', ':'))
                        yield f'data: {content}\n\n'
                        time.sleep(0.03)

                    end_completion_data = {
                        'id': f'chatcmpl-{completion_id}',
                        'object': 'chat.completion.chunk',
                        'created': completion_timestamp,
                        'model': model,
                        'choices': [
                            {
                                'index': 0,
                                'delta': {},
                                'finish_reason': 'stop',
                            }
                        ],
                    }
                    content = json.dumps(end_completion_data, separators=(',', ':'))
                    yield f'data: {content}\n\n'
                except GeneratorExit:
                    # Client disconnected mid-stream; stop silently.
                    pass

            # BUG FIX: the original returned json.dumps(streaming(), ...),
            # which raises TypeError (a generator is not JSON-serialisable).
            # Stream the SSE chunks properly instead.
            return StreamingResponse(streaming(), media_type="text/event-stream")

        @self.app.post("/v1/completions")
        async def completions():
            return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")

    def run(self, ip, thread_quantity):
        """Serve the app on 'host:port' (*ip*) with *thread_quantity* workers.

        NOTE(review): uvicorn ignores `workers` when given an app object
        instead of an import string — confirm whether multi-worker is needed.
        """
        split_ip = ip.split(":")
        uvicorn.run(self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False, workers=thread_quantity)