first commit

This commit is contained in:
marques 2022-04-23 21:18:04 +08:00
commit c54da26306
11 changed files with 496 additions and 0 deletions

86
README.MD Normal file
View File

@ -0,0 +1,86 @@
# 文件结构
ddns_win.py windows106上使用的ddns程序主要执行的脚本判断从dns解析的结果与当前ip是否一致不一致则修改DNS记录
ddns_801.py ubuntu186上使用的ddns程序主要执行的脚本判断从dns解析的结果与当前ip是否一致不一致则修改DNS记录
getLocalIP 通过socket库获取本机IP适用于本机PPPOE拨号的情况
getRouterIP 通过爬虫脚本获取路由器WAN口IP只是用WIS两台路由器
router_info.ini 存放路由器的配置信息
# DDNS动态域名解析脚本
## 域名服务商 与 API接口
安利一个域名服务商porkbun
API接口文档https://porkbun.com/api/json/v3/documentation
通过查阅API接口了解如何通过接口修改域名记录
porkbun是通过json向接口提交参数完成操作
## 获取目标IP
### 1. 获取路由器IP
目标获取路由器WAN口IP而非公网出口IP
方法:通过爬虫获取路由器系统信息
环境:必联路由(其实就是极路由系统换皮)
具体实现:
通过浏览器F12开发者工具可以获取以下信息
1. 每次正确输入密码验证后路由器通过一个密钥stok来验证用户的登录状态因此需要抓取输入密码后返回的stok保持后续操作的可行性
抓取后可以看出在提交密码时浏览器用GET方式向http://192.168.199.1/cgi-bin/turbo/api/login/login_admin传递参数username和password
其中192.168.199.1时路由器的管理地址admin时默认的username这个不用修改
从路由器返回结果可以看到直接返回了stok所以可以直接使用理论上可以直接按开发者工具response看到但是没看到所以下面用python获取返回的结果。
<img src="H:\pycharm\ddns_script\README\img\fALmA9X.png" align="center">
<img src="H:\pycharm\ddns_script\README\img\uVaBflU.png" align="center" >
2. 继续观察浏览器network中的数据包可以发现浏览器通过POST方法向 http://{IP}/cgi-bin/turbo{STOK}/proxy/call 接口传递一个Form data模板来获取路由器信息。
<img src="H:\pycharm\ddns_script\README\img\oX3tOdh.png" align="center" >
3. 分析POST结果可以得出只要按照浏览器的格式POST该模板就可以获取路由器地址剔除无关信息后返回结果如下
<img src="H:\pycharm\ddns_script\README\img\YbpRaBc.png" align="center" height="80%" width="80%" >
### 2. 获取本地IP
直接通过python的socket库就可以获取到
refer
1. [Python 获取本机公网IPv6地址](https://coco56.blog.csdn.net/article/details/106725406)
2. [python获取本机IP地址](https://www.cnblogs.com/z-x-y/p/9529930.html)

BIN
README/img/YbpRaBc.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

BIN
README/img/fALmA9X.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

BIN
README/img/oX3tOdh.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

BIN
README/img/uVaBflU.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

31
ddns_801.py Normal file
View File

@ -0,0 +1,31 @@
from porkbun_ddns import PorkbunAPI
from getRouterIP import GetRouterWanIP
import re
from dns.resolver import resolve
API_Key = "pk1_我删了"
Secret_Key = "sk1_我删了"
def check_ip(ip_addr):
compile_ip = re.compile(
'^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$')
if compile_ip.match(ip_addr):
return True
else:
return False
GetRouterWanIP.check_info_exist()
getRouterWanIP = GetRouterWanIP()
getRouterWanIP.get_stok()
ip = getRouterWanIP.get_wan_ip()
print(ip)
if check_ip(ip):
if str(resolve("wis01.marques22.com", 'A')[0]) != ip:
porkbun_api = PorkbunAPI("marques22.com", API_Key, Secret_Key)
porkbun_api.edit_A_with_name("wis01", ip)
print("edit end")
else:
print("ip not change")
else:
print("ipv4 formation is wrong!")

30
ddns_win.py Normal file
View File

@ -0,0 +1,30 @@
from porkbun_ddns import PorkbunAPI
from getRouterIP import GetRouterWanIP
import re
from dns.resolver import resolve
API_Key = "我删了"
Secret_Key = "我删了"
def check_ip(ip_addr):
compile_ip = re.compile(
'^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$')
if compile_ip.match(ip_addr):
return True
else:
return False
GetRouterWanIP.check_info_exist()
getRouterWanIP = GetRouterWanIP()
getRouterWanIP.get_stok()
ip = getRouterWanIP.get_wan_ip()
if check_ip(ip):
if str(resolve("wis02.marques22.com", 'A')[0]) != ip:
porkbun_api = PorkbunAPI("marques22.com", API_Key, Secret_Key)
porkbun_api.edit_A_with_name("wis02", ip)
else:
print("ip not change")
else:
print("ipv4 formation is wrong!")

32
getLocal_IP.py Normal file
View File

@ -0,0 +1,32 @@
import socket
import os
import re
def get_local_ip():
try:
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.connect(('8.8.8.8',80))
ip=s.getsockname()[0]
finally:
s.close()
print(ip)
return ip
def get_local_ipv6():
output = os.popen("ipconfig /all").read()
# print(output)
result = re.findall(r"(([a-f0-9]{1,4}:){7}[a-f0-9]{1,4})", output, re.I)
ip = result[0][0]
print(ip)
return ip
if __name__ == '__main__':
get_local_ip()
get_local_ipv6()

101
getRouterIP.py Normal file
View File

@ -0,0 +1,101 @@
import configparser
import requests
import sys
import os.path as osp
class GetRouterWanIP:
def __init__(self):
self.ip = None
self.username = None
self.password = None
self.stok = None
self.remain_time = None
self.cookie = None
self.error_time = None
self.get_router_info()
@staticmethod
def check_info_exist():
if not osp.exists("/home/marques/software/ddns/router_info.ini"):
print("Error: Router information config not exist!")
print("Info: Please fill the configuration file ./router_info.ini")
info_config = configparser.ConfigParser()
info_config["DEFAULT"] = {"ip": "", "username": "", "password": "", "error_time": 0}
with open("/home/marques/software/ddns/router_info.ini", "w") as file:
info_config.write(file)
def get_router_info(self):
router_info_config = configparser.ConfigParser()
router_info_config.read("/home/marques/software/ddns/router_info.ini", encoding="utf-8")
self.ip = router_info_config["DEFAULT"]["ip"]
self.username = router_info_config["DEFAULT"]["username"]
self.password = router_info_config["DEFAULT"]["password"]
self.error_time = int(router_info_config["DEFAULT"]["error_time"])
if self.error_time > 0:
print("please check your password and manually reset error_time's value to 0 in the router_info_ini")
sys.exit(0)
def get_stok(self):
# 获取stok
get_stok_url = 'http://{}/cgi-bin/turbo/admin_web/login_admin?username={}&password={}'.format(self.ip,
self.username,
self.password)
stok_response = requests.get(get_stok_url)
# print(stok_response.json())
# self.stok = stok_response.json()["stok"]
self.remain_time = stok_response.json()["remaining_num"]
# print(self.remain_time)
if self.remain_time != 10:
router_info_config = configparser.ConfigParser()
router_info_config.read("./router_info.ini", encoding="utf-8")
router_info_config["DEFAULT"]["error_time"] = str(10-self.remain_time)
with open("./router_info.ini", "w") as file:
router_info_config.write(file)
print("please check your password and manually reset error_time's value to 0 in the router_info_ini")
sys.exit(0)
self.stok = stok_response.json()["stok"]
self.cookie = requests.utils.dict_from_cookiejar(stok_response.cookies)
def get_wan_ip(self):
wan_ip_url = "http://{}/cgi-bin/turbo{}/proxy/call".format(self.ip, self.stok)
params = {"_roundRobinUpdateD": ""}
headers = {
"Host": self.ip,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/json",
"X-Requested-With": "XMLHttpRequest",
"X-KL-Ajax-Request": "Ajax_Request",
"Origin": "http://{}".format(self.ip),
"Connection": "keep-alive",
"Referer": "http://{}/cgi-bin/turbo{}/admin_web".format(self.ip, self.stok),
"Cookie": "sysauth={}".format(self.cookie['sysauth'])
}
post_json = {"muticall": "1", "mutiargs": [{"method": "wan.get_status", "data": {}}], "lang": "zh-CN",
"version": "v1"}
wan_ip_response = requests.post(wan_ip_url, headers=headers, params=params, json=post_json)
wan_ip = wan_ip_response.json()["data"]["results"][0]["result"]['data']["wan_ip"]
#with open("./wan_ip.txt", "w") as file:
# file.write(wan_ip)
print(wan_ip)
return wan_ip
if __name__ == '__main__':
GetRouterWanIP.check_info_exist()
getRouterWanIP = GetRouterWanIP()
getRouterWanIP.get_stok()
getRouterWanIP.get_wan_ip()

210
porkbun_ddns.py Normal file
View File

@ -0,0 +1,210 @@
import requests
class PorkbunAPI:
def __init__(self, domain: "str", api_key: "str", secret_api_key: "str"):
self.domain = domain
self.__secret_api_key = secret_api_key
self.__api_key = api_key
self.__records = {}
@staticmethod
def ping_porkbun(self, verbose: "bool" = True):
ping_URI = "https://porkbun.com/api/json/v3/ping"
ping_json = {
"secretapikey": self.__secret_api_key,
"apikey": self.__api_key
}
response = requests.post(ping_URI, json=ping_json)
if verbose:
if response.status_code == 404:
print("Internet Failure !")
elif response.status_code == 200 and response.json()["status"] == "SUCCESS":
print("Internet Success! Your IP is ", response.json()["yourIP"])
else:
print("response code is ", response.status_code)
print(response.json())
return response.status_code
def retrieve(self, verbose: "bool" = True):
# 0 is DOMAIN
retrieve_URI = "https://porkbun.com/api/json/v3/dns/retrieve/{0}".format(self.domain)
retrieve_json = {
"secretapikey": self.__secret_api_key,
"apikey": self.__api_key
}
response = requests.post(retrieve_URI, json=retrieve_json)
if response.status_code == 404:
if verbose:
print("Internet Failure !")
elif response.status_code == 200 and response.json()["status"] == "SUCCESS":
records = response.json()["records"]
for record in records:
self.__records[record["id"]] = record
if verbose:
print(("{} " * 6).format("id".center(9), "name".center(48), "type".center(5), "content".center(40),
"ttl".center(5), "prio".center(4)))
for record in records:
print(("{} " * 6).format(str(record["id"]).center(9),
str(record["name"][:-len(self.domain) - 1]).center(48),
str(record["type"]).center(5),
str(record["content"][:40]).center(40) if len(
record["content"]) > 40 else str(record["content"]).center(40),
str(record["ttl"]).center(5), str(record["prio"]).center(4)))
else:
if verbose:
print("response code is ", response.status_code)
print(response.json())
return response.status_code
def find_record(self, _type: "str" = None, name: "str" = None, content: "str" = None, verbose: "bool" = True):
records = {}
def filter_fun(x):
if _type is not None and self.__records[x]["type"] != _type:
return False
if name is not None and self.__records[x]["name"][:-len(self.domain) - 1] != name:
return False
if content is not None and self.__records[x]["content"] != content:
return False
return True
for i in filter(filter_fun, self.__records):
records[i] = self.__records[i]
if verbose:
print(("{} " * 6).format("id".center(9), "name".center(48),
"type".center(5), "content".center(40),
"ttl".center(5), "prio".center(4)))
for record in records:
print(("{} " * 6).format(str(records[record]["id"]).center(9),
str(records[record]["name"][:-len(self.domain) - 1]).center(48),
str(records[record]["type"]).center(5),
str(records[record]["content"][:40]).center(40) if len(
records[record]["content"]) > 40 else str(
records[record]["content"]).center(40),
str(records[record]["ttl"]).center(5),
str(records[record]["prio"]).center(4)))
return records
def create(self, name: "str", _type: "str" = "A", content: "str" = "1.1.1.1", ttl: "int >= 600" = 600, prio=None):
# 0 is DOMAIN
create_URI = "https://porkbun.com/api/json/v3/dns/create/{0}".format(self.domain)
create_json = {
"secretapikey": self.__secret_api_key,
"apikey": self.__api_key,
"name": name,
"type": _type,
"content": content,
"ttl": ttl
}
if prio is not None:
create_json["prio"] = prio
print({"name": name,
"type": _type,
"content": content
})
response = requests.post(create_URI, json=create_json)
if response.status_code == 404:
print("Internet Failure !")
elif response.status_code == 200 and response.json()["status"] == "SUCCESS":
print("Create New Record Success! Your new record is ", response.json()["id"])
else:
print("response code is ", response.status_code)
print(response.json())
def edit(self, name: "str", _id: "str", _type: "str" = "A", content: "str" = "1.1.1.1", ttl: "int >= 600" = 600,
prio=None, verbose: "bool" = True):
# 0 is DOMAIN 1 is ID
edit_URI = "https://porkbun.com/api/json/v3/dns/edit/{0}/{1}".format(self.domain, _id)
edit_json = {
"secretapikey": self.__secret_api_key,
"apikey": self.__api_key,
"name": name,
"type": _type,
"content": content,
"ttl": ttl
}
if prio is not None:
edit_json["prio"] = prio
if verbose:
print({"name": name,
"type": _type,
"content": content
})
response = requests.post(edit_URI, json=edit_json)
if not verbose:
return
if response.status_code == 404:
print("Internet Failure !")
elif response.status_code == 200 and response.json()["status"] == "SUCCESS":
print("Edit Success!")
else:
print("response code is ", response.status_code)
print(response.json())
def delete(self, _id: "str", verbose: "bool" = True):
# 0 is DOMAIN 1 is ID
delete_URI = "https://porkbun.com/api/json/v3/dns/delete/{0}/{1}".format(self.domain, _id)
delete_json = {
"secretapikey": self.__secret_api_key,
"apikey": self.__api_key
}
# print(name + "." + self.domain)
response = requests.post(delete_URI, json=delete_json)
if not verbose:
return
if response.status_code == 404:
print("Internet Failure !")
elif response.status_code == 200 and response.json()["status"] == "SUCCESS":
print("Delete Record Success!")
else:
print("response code is ", response.status_code)
print(response.json())
def edit_A_with_name(self, name: "str", content: "str" = "1.1.1.1"):
self.retrieve(verbose=False)
result = self.find_record(_type="A", name=name, verbose=False)
if len(result) != 1:
print("Multiple records are selected")
self.find_record(_type="A", name=name)
else:
_id = [i for i in result.keys()][0]
old_content = result[_id]["content"]
if old_content != content:
self.edit(name=name, content=content, _type="A", _id=_id)
def edit_AAAA_with_name(self, name: "str", content: "str" = "1.1.1.1"):
self.retrieve(verbose=False)
result = self.find_record(_type="AAAA", name=name, verbose=False)
if len(result) != 1:
print("Multiple records are selected")
self.find_record(_type="AAAA", name=name)
else:
_id = [i for i in result.keys()][0]
old_content = result[_id]["content"]
if old_content != content:
self.edit(name=name, content=content, _type="AAAA", _id=_id)
def delete_A_with_name(self, name: "str"):
self.retrieve(verbose=False)
result = self.find_record(_type="A", name=name, verbose=False)
if len(result) != 1:
print("Multiple records are selected")
self.find_record(_type="A", name=name)
else:
_id = [i for i in result.keys()][0]
self.delete(_id=_id)

6
router_info.ini Normal file
View File

@ -0,0 +1,6 @@
[DEFAULT]
ip = 192.168.1.1
username = admin
password = DX418418
error_time = 1