Merge pull request #89 from sudouser777/fix/poe_account_creation

Fix for Poe account creation
This commit is contained in:
t.me/xtekky 2023-04-25 11:24:35 +01:00 committed by GitHub
commit 746dd870b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 612 additions and 386 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
.idea/
*/__pycache__/
*.log
cookie.json

View File

@ -47,3 +47,17 @@ response = quora.Completion.create(model = 'gpt-4',
print(response.completion.choices[0].text) print(response.completion.choices[0].text)
``` ```
#### Update Use This For Poe
```python
from quora import Poe
# available models: ['Sage', 'GPT-4', 'Claude+', 'Claude-instant', 'ChatGPT', 'Dragonfly', 'NeevaAI']
poe = Poe(model='ChatGPT')
poe.chat('who won the football world cup most?')
# new bot creation
poe.create_bot('new_bot_name', prompt='You are new test bot', base_model='gpt-3.5-turbo')
```

View File

@ -1,28 +1,45 @@
from quora.api import Client as PoeClient import json
from quora.mail import Emailnator from datetime import datetime
from requests import Session from hashlib import md5
from tls_client import Session as TLS from json import dumps
from re import search, findall from pathlib import Path
from json import loads from random import choice, choices, randint
from time import sleep from re import search, findall
from pathlib import Path from string import ascii_letters, digits
from random import choice, choices, randint from typing import Optional
from string import ascii_letters, digits from urllib.parse import unquote
from urllib import parse
from os import urandom import selenium.webdriver.support.expected_conditions as EC
from hashlib import md5 from pypasser import reCaptchaV3
from json import dumps from requests import Session
from pypasser import reCaptchaV3 from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from tls_client import Session as TLS
from quora.api import Client as PoeClient
from quora.mail import Emailnator
# from twocaptcha import TwoCaptcha # from twocaptcha import TwoCaptcha
# solver = TwoCaptcha('72747bf24a9d89b4dcc1b24875efd358') # solver = TwoCaptcha('72747bf24a9d89b4dcc1b24875efd358')
MODELS = {
"Sage": "capybara",
"GPT-4": "beaver",
"Claude+": "a2_2",
"Claude-instant": "a2",
"ChatGPT": "chinchilla",
"Dragonfly": "nutria",
"NeevaAI": "hutia",
}
def extract_formkey(html): def extract_formkey(html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>' script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
script_text = search(script_regex, html).group(1) script_text = search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",' key_regex = r'var .="([0-9a-f]+)",'
key_text = search(key_regex, script_text).group(1) key_text = search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]' cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
cipher_pairs = findall(cipher_regex, script_text) cipher_pairs = findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs) formkey_list = [""] * len(cipher_pairs)
@ -33,42 +50,40 @@ def extract_formkey(html):
return formkey return formkey
class PoeResponse: class PoeResponse:
class Completion: class Completion:
class Choices: class Choices:
def __init__(self, choice: dict) -> None: def __init__(self, choice: dict) -> None:
self.text = choice['text'] self.text = choice["text"]
self.content = self.text.encode() self.content = self.text.encode()
self.index = choice['index'] self.index = choice["index"]
self.logprobs = choice['logprobs'] self.logprobs = choice["logprobs"]
self.finish_reason = choice['finish_reason'] self.finish_reason = choice["finish_reason"]
def __repr__(self) -> str: def __repr__(self) -> str:
return f'''<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>''' return f"""<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>"""
def __init__(self, choices: dict) -> None: def __init__(self, choices: dict) -> None:
self.choices = [self.Choices(choice) for choice in choices] self.choices = [self.Choices(choice) for choice in choices]
class Usage: class Usage:
def __init__(self, usage_dict: dict) -> None: def __init__(self, usage_dict: dict) -> None:
self.prompt_tokens = usage_dict['prompt_tokens'] self.prompt_tokens = usage_dict["prompt_tokens"]
self.completion_tokens = usage_dict['completion_tokens'] self.completion_tokens = usage_dict["completion_tokens"]
self.total_tokens = usage_dict['total_tokens'] self.total_tokens = usage_dict["total_tokens"]
def __repr__(self): def __repr__(self):
return f'''<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>''' return f"""<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>"""
def __init__(self, response_dict: dict) -> None: def __init__(self, response_dict: dict) -> None:
self.response_dict = response_dict
self.response_dict = response_dict self.id = response_dict["id"]
self.id = response_dict['id'] self.object = response_dict["object"]
self.object = response_dict['object'] self.created = response_dict["created"]
self.created = response_dict['created'] self.model = response_dict["model"]
self.model = response_dict['model'] self.completion = self.Completion(response_dict["choices"])
self.completion = self.Completion(response_dict['choices']) self.usage = self.Usage(response_dict["usage"])
self.usage = self.Usage(response_dict['usage'])
def json(self) -> dict: def json(self) -> dict:
return self.response_dict return self.response_dict
@ -76,127 +91,140 @@ class PoeResponse:
class ModelResponse: class ModelResponse:
def __init__(self, json_response: dict) -> None: def __init__(self, json_response: dict) -> None:
self.id = json_response['data']['poeBotCreate']['bot']['id'] self.id = json_response["data"]["poeBotCreate"]["bot"]["id"]
self.name = json_response['data']['poeBotCreate']['bot']['displayName'] self.name = json_response["data"]["poeBotCreate"]["bot"]["displayName"]
self.limit = json_response['data']['poeBotCreate']['bot']['messageLimit']['dailyLimit'] self.limit = json_response["data"]["poeBotCreate"]["bot"]["messageLimit"][
self.deleted = json_response['data']['poeBotCreate']['bot']['deletionState'] "dailyLimit"
]
self.deleted = json_response["data"]["poeBotCreate"]["bot"]["deletionState"]
class Model: class Model:
def create( def create(
token: str, token: str,
model: str = 'gpt-3.5-turbo', # claude-instant model: str = "gpt-3.5-turbo", # claude-instant
system_prompt: str = 'You are ChatGPT a large language model developed by Openai. Answer as consisely as possible', system_prompt: str = "You are ChatGPT a large language model developed by Openai. Answer as consisely as possible",
description: str = 'gpt-3.5 language model from openai, skidded by poe.com', description: str = "gpt-3.5 language model from openai, skidded by poe.com",
handle: str = None) -> ModelResponse: handle: str = None,
) -> ModelResponse:
models = { models = {
'gpt-3.5-turbo' : 'chinchilla', "gpt-3.5-turbo": "chinchilla",
'claude-instant-v1.0': 'a2', "claude-instant-v1.0": "a2",
'gpt-4': 'beaver' "gpt-4": "beaver",
} }
if not handle: if not handle:
handle = f'gptx{randint(1111111, 9999999)}' handle = f"gptx{randint(1111111, 9999999)}"
client = Session() client = Session()
client.cookies['p-b'] = token client.cookies["p-b"] = token
formkey = extract_formkey(client.get('https://poe.com').text) formkey = extract_formkey(client.get("https://poe.com").text)
settings = client.get('https://poe.com/api/settings').json() settings = client.get("https://poe.com/api/settings").json()
client.headers = { client.headers = {
"host" : "poe.com", "host": "poe.com",
"origin" : "https://poe.com", "origin": "https://poe.com",
"referer" : "https://poe.com/", "referer": "https://poe.com/",
"content-type" : "application/json", "poe-formkey": formkey,
"poe-formkey" : formkey, "poe-tchannel": settings["tchannelData"]["channel"],
"poe-tchannel" : settings['tchannelData']['channel'], "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"user-agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", "connection": "keep-alive",
"connection" : "keep-alive", "sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
"sec-ch-ua" : "\"Chromium\";v=\"112\", \"Google Chrome\";v=\"112\", \"Not:A-Brand\";v=\"99\"", "sec-ch-ua-mobile": "?0",
"sec-ch-ua-mobile" : "?0", "sec-ch-ua-platform": '"macOS"',
"sec-ch-ua-platform": "\"macOS\"", "content-type": "application/json",
"content-type" : "application/json", "sec-fetch-site": "same-origin",
"sec-fetch-site" : "same-origin", "sec-fetch-mode": "cors",
"sec-fetch-mode" : "cors", "sec-fetch-dest": "empty",
"sec-fetch-dest" : "empty", "accept": "*/*",
"accept" : "*/*", "accept-encoding": "gzip, deflate, br",
"accept-encoding" : "gzip, deflate, br", "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
"accept-language" : "en-GB,en-US;q=0.9,en;q=0.8",
} }
payload = dumps(separators=(',', ':'), obj = { payload = dumps(
'queryName': 'CreateBotMain_poeBotCreate_Mutation', separators=(",", ":"),
'variables': { obj={
'model' : models[model], "queryName": "CreateBotMain_poeBotCreate_Mutation",
'handle' : handle, "variables": {
'prompt' : system_prompt, "model": models[model],
'isPromptPublic' : True, "handle": handle,
'introduction' : '', "prompt": system_prompt,
'description' : description, "isPromptPublic": True,
'profilePictureUrl' : 'https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6', "introduction": "",
'apiUrl' : None, "description": description,
'apiKey' : ''.join(choices(ascii_letters + digits, k = 32)), "profilePictureUrl": "https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6",
'isApiBot' : False, "apiUrl": None,
'hasLinkification' : False, "apiKey": "".join(choices(ascii_letters + digits, k=32)),
'hasMarkdownRendering' : False, "isApiBot": False,
'hasSuggestedReplies' : False, "hasLinkification": False,
'isPrivateBot' : False "hasMarkdownRendering": False,
"hasSuggestedReplies": False,
"isPrivateBot": False,
},
"query": "mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n",
}, },
'query': 'mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n', )
})
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k' base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
response = client.post("https://poe.com/api/gql_POST", data = payload)
if not 'success' in response.text: response = client.post("https://poe.com/api/gql_POST", data=payload)
raise Exception('''
if "success" not in response.text:
raise Exception(
"""
Bot creation Failed Bot creation Failed
!! Important !! !! Important !!
Bot creation was not enabled on this account Bot creation was not enabled on this account
please use: quora.Account.create with enable_bot_creation set to True please use: quora.Account.create with enable_bot_creation set to True
''') """
)
return ModelResponse(response.json()) return ModelResponse(response.json())
class Account: class Account:
def create(proxy: None or str = None, logging: bool = False, enable_bot_creation: bool = False): def create(
client = TLS(client_identifier='chrome110') proxy: Optional[str] = None,
client.proxies = { logging: bool = False,
'http': f'http://{proxy}', enable_bot_creation: bool = False,
'https': f'http://{proxy}'} if proxy else None ):
client = TLS(client_identifier="chrome110")
client.proxies = (
{"http": f"http://{proxy}", "https": f"http://{proxy}"} if proxy else None
)
mail_client = Emailnator() mail_client = Emailnator()
mail_address = mail_client.get_mail() mail_address = mail_client.get_mail()
if logging: print('email', mail_address) if logging:
print("email", mail_address)
client.headers = { client.headers = {
'authority' : 'poe.com', "authority": "poe.com",
'accept' : '*/*', "accept": "*/*",
'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3', "accept-language": "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3",
'content-type' : 'application/json', "content-type": "application/json",
'origin' : 'https://poe.com', "origin": "https://poe.com",
'poe-formkey' : 'null', "poe-tag-id": "null",
'poe-tag-id' : 'null', "referer": "https://poe.com/login",
'poe-tchannel' : 'null', "sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
'referer' : 'https://poe.com/login', "sec-ch-ua-mobile": "?0",
'sec-ch-ua' : '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"', "sec-ch-ua-platform": '"macOS"',
'sec-ch-ua-mobile' : '?0', "sec-fetch-dest": "empty",
'sec-ch-ua-platform': '"macOS"', "sec-fetch-mode": "cors",
'sec-fetch-dest': 'empty', "sec-fetch-site": "same-origin",
'sec-fetch-mode': 'cors', "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
'sec-fetch-site': 'same-origin', "poe-formkey": extract_formkey(client.get("https://poe.com/login").text),
'user-agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' "poe-tchannel": client.get("https://poe.com/api/settings").json()[
"tchannelData"
]["channel"],
} }
client.headers["poe-formkey"] = extract_formkey(client.get('https://poe.com/login').text) token = reCaptchaV3(
client.headers["poe-tchannel"] = client.get('https://poe.com/api/settings').json()['tchannelData']['channel'] "https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal"
)
token = reCaptchaV3('https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal')
# token = solver.recaptcha(sitekey='6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG', # token = solver.recaptcha(sitekey='6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG',
# url = 'https://poe.com/login?redirect_url=%2F', # url = 'https://poe.com/login?redirect_url=%2F',
# version = 'v3', # version = 'v3',
@ -204,132 +232,252 @@ class Account:
# invisible = 1, # invisible = 1,
# action = 'login',)['code'] # action = 'login',)['code']
payload = dumps(separators = (',', ':'), obj = { payload = dumps(
'queryName': 'MainSignupLoginSection_sendVerificationCodeMutation_Mutation', separators=(",", ":"),
'variables': { obj={
'emailAddress' : mail_address, "queryName": "MainSignupLoginSection_sendVerificationCodeMutation_Mutation",
'phoneNumber' : None, "variables": {
'recaptchaToken': token "emailAddress": mail_address,
"phoneNumber": None,
"recaptchaToken": token,
},
"query": "mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n",
}, },
'query': 'mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n', )
})
base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
print(dumps(client.headers, indent=4)) print(dumps(client.headers, indent=4))
response = client.post('https://poe.com/api/gql_POST', data=payload) response = client.post("https://poe.com/api/gql_POST", data=payload)
if 'automated_request_detected' in response.text: if "automated_request_detected" in response.text:
print('please try using a proxy / wait for fix') print("please try using a proxy / wait for fix")
if 'Bad Request' in response.text: if "Bad Request" in response.text:
if logging: print('bad request, retrying...' , response.json()) if logging:
print("bad request, retrying...", response.json())
quit() quit()
if logging: print('send_code' ,response.json()) if logging:
print("send_code", response.json())
mail_content = mail_client.get_message() mail_content = mail_client.get_message()
mail_token = findall(r';">(\d{6,7})</div>', mail_content)[0] mail_token = findall(r';">(\d{6,7})</div>', mail_content)[0]
if logging: print('code', mail_token) if logging:
print("code", mail_token)
payload = dumps(separators = (',', ':'), obj={ payload = dumps(
"queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation", separators=(",", ":"),
"variables": { obj={
"verificationCode": str(mail_token), "queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation",
"emailAddress": mail_address, "variables": {
"phoneNumber": None "verificationCode": str(mail_token),
"emailAddress": mail_address,
"phoneNumber": None,
},
"query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n",
}, },
"query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n" )
})
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k' base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
response = client.post('https://poe.com/api/gql_POST', data = payload) response = client.post("https://poe.com/api/gql_POST", data=payload)
if logging: print('verify_code', response.json()) if logging:
print("verify_code", response.json())
def get():
cookies = open(Path(__file__).resolve().parent / 'cookies.txt', 'r').read().splitlines() def get(self):
cookies = (
open(Path(__file__).resolve().parent / "cookies.txt", "r")
.read()
.splitlines()
)
return choice(cookies) return choice(cookies)
class StreamingCompletion: class StreamingCompletion:
def create( def create(
model : str = 'gpt-4', model: str = "gpt-4",
custom_model : bool = None, custom_model: bool = None,
prompt: str = 'hello world', prompt: str = "hello world",
token : str = ''): token: str = "",
):
_model = MODELS[model] if not custom_model else custom_model
models = {
'sage' : 'capybara',
'gpt-4' : 'beaver',
'claude-v1.2' : 'a2_2',
'claude-instant-v1.0' : 'a2',
'gpt-3.5-turbo' : 'chinchilla'
}
_model = models[model] if not custom_model else custom_model
client = PoeClient(token) client = PoeClient(token)
for chunk in client.send_message(_model, prompt): for chunk in client.send_message(_model, prompt):
yield PoeResponse(
yield PoeResponse({ {
'id' : chunk["messageId"], "id": chunk["messageId"],
'object' : 'text_completion', "object": "text_completion",
'created': chunk['creationTime'], "created": chunk["creationTime"],
'model' : _model, "model": _model,
'choices': [{ "choices": [
'text' : chunk["text_new"], {
'index' : 0, "text": chunk["text_new"],
'logprobs' : None, "index": 0,
'finish_reason' : 'stop' "logprobs": None,
}], "finish_reason": "stop",
'usage': { }
'prompt_tokens' : len(prompt), ],
'completion_tokens' : len(chunk["text_new"]), "usage": {
'total_tokens' : len(prompt) + len(chunk["text_new"]) "prompt_tokens": len(prompt),
"completion_tokens": len(chunk["text_new"]),
"total_tokens": len(prompt) + len(chunk["text_new"]),
},
} }
}) )
class Completion: class Completion:
def create( def create(
model : str = 'gpt-4', model: str = "gpt-4",
custom_model : str = None, custom_model: str = None,
prompt: str = 'hello world', prompt: str = "hello world",
token : str = ''): token: str = "",
):
models = { models = {
'sage' : 'capybara', "sage": "capybara",
'gpt-4' : 'beaver', "gpt-4": "beaver",
'claude-v1.2' : 'a2_2', "claude-v1.2": "a2_2",
'claude-instant-v1.0' : 'a2', "claude-instant-v1.0": "a2",
'gpt-3.5-turbo' : 'chinchilla' "gpt-3.5-turbo": "chinchilla",
} }
_model = models[model] if not custom_model else custom_model _model = models[model] if not custom_model else custom_model
client = PoeClient(token) client = PoeClient(token)
for chunk in client.send_message(_model, prompt): for chunk in client.send_message(_model, prompt):
pass pass
return PoeResponse({ return PoeResponse(
'id' : chunk["messageId"], {
'object' : 'text_completion', "id": chunk["messageId"],
'created': chunk['creationTime'], "object": "text_completion",
'model' : _model, "created": chunk["creationTime"],
'choices': [{ "model": _model,
'text' : chunk["text"], "choices": [
'index' : 0, {
'logprobs' : None, "text": chunk["text"],
'finish_reason' : 'stop' "index": 0,
}], "logprobs": None,
'usage': { "finish_reason": "stop",
'prompt_tokens' : len(prompt), }
'completion_tokens' : len(chunk["text"]), ],
'total_tokens' : len(prompt) + len(chunk["text"]) "usage": {
} "prompt_tokens": len(prompt),
}) "completion_tokens": len(chunk["text"]),
"total_tokens": len(prompt) + len(chunk["text"]),
},
}
)
class Poe:
def __init__(self, model: str = "ChatGPT"):
# validating the model
if model and model not in MODELS:
raise RuntimeError(
"Sorry, the model you provided does not exist. Please check and try again."
)
self.model = MODELS[model]
self.cookie = self.__load_cookie()
self.client = PoeClient(self.cookie)
def __load_cookie(self) -> str:
if (cookie_file := Path("cookie.json")).exists():
with cookie_file.open() as fp:
cookie = json.load(fp)
if datetime.fromtimestamp(cookie["expiry"]) < datetime.now():
cookie = self.__register_and_get_cookie()
else:
print("Loading the cookie from file")
else:
cookie = self.__register_and_get_cookie()
return unquote(cookie["value"])
@classmethod
def __register_and_get_cookie(cls) -> dict:
mail_client = Emailnator()
mail_address = mail_client.get_mail()
print(mail_address)
options = webdriver.FirefoxOptions()
# options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
driver.get("https://www.poe.com")
# clicking use email button
driver.find_element(By.XPATH, '//button[contains(text(), "Use email")]').click()
email = WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.XPATH, '//input[@type="email"]'))
)
email.send_keys(mail_address)
driver.find_element(By.XPATH, '//button[text()="Go"]').click()
code = findall(r';">(\d{6,7})</div>', mail_client.get_message())[0]
print(code)
verification_code = WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Code"]'))
)
verification_code.send_keys(code)
verify_button = EC.presence_of_element_located(
(By.XPATH, '//button[text()="Verify"]')
)
login_button = EC.presence_of_element_located(
(By.XPATH, '//button[text()="Log In"]')
)
WebDriverWait(driver, 30).until(EC.any_of(verify_button, login_button)).click()
cookie = driver.get_cookie("p-b")
with open("cookie.json", "w") as fw:
json.dump(cookie, fw)
driver.close()
return cookie
def chat(self, message: str, model: Optional[str] = None) -> str:
if model and model not in MODELS:
raise RuntimeError(
"Sorry, the model you provided does not exist. Please check and try again."
)
model = MODELS[model] if model else self.model
response = None
for chunk in self.client.send_message(model, message):
response = chunk["text"]
return response
def create_bot(
self,
name: str,
/,
prompt: str = "",
base_model: str = "ChatGPT",
description: str = "",
) -> None:
if base_model not in MODELS:
raise RuntimeError(
"Sorry, the base_model you provided does not exist. Please check and try again."
)
response = self.client.create_bot(
handle=name,
prompt=prompt,
base_model=MODELS[base_model],
description=description,
)
print(f'Successfully created bot with name: {response["bot"]["displayName"]}')
def list_bots(self) -> list:
return list(self.client.bot_names.values())

View File

@ -55,10 +55,7 @@ def load_queries():
def generate_payload(query_name, variables): def generate_payload(query_name, variables):
return { return {"query": queries[query_name], "variables": variables}
"query": queries[query_name],
"variables": variables
}
def request_with_retries(method, *args, **kwargs): def request_with_retries(method, *args, **kwargs):
@ -69,7 +66,8 @@ def request_with_retries(method, *args, **kwargs):
if r.status_code == 200: if r.status_code == 200:
return r return r
logger.warn( logger.warn(
f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...") f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})..."
)
raise RuntimeError(f"Failed to download {url} too many times.") raise RuntimeError(f"Failed to download {url} too many times.")
@ -84,15 +82,13 @@ class Client:
self.proxy = proxy self.proxy = proxy
self.session = requests.Session() self.session = requests.Session()
self.adapter = requests.adapters.HTTPAdapter( self.adapter = requests.adapters.HTTPAdapter(
pool_connections=100, pool_maxsize=100) pool_connections=100, pool_maxsize=100
)
self.session.mount("http://", self.adapter) self.session.mount("http://", self.adapter)
self.session.mount("https://", self.adapter) self.session.mount("https://", self.adapter)
if proxy: if proxy:
self.session.proxies = { self.session.proxies = {"http": self.proxy, "https": self.proxy}
"http": self.proxy,
"https": self.proxy
}
logger.info(f"Proxy enabled: {self.proxy}") logger.info(f"Proxy enabled: {self.proxy}")
self.active_messages = {} self.active_messages = {}
@ -124,11 +120,11 @@ class Client:
self.subscribe() self.subscribe()
def extract_formkey(self, html): def extract_formkey(self, html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>' script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
script_text = re.search(script_regex, html).group(1) script_text = re.search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",' key_regex = r'var .="([0-9a-f]+)",'
key_text = re.search(key_regex, script_text).group(1) key_text = re.search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]' cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
cipher_pairs = re.findall(cipher_regex, script_text) cipher_pairs = re.findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs) formkey_list = [""] * len(cipher_pairs)
@ -143,7 +139,9 @@ class Client:
logger.info("Downloading next_data...") logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url) r = request_with_retries(self.session.get, self.home_url)
json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>' json_regex = (
r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
)
json_text = re.search(json_regex, r.text).group(1) json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text) next_data = json.loads(json_text)
@ -181,8 +179,7 @@ class Client:
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
for bot in bot_list: for bot in bot_list:
thread = threading.Thread( thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True)
target=get_bot_thread, args=(bot,), daemon=True)
threads.append(thread) threads.append(thread)
for thread in threads: for thread in threads:
@ -216,50 +213,59 @@ class Client:
if channel is None: if channel is None:
channel = self.channel channel = self.channel
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}'
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query return (
f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'
+ query
)
def send_query(self, query_name, variables): def send_query(self, query_name, variables):
for i in range(20): for i in range(20):
json_data = generate_payload(query_name, variables) json_data = generate_payload(query_name, variables)
payload = json.dumps(json_data, separators=(",", ":")) payload = json.dumps(json_data, separators=(",", ":"))
base_string = payload + \ base_string = (
self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
)
headers = { headers = {
"content-type": "application/json", "content-type": "application/json",
"poe-tag-id": hashlib.md5(base_string.encode()).hexdigest() "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(),
} }
headers = {**self.gql_headers, **headers} headers = {**self.gql_headers, **headers}
r = request_with_retries( r = request_with_retries(
self.session.post, self.gql_url, data=payload, headers=headers) self.session.post, self.gql_url, data=payload, headers=headers
)
data = r.json() data = r.json()
if data["data"] == None: if data["data"] == None:
logger.warn( logger.warn(
f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)') f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)'
)
time.sleep(2) time.sleep(2)
continue continue
return r.json() return r.json()
raise RuntimeError(f'{query_name} failed too many times.') raise RuntimeError(f"{query_name} failed too many times.")
def subscribe(self): def subscribe(self):
logger.info("Subscribing to mutations") logger.info("Subscribing to mutations")
result = self.send_query("SubscriptionsMutation", { result = self.send_query(
"subscriptions": [ "SubscriptionsMutation",
{ {
"subscriptionName": "messageAdded", "subscriptions": [
"query": queries["MessageAddedSubscription"] {
}, "subscriptionName": "messageAdded",
{ "query": queries["MessageAddedSubscription"],
"subscriptionName": "viewerStateUpdated", },
"query": queries["ViewerStateUpdatedSubscription"] {
} "subscriptionName": "viewerStateUpdated",
] "query": queries["ViewerStateUpdatedSubscription"],
}) },
]
},
)
def ws_run_thread(self): def ws_run_thread(self):
kwargs = {} kwargs = {}
@ -268,7 +274,7 @@ class Client:
kwargs = { kwargs = {
"proxy_type": proxy_parsed.scheme, "proxy_type": proxy_parsed.scheme,
"http_proxy_host": proxy_parsed.hostname, "http_proxy_host": proxy_parsed.hostname,
"http_proxy_port": proxy_parsed.port "http_proxy_port": proxy_parsed.port,
} }
self.ws.run_forever(**kwargs) self.ws.run_forever(**kwargs)
@ -281,7 +287,7 @@ class Client:
on_message=self.on_message, on_message=self.on_message,
on_open=self.on_ws_connect, on_open=self.on_ws_connect,
on_error=self.on_ws_error, on_error=self.on_ws_error,
on_close=self.on_ws_close on_close=self.on_ws_close,
) )
t = threading.Thread(target=self.ws_run_thread, daemon=True) t = threading.Thread(target=self.ws_run_thread, daemon=True)
t.start() t.start()
@ -299,7 +305,8 @@ class Client:
def on_ws_close(self, ws, close_status_code, close_message): def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False self.ws_connected = False
logger.warn( logger.warn(
f"Websocket closed with status {close_status_code}: {close_message}") f"Websocket closed with status {close_status_code}: {close_message}"
)
def on_ws_error(self, ws, error): def on_ws_error(self, ws, error):
self.disconnect_ws() self.disconnect_ws()
@ -326,7 +333,11 @@ class Client:
return return
# indicate that the response id is tied to the human message id # indicate that the response id is tied to the human message id
elif key != "pending" and value == None and message["state"] != "complete": elif (
key != "pending"
and value == None
and message["state"] != "complete"
):
self.active_messages[key] = message["messageId"] self.active_messages[key] = message["messageId"]
self.message_queues[key].put(message) self.message_queues[key].put(message)
return return
@ -352,13 +363,16 @@ class Client:
self.setup_connection() self.setup_connection()
self.connect_ws() self.connect_ws()
message_data = self.send_query("SendMessageMutation", { message_data = self.send_query(
"bot": chatbot, "SendMessageMutation",
"query": message, {
"chatId": self.bots[chatbot]["chatId"], "bot": chatbot,
"source": None, "query": message,
"withChatBreak": with_chat_break "chatId": self.bots[chatbot]["chatId"],
}) "source": None,
"withChatBreak": with_chat_break,
},
)
del self.active_messages["pending"] del self.active_messages["pending"]
if not message_data["data"]["messageEdgeCreate"]["message"]: if not message_data["data"]["messageEdgeCreate"]["message"]:
@ -368,7 +382,8 @@ class Client:
human_message_id = human_message["node"]["messageId"] human_message_id = human_message["node"]["messageId"]
except TypeError: except TypeError:
raise RuntimeError( raise RuntimeError(
f"An unknown error occurred. Raw response data: {message_data}") f"An unknown error occurred. Raw response data: {message_data}"
)
# indicate that the current message is waiting for a response # indicate that the current message is waiting for a response
self.active_messages[human_message_id] = None self.active_messages[human_message_id] = None
@ -378,8 +393,7 @@ class Client:
message_id = None message_id = None
while True: while True:
try: try:
message = self.message_queues[human_message_id].get( message = self.message_queues[human_message_id].get(timeout=timeout)
timeout=timeout)
except queue.Empty: except queue.Empty:
del self.active_messages[human_message_id] del self.active_messages[human_message_id]
del self.message_queues[human_message_id] del self.message_queues[human_message_id]
@ -393,7 +407,7 @@ class Client:
continue continue
# update info about response # update info about response
message["text_new"] = message["text"][len(last_text):] message["text_new"] = message["text"][len(last_text) :]
last_text = message["text"] last_text = message["text"]
message_id = message["messageId"] message_id = message["messageId"]
@ -404,9 +418,9 @@ class Client:
def send_chat_break(self, chatbot): def send_chat_break(self, chatbot):
logger.info(f"Sending chat break to {chatbot}") logger.info(f"Sending chat break to {chatbot}")
result = self.send_query("AddMessageBreakMutation", { result = self.send_query(
"chatId": self.bots[chatbot]["chatId"] "AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]}
}) )
return result["data"]["messageBreakCreate"]["message"] return result["data"]["messageBreakCreate"]["message"]
def get_message_history(self, chatbot, count=25, cursor=None): def get_message_history(self, chatbot, count=25, cursor=None):
@ -423,23 +437,24 @@ class Client:
cursor = str(cursor) cursor = str(cursor)
if count > 50: if count > 50:
messages = self.get_message_history( messages = (
chatbot, count=50, cursor=cursor) + messages self.get_message_history(chatbot, count=50, cursor=cursor) + messages
)
while count > 0: while count > 0:
count -= 50 count -= 50
new_cursor = messages[0]["cursor"] new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history( new_messages = self.get_message_history(
chatbot, min(50, count), cursor=new_cursor) chatbot, min(50, count), cursor=new_cursor
)
messages = new_messages + messages messages = new_messages + messages
return messages return messages
elif count <= 0: elif count <= 0:
return messages return messages
result = self.send_query("ChatListPaginationQuery", { result = self.send_query(
"count": count, "ChatListPaginationQuery",
"cursor": cursor, {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]},
"id": self.bots[chatbot]["id"] )
})
query_messages = result["data"]["node"]["messagesConnection"]["edges"] query_messages = result["data"]["node"]["messagesConnection"]["edges"]
messages = query_messages + messages messages = query_messages + messages
return messages return messages
@ -449,9 +464,7 @@ class Client:
if not type(message_ids) is list: if not type(message_ids) is list:
message_ids = [int(message_ids)] message_ids = [int(message_ids)]
result = self.send_query("DeleteMessageMutation", { result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids})
"messageIds": message_ids
})
def purge_conversation(self, chatbot, count=-1): def purge_conversation(self, chatbot, count=-1):
logger.info(f"Purging messages from {chatbot}") logger.info(f"Purging messages from {chatbot}")
@ -471,60 +484,93 @@ class Client:
last_messages = self.get_message_history(chatbot, count=50)[::-1] last_messages = self.get_message_history(chatbot, count=50)[::-1]
logger.info(f"No more messages left to delete.") logger.info(f"No more messages left to delete.")
def create_bot(self, handle, prompt="", base_model="chinchilla", description="", def create_bot(
intro_message="", api_key=None, api_bot=False, api_url=None, self,
prompt_public=True, pfp_url=None, linkification=False, handle,
markdown_rendering=True, suggested_replies=False, private=False): prompt="",
result = self.send_query("PoeBotCreateMutation", { base_model="chinchilla",
"model": base_model, description="",
"handle": handle, intro_message="",
"prompt": prompt, api_key=None,
"isPromptPublic": prompt_public, api_bot=False,
"introduction": intro_message, api_url=None,
"description": description, prompt_public=True,
"profilePictureUrl": pfp_url, pfp_url=None,
"apiUrl": api_url, linkification=False,
"apiKey": api_key, markdown_rendering=True,
"isApiBot": api_bot, suggested_replies=False,
"hasLinkification": linkification, private=False,
"hasMarkdownRendering": markdown_rendering, ):
"hasSuggestedReplies": suggested_replies, result = self.send_query(
"isPrivateBot": private "PoeBotCreateMutation",
}) {
"model": base_model,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"isApiBot": api_bot,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
data = result["data"]["poeBotCreate"] data = result["data"]["poeBotCreate"]
if data["status"] != "success": if data["status"] != "success":
raise RuntimeError( raise RuntimeError(
f"Poe returned an error while trying to create a bot: {data['status']}") f"Poe returned an error while trying to create a bot: {data['status']}"
)
self.get_bots() self.get_bots()
return data return data
def edit_bot(self, bot_id, handle, prompt="", base_model="chinchilla", description="", def edit_bot(
intro_message="", api_key=None, api_url=None, private=False, self,
prompt_public=True, pfp_url=None, linkification=False, bot_id,
markdown_rendering=True, suggested_replies=False): handle,
prompt="",
result = self.send_query("PoeBotEditMutation", { base_model="chinchilla",
"baseBot": base_model, description="",
"botId": bot_id, intro_message="",
"handle": handle, api_key=None,
"prompt": prompt, api_url=None,
"isPromptPublic": prompt_public, private=False,
"introduction": intro_message, prompt_public=True,
"description": description, pfp_url=None,
"profilePictureUrl": pfp_url, linkification=False,
"apiUrl": api_url, markdown_rendering=True,
"apiKey": api_key, suggested_replies=False,
"hasLinkification": linkification, ):
"hasMarkdownRendering": markdown_rendering, result = self.send_query(
"hasSuggestedReplies": suggested_replies, "PoeBotEditMutation",
"isPrivateBot": private {
}) "baseBot": base_model,
"botId": bot_id,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
data = result["data"]["poeBotEdit"] data = result["data"]["poeBotEdit"]
if data["status"] != "success": if data["status"] != "success":
raise RuntimeError( raise RuntimeError(
f"Poe returned an error while trying to edit a bot: {data['status']}") f"Poe returned an error while trying to edit a bot: {data['status']}"
)
self.get_bots() self.get_bots()
return data return data

View File

@ -1,66 +1,66 @@
from json import loads
from re import findall
from time import sleep
from fake_useragent import UserAgent
from requests import Session from requests import Session
from time import sleep
from re import search, findall
from json import loads
class Emailnator: class Emailnator:
def __init__(self) -> None: def __init__(self) -> None:
self.client = Session() self.client = Session()
self.client.get('https://www.emailnator.com/', timeout=6) self.client.get("https://www.emailnator.com/", timeout=6)
self.cookies = self.client.cookies.get_dict() self.cookies = self.client.cookies.get_dict()
self.client.headers = { self.client.headers = {
'authority' : 'www.emailnator.com', "authority": "www.emailnator.com",
'origin' : 'https://www.emailnator.com', "origin": "https://www.emailnator.com",
'referer' : 'https://www.emailnator.com/', "referer": "https://www.emailnator.com/",
'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.0.0 Safari/537.36 Edg/101.0.1722.39', "user-agent": UserAgent().random,
'x-xsrf-token' : self.client.cookies.get("XSRF-TOKEN")[:-3]+"=", "x-xsrf-token": self.client.cookies.get("XSRF-TOKEN")[:-3] + "=",
} }
self.email = None self.email = None
def get_mail(self): def get_mail(self):
response = self.client.post('https://www.emailnator.com/generate-email',json = { response = self.client.post(
'email': [ "https://www.emailnator.com/generate-email",
'domain', json={
'plusGmail', "email": [
'dotGmail', "domain",
] "plusGmail",
}) "dotGmail",
]
},
)
self.email = loads(response.text)["email"][0] self.email = loads(response.text)["email"][0]
return self.email return self.email
def get_message(self): def get_message(self):
print("waiting for code...") print("waiting for code...")
while True: while True:
sleep(2) sleep(2)
mail_token = self.client.post('https://www.emailnator.com/message-list', mail_token = self.client.post(
json = {'email': self.email}) "https://www.emailnator.com/message-list", json={"email": self.email}
)
mail_token = loads(mail_token.text)["messageData"] mail_token = loads(mail_token.text)["messageData"]
if len(mail_token) == 2: if len(mail_token) == 2:
print(mail_token[1]["messageID"]) print(mail_token[1]["messageID"])
break break
mail_context = self.client.post('https://www.emailnator.com/message-list', json = { mail_context = self.client.post(
'email' : self.email, "https://www.emailnator.com/message-list",
'messageID': mail_token[1]["messageID"], json={
}) "email": self.email,
"messageID": mail_token[1]["messageID"],
},
)
return mail_context.text return mail_context.text
# mail_client = Emailnator() def get_verification_code(self):
# mail_adress = mail_client.get_mail() return findall(r';">(\d{6,7})</div>', self.get_message())[0]
# print(mail_adress)
# mail_content = mail_client.get_message()
# print(mail_content)
# code = findall(r';">(\d{6,7})</div>', mail_content)[0]
# print(code)

View File

@ -5,4 +5,6 @@ pypasser
names names
colorama colorama
curl_cffi curl_cffi
streamlit==1.21.0 streamlit==1.21.0
selenium
fake-useragent