ddns_script/porkbun_ddns.py
2022-11-20 22:05:49 +08:00

213 lines
8.8 KiB
Python

import requests
class PorkbunAPI:
    """Small client for the Porkbun JSON DNS API (v3), scoped to one domain.

    Every request is a POST whose JSON body carries the API key pair.
    ``retrieve`` downloads the domain's records into a private cache keyed by
    record id; ``find_record`` and the ``*_with_name`` helpers work on that
    cache. Progress and errors are reported with ``print`` (controlled by the
    ``verbose`` flags), not with exceptions or return values.
    """

    # Base URL shared by every endpoint this client calls.
    _API_ROOT = "https://porkbun.com/api/json/v3"
    # Seconds to wait for the server before ``requests`` raises a timeout;
    # without a timeout a stalled connection would block the caller forever.
    REQUEST_TIMEOUT = 30
    # Column titles and widths of the record table printed in verbose mode.
    _HEADERS = ("id", "name", "type", "content", "ttl", "prio")
    _WIDTHS = (9, 48, 5, 40, 5, 4)

    def __init__(self, domain: str, api_key: str, secret_api_key: str):
        """Store the domain and credentials; performs no network I/O.

        Args:
            domain: The domain whose DNS records are managed.
            api_key: Sent as ``apikey`` in every request body.
            secret_api_key: Sent as ``secretapikey`` in every request body.
        """
        self.domain = domain
        self.__secret_api_key = secret_api_key
        self.__api_key = api_key
        # Record cache: record id -> record dict as returned by the API.
        self.__records = {}

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    def _post(self, uri, payload=None):
        """POST the credentials plus *payload* to *uri*.

        Returns:
            ``(response, data)`` where ``data`` is the decoded JSON body, or
            ``None`` when the body is not valid JSON.
        """
        body = {
            "secretapikey": self.__secret_api_key,
            "apikey": self.__api_key,
        }
        if payload:
            body.update(payload)
        response = requests.post(uri, json=body, timeout=self.REQUEST_TIMEOUT)
        try:
            data = response.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): keep going so the
            # caller can still report the status code.
            data = None
        return response, data

    @staticmethod
    def _succeeded(response, data):
        """Return True for HTTP 200 with a JSON ``status`` of ``SUCCESS``."""
        return (
            response.status_code == 200
            and isinstance(data, dict)
            and data.get("status") == "SUCCESS"
        )

    @staticmethod
    def _report_failure(response, data):
        """Print the failure message for an unsuccessful response."""
        if response.status_code == 404:
            print("Internet Failure !")
        else:
            print("response code is ", response.status_code)
            # Fall back to the raw text when the body was not JSON.
            print(data if data is not None else response.text)

    def _subdomain(self, record):
        """Return the record name with the trailing ``.<domain>`` cut off.

        A record whose name is exactly the domain yields ``""``.
        """
        return record["name"][:-len(self.domain) - 1]

    def _print_table(self, records):
        """Print a header line followed by one centred row per record."""
        row_format = "{} " * len(self._WIDTHS)
        print(row_format.format(
            *(title.center(width)
              for title, width in zip(self._HEADERS, self._WIDTHS))))
        for record in records:
            cells = (
                record["id"],
                self._subdomain(record),
                record["type"],
                record["content"][:40],  # long values are cut to the column width
                record["ttl"],
                record.get("prio"),
            )
            print(row_format.format(
                *(str(cell).center(width)
                  for cell, width in zip(cells, self._WIDTHS))))

    def _select_single(self, _type, name, ttl=None):
        """Refresh the cache and return the only ``(id, record)`` match.

        When the filter does not match exactly one record, a message and the
        table of records matching *_type* and *name* (ignoring *ttl*) are
        printed and ``None`` is returned.
        """
        self.retrieve(verbose=False)
        matches = self.find_record(_type=_type, name=name, ttl=ttl, verbose=False)
        if len(matches) == 1:
            return next(iter(matches.items()))
        if matches:
            print("Multiple records are selected")
        else:
            print("No matching record found")
        self.find_record(_type=_type, name=name)
        return None

    def _edit_with_name(self, _type, name, content, ttl=None):
        """Edit the single *_type* record called *name* if its content differs."""
        selected = self._select_single(_type, name, ttl)
        if selected is None:
            return
        _id, record = selected
        if record["content"] != content:
            self.edit(name=name, content=content, _type=_type, _id=_id)

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def ping_porkbun(self, verbose: bool = True) -> int:
        """Call the ``ping`` endpoint to check connectivity and credentials.

        Args:
            verbose: When true, print the IP reported by the server on
                success or the failure details otherwise.

        Returns:
            The HTTP status code of the response.
        """
        response, data = self._post(self._API_ROOT + "/ping")
        if verbose:
            if self._succeeded(response, data):
                print("Internet Success! Your IP is ", data.get("yourIP"))
            else:
                self._report_failure(response, data)
        return response.status_code

    def retrieve(self, verbose: bool = True) -> int:
        """Download all DNS records of the domain into the local cache.

        On success the cache is replaced, so records that no longer exist on
        the server are dropped. On failure the cache is left untouched.

        Args:
            verbose: When true, print the record table on success or the
                failure details otherwise.

        Returns:
            The HTTP status code of the response.
        """
        uri = "{0}/dns/retrieve/{1}".format(self._API_ROOT, self.domain)
        response, data = self._post(uri)
        if self._succeeded(response, data):
            records = data.get("records", [])
            # Rebuild rather than update: stale ids must not linger.
            self.__records = {record["id"]: record for record in records}
            if verbose:
                self._print_table(records)
        elif verbose:
            self._report_failure(response, data)
        return response.status_code

    def find_record(self, _type: "str | None" = None, name: "str | None" = None,
                    content: "str | None" = None, ttl: "int | None" = None,
                    verbose: bool = True) -> dict:
        """Filter the cached records; no request is made.

        A criterion left as ``None`` is not applied.

        Args:
            _type: Record type to match exactly (e.g. ``"A"``).
            name: Record name without the domain suffix (``""`` matches a
                record named exactly like the domain).
            content: Record content to match exactly.
            ttl: TTL to match; the cached value is converted with ``int``.
            verbose: When true, print the matching records as a table.

        Returns:
            A dict mapping record id to record for every match.
        """
        def matches(record):
            if _type is not None and record["type"] != _type:
                return False
            if name is not None and self._subdomain(record) != name:
                return False
            if content is not None and record["content"] != content:
                return False
            if ttl is not None and int(record["ttl"]) != ttl:
                return False
            return True

        found = {
            record_id: record
            for record_id, record in self.__records.items()
            if matches(record)
        }
        if verbose:
            self._print_table(found.values())
        return found

    def create(self, name: str, _type: str = "A", content: str = "1.1.1.1",
               ttl: int = 600, prio=None, verbose: bool = True) -> None:
        """Create a DNS record on the domain.

        Args:
            name: Record name sent to the API.
            _type: Record type.
            content: Record content.
            ttl: Time to live (the original annotation states ``>= 600``).
            prio: Priority; only sent when not ``None``.
            verbose: When true, print the request summary and the outcome.
        """
        uri = "{0}/dns/create/{1}".format(self._API_ROOT, self.domain)
        payload = {"name": name, "type": _type, "content": content, "ttl": ttl}
        if prio is not None:
            payload["prio"] = prio
        if verbose:
            print({"name": name, "type": _type, "content": content})
        response, data = self._post(uri, payload)
        if not verbose:
            return
        if self._succeeded(response, data):
            print("Create New Record Success! Your new record is ", data.get("id"))
        else:
            self._report_failure(response, data)

    def edit(self, name: str, _id: str, _type: str = "A", content: str = "1.1.1.1",
             ttl: int = 600, prio=None, verbose: bool = True) -> None:
        """Overwrite the record with id *_id* using the given fields.

        Args:
            name: Record name sent to the API.
            _id: Id of the record to edit.
            _type: Record type.
            content: New record content.
            ttl: Time to live (the original annotation states ``>= 600``).
            prio: Priority; only sent when not ``None``.
            verbose: When true, print the request summary and the outcome.
        """
        uri = "{0}/dns/edit/{1}/{2}".format(self._API_ROOT, self.domain, _id)
        payload = {"name": name, "type": _type, "content": content, "ttl": ttl}
        if prio is not None:
            payload["prio"] = prio
        if verbose:
            print({"name": name, "type": _type, "content": content})
        response, data = self._post(uri, payload)
        if not verbose:
            return
        if self._succeeded(response, data):
            print("Edit Success!")
        else:
            self._report_failure(response, data)

    def delete(self, _id: str, verbose: bool = True) -> None:
        """Delete the record with id *_id*.

        Args:
            _id: Id of the record to delete.
            verbose: When true, print the outcome.
        """
        uri = "{0}/dns/delete/{1}/{2}".format(self._API_ROOT, self.domain, _id)
        response, data = self._post(uri)
        if not verbose:
            return
        if self._succeeded(response, data):
            print("Delete Record Success!")
        else:
            self._report_failure(response, data)

    def edit_A_with_name(self, name: str, content: str = "1.1.1.1",
                         ttl: "int | None" = None) -> None:
        """Point the single ``A`` record called *name* at *content*.

        The cache is refreshed first. Nothing is sent when the record already
        holds *content*. If the filter matches zero or several records, a
        message and the candidate table are printed and nothing is edited.

        Args:
            name: Record name without the domain suffix.
            content: Desired record content.
            ttl: Optional extra filter used only to select the record.

        NOTE: the edit goes through ``edit`` without a TTL, so the record is
        rewritten with ``edit``'s default TTL of 600.
        """
        self._edit_with_name("A", name, content, ttl)

    def edit_AAAA_with_name(self, name: str, content: str = "1.1.1.1") -> None:
        """Point the single ``AAAA`` record called *name* at *content*.

        Behaves like ``edit_A_with_name`` but for ``AAAA`` records and with no
        TTL filter.

        NOTE(review): the default *content* is an IPv4 literal; callers
        presumably always pass an IPv6 address - confirm.
        """
        self._edit_with_name("AAAA", name, content)

    def delete_A_with_name(self, name: str) -> None:
        """Delete the single ``A`` record called *name*.

        The cache is refreshed first. If zero or several records match, a
        message and the candidate table are printed and nothing is deleted.
        """
        selected = self._select_single("A", name)
        if selected is not None:
            self.delete(_id=selected[0])