From fb1b10e7aa33ae7fce154e067777c1a2f7d5231d Mon Sep 17 00:00:00 2001 From: AlexBartles <62007969+AlexBartles@users.noreply.github.com> Date: Thu, 8 Jun 2023 07:31:40 +0100 Subject: [PATCH] Update main.py --- main.py | 642 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 635 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index c18561e..09fd83f 100644 --- a/main.py +++ b/main.py @@ -1,13 +1,641 @@ -''' -KeyAuth.cc Python Example +import os +import json as jsond # json +import time # sleep before exit +import binascii # hex encoding +from uuid import uuid4 # gen random guid +import platform # check platform +import subprocess # needed for mac device + +try: + if os.name == 'nt': + import win32security # get sid (WIN only) + import requests # https requests + from Crypto.Cipher import AES + from Crypto.Hash import SHA256 + from Crypto.Util.Padding import pad, unpad +except ModuleNotFoundError: + print("Exception when importing modules") + print("Installing necessary modules....") + if os.path.isfile("requirements.txt"): + os.system("pip install -r requirements.txt") + else: + os.system("pip install pywin32") + os.system("pip install pycryptodome") + os.system("pip install requests") + print("Modules installed!") + time.sleep(1.5) + os._exit(1) + +try: # Connection check + s = requests.Session() # Session + s.get('https://google.com') +except requests.exceptions.RequestException as e: + print(e) + time.sleep(3) + os._exit(1) + + +class api: + + name = ownerid = secret = version = hash_to_check = "" + + def __init__(self, name, ownerid, secret, version, hash_to_check): + self.name = name + + self.ownerid = ownerid + + self.secret = secret + + self.version = version + self.hash_to_check = hash_to_check + self.init() + + sessionid = enckey = "" + initialized = False + + def init(self): + + if self.sessionid != "": + print("You've already initialized!") + time.sleep(3) + os._exit(1) + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + self.enckey = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("init".encode()), + "ver": encryption.encrypt(self.version, self.secret, init_iv), + "hash": self.hash_to_check, + "enckey": encryption.encrypt(self.enckey, self.secret, init_iv), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + + if response == "KeyAuth_Invalid": + print("The application doesn't exist") + time.sleep(3) + os._exit(1) + + response = encryption.decrypt(response, self.secret, init_iv) + json = jsond.loads(response) + + if json["message"] == "invalidver": + if json["download"] != "": + print("New Version Available") + download_link = json["download"] + os.system(f"start {download_link}") + time.sleep(3) + os._exit(1) + else: + print("Invalid Version, Contact owner to add download link to latest app version") + time.sleep(3) + os._exit(1) + + if not json["success"]: + print(json["message"]) + time.sleep(3) + os._exit(1) + + self.sessionid = json["sessionid"] + self.initialized = True + self.__load_app_data(json["appinfo"]) + + def register(self, user, password, license, hwid=None): + self.checkinit() + if hwid is None: + hwid = others.get_hwid() + + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("register".encode()), + "username": encryption.encrypt(user, self.enckey, init_iv), + "pass": encryption.encrypt(password, self.enckey, init_iv), + "key": encryption.encrypt(license, self.enckey, init_iv), + "hwid": encryption.encrypt(hwid, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + print("successfully registered") + self.__load_user_data(json["info"]) + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def upgrade(self, user, license): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("upgrade".encode()), + "username": encryption.encrypt(user, self.enckey, init_iv), + "key": encryption.encrypt(license, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + print("Successfully upgraded user") + print("Please restart program and login") + time.sleep(3) + os._exit(1) + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def login(self, user, password, hwid=None): + self.checkinit() + if hwid is None: + hwid = others.get_hwid() + + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("login".encode()), + "username": encryption.encrypt(user, self.enckey, init_iv), + "pass": encryption.encrypt(password, self.enckey, init_iv), + "hwid": encryption.encrypt(hwid, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + self.__load_user_data(json["info"]) + print("Successfully logged in") + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def license(self, key, hwid=None): + self.checkinit() + if hwid is None: + hwid = others.get_hwid() + + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("license".encode()), + "key": encryption.encrypt(key, self.enckey, init_iv), + "hwid": encryption.encrypt(hwid, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + self.__load_user_data(json["info"]) + print("Successfully logged in with license") + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def var(self, name): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("var".encode()), + "varid": encryption.encrypt(name, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } -Go to https://keyauth.cc/app/ and click the Python tab. Copy that code and replace the existing keyauthapp instance in this file. + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + return json["message"] + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def getvar(self, var_name): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("getvar".encode()), + "var": encryption.encrypt(var_name, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + return json["response"] + else: + print(f"NOTE: This is commonly misunderstood. This is for user variables, not the normal variables.\nUse keyauthapp.var(\"{var_name}\") for normal variables"); + print(json["message"]) + time.sleep(3) + os._exit(1) + + def setvar(self, var_name, var_data): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { + "type": binascii.hexlify("setvar".encode()), + "var": encryption.encrypt(var_name, self.enckey, init_iv), + "data": encryption.encrypt(var_data, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + return True + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def ban(self): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { + "type": binascii.hexlify("ban".encode()), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + return True + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def file(self, fileid): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("file".encode()), + "fileid": encryption.encrypt(fileid, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if not json["success"]: + print(json["message"]) + time.sleep(3) + os._exit(1) + return binascii.unhexlify(json["contents"]) + + def webhook(self, webid, param, body = "", conttype = ""): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("webhook".encode()), + "webid": encryption.encrypt(webid, self.enckey, init_iv), + "params": encryption.encrypt(param, self.enckey, init_iv), + "body": encryption.encrypt(body, self.enckey, init_iv), + "conttype": encryption.encrypt(conttype, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + return json["message"] + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def check(self): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { + "type": binascii.hexlify("check".encode()), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + if json["success"]: + return True + else: + return False + + def checkblacklist(self): + self.checkinit() + hwid = others.get_hwid() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { + "type": binascii.hexlify("checkblacklist".encode()), + "hwid": encryption.encrypt(hwid, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + response = self.__do_request(post_data) + + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + if json["success"]: + return True + else: + return False + + def log(self, message): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("log".encode()), + "pcuser": encryption.encrypt(os.getenv('username'), self.enckey, init_iv), + "message": encryption.encrypt(message, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + self.__do_request(post_data) + + def fetchOnline(self): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("fetchOnline".encode()), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + if len(json["users"]) == 0: + return None # THIS IS ISSUE ON KEYAUTH SERVER SIDE 6.8.2022, so it will return none if it is not an array. + else: + return json["users"] + else: + return None + + def chatGet(self, channel): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("chatget".encode()), + "channel": encryption.encrypt(channel, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + return json["messages"] + else: + return None + + def chatSend(self, message, channel): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + + post_data = { + "type": binascii.hexlify("chatsend".encode()), + "message": encryption.encrypt(message, self.enckey, init_iv), + "channel": encryption.encrypt(channel, self.enckey, init_iv), + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + + json = jsond.loads(response) + + if json["success"]: + return True + else: + return False + + def checkinit(self): + if not self.initialized: + print("Initialize first, in order to use the functions") + time.sleep(3) + os._exit(1) + + def changeUsername(self, username): + self.checkinit() + init_iv = SHA256.new(str(uuid4())[:8].encode()).hexdigest() + post_data = { + "type": binascii.hexlify("changeUsername".encode()), + "newUsername": username, + "sessionid": binascii.hexlify(self.sessionid.encode()), + "name": binascii.hexlify(self.name.encode()), + "ownerid": binascii.hexlify(self.ownerid.encode()), + "init_iv": init_iv + } + + response = self.__do_request(post_data) + response = encryption.decrypt(response, self.enckey, init_iv) + json = jsond.loads(response) + + if json["success"]: + print("Successfully changed username") + else: + print(json["message"]) + time.sleep(3) + os._exit(1) + + def __do_request(self, post_data): + try: + rq_out = s.post( + "https://keyauth.win/api/1.0/", data=post_data, timeout=30 + ) + return rq_out.text + except requests.exceptions.Timeout: + print("Request timed out") + + class application_data_class: + numUsers = numKeys = app_ver = customer_panel = onlineUsers = "" + # region user_data + + class user_data_class: + username = ip = hwid = expires = createdate = lastlogin = subscription = subscriptions = "" + + user_data = user_data_class() + app_data = application_data_class() + + def __load_app_data(self, data): + self.app_data.numUsers = data["numUsers"] + self.app_data.numKeys = data["numKeys"] + self.app_data.app_ver = data["version"] + self.app_data.customer_panel = data["customerPanelLink"] + self.app_data.onlineUsers = data["numOnlineUsers"] + + def __load_user_data(self, data): + self.user_data.username = data["username"] + self.user_data.ip = data["ip"] + self.user_data.hwid = data["hwid"] or "N/A" + self.user_data.expires = data["subscriptions"][0]["expiry"] + self.user_data.createdate = data["createdate"] + self.user_data.lastlogin = data["lastlogin"] + self.user_data.subscription = data["subscriptions"][0]["subscription"] + self.user_data.subscriptions = data["subscriptions"] + + +class others: + @staticmethod + def get_hwid(): + if platform.system() == "Linux": + with open("/etc/machine-id") as f: + hwid = f.read() + return hwid + elif platform.system() == 'Windows': + winuser = os.getlogin() + sid = win32security.LookupAccountName(None, winuser)[0] # You can also use WMIC (better than SID, some users had problems with WMIC) + hwid = win32security.ConvertSidToStringSid(sid) + return hwid + elif platform.system() == 'Darwin': + output = subprocess.Popen("ioreg -l | grep IOPlatformSerialNumber", stdout=subprocess.PIPE, shell=True).communicate()[0] + serial = output.decode().split('=', 1)[1].replace(' ', '') + hwid = serial[1:-2] + return hwid + + + +class encryption: + @staticmethod + def encrypt_string(plain_text, key, iv): + plain_text = pad(plain_text, 16) + + aes_instance = AES.new(key, AES.MODE_CBC, iv) + + raw_out = aes_instance.encrypt(plain_text) + + return binascii.hexlify(raw_out) + + @staticmethod + def decrypt_string(cipher_text, key, iv): + cipher_text = binascii.unhexlify(cipher_text) + + aes_instance = AES.new(key, AES.MODE_CBC, iv) + + cipher_text = aes_instance.decrypt(cipher_text) + + return unpad(cipher_text, 16) + + @staticmethod + def encrypt(message, enc_key, iv): + try: + _key = SHA256.new(enc_key.encode()).hexdigest()[:32] + + _iv = SHA256.new(iv.encode()).hexdigest()[:16] + + return encryption.encrypt_string(message.encode(), _key.encode(), _iv.encode()).decode() + except: + print("Encryption error. Make sure your app details are correct, see response below") + print("Response: " + message) + time.sleep(3) + os._exit(1) + + @staticmethod + def decrypt(message, enc_key, iv): + try: + _key = SHA256.new(enc_key.encode()).hexdigest()[:32] -If you get an error saying it can't find module KeyAuth, try following this https://github.com/KeyAuth/KeyAuth-Python-Example#how-to-compile + _iv = SHA256.new(iv.encode()).hexdigest()[:16] -If that doesn't work for you, you can paste the contents of KeyAuth.py ABOVE this comment and then remove the "from keyauth import api" and that should work too. -''' -from keyauth import api + return encryption.decrypt_string(message.encode(), _key.encode(), _iv.encode()).decode() + except: + print("Encryption error. Make sure your app details are correct, see response below") + print("Response: " + message) + time.sleep(3) + os._exit(1) import sys import time