Python+Tkinter编写一个批量IP地址归属地查询
作者:Atlas Shepherd
这篇文章主要为大家详细介绍了Python如何结合Tkinter编写一个批量IP地址归属地查询,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
这是一个基于Python Tkinter的批量IP地址归属地查询工具。让我详细解读代码的各个部分:
核心功能
批量查询IP地址的地理位置信息
支持多个备用API端点
具有重试机制和请求间隔控制
图形化用户界面
效果图

代码结构分析
1. 类定义和初始化
class IPQueryTool:
def __init__(self, root):
# 设置3个备用API接口
self.apis = [
"https://ipapi.co/{}/json/",
"https://ipapi.com/ip_api.php?ip={}",
"http://ip-api.com/json/{}"
]
self.queue = Queue() # 线程安全的消息队列2. GUI界面构建 (setup_ui)
输入区域: 多行文本框用于输入IP列表
配置区域: 可调整请求间隔时间
按钮区域: 开始查询和清空结果按钮
结果显示区: 带滚动条的文本框
状态栏: 显示当前操作状态
3. 核心查询流程
多线程处理
def start_query_thread(self):
# 在新线程中执行查询,避免界面卡顿
thread = threading.Thread(target=self.query_ip_addresses)IP验证和批量处理
def query_ip_addresses(self):
# 1. 获取并验证IP地址
# 2. 逐个查询,包含延时控制
# 3. 统计成功/失败数量带重试机制的单个查询
def query_single_ip_with_retry(self, ip):
# 最大重试3次
# 遇到限流(429状态码)时使用指数退避算法
# 失败时自动切换API接口4. API响应处理适配
支持不同API返回格式的解析:
format_ipapi_result(): 处理ipapi.co格式format_ip-api_result(): 处理ip-api.com格式format_generic_result(): 通用格式处理
5. 线程安全的消息传递
def process_queue(self):
# 主线程定时检查队列,安全更新UI
# 处理不同类型消息:结果、状态、错误等关键技术特点
健壮性设计
多重API备用: 一个API失败自动切换下一个
请求限流控制: 可配置的请求间隔 + 随机延迟
指数退避重试: 2^retry秒的等待时间
错误处理: 网络超时、连接错误的完善处理
用户体验优化
实时状态更新: 显示当前查询进度
结果分格式显示: 清晰的结果排版
输入验证: IP格式自动校验
防界面卡顿: 多线程设计
使用示例
输入格式:
8.8.8.8
114.114.114.114
202.96.134.133
输出结果示例:
==================================================
IP地址: 8.8.8.8
国家: United States (US)
地区: California
城市: Mountain View
ISP: Google LLC
时区: America/Los_Angeles
经纬度: 37.4056, -122.0775
完整代码
代码一:
import tkinter as tk
from tkinter import scrolledtext, messagebox
import requests
import threading
from queue import Queue
import json
class IPQueryTool:
def __init__(self, root):
self.root = root
self.root.title("批量查询 IP 地址归属地")
self.root.geometry("600x700")
# 设置API端点
self.API_URL = "https://ipapi.co/{}/json/"
# 创建队列用于线程间通信
self.queue = Queue()
self.setup_ui()
# 定期检查队列
self.process_queue()
def setup_ui(self):
"""设置用户界面"""
# 输入区域
input_frame = tk.Frame(self.root)
input_frame.pack(pady=10, fill=tk.X, padx=10)
tk.Label(input_frame, text="请输入要查询的 IP 地址(每行一个):").pack(anchor=tk.W)
self.ip_entry = scrolledtext.ScrolledText(input_frame, height=8, width=70)
self.ip_entry.pack(pady=5)
# 按钮区域
button_frame = tk.Frame(self.root)
button_frame.pack(pady=10)
self.query_button = tk.Button(
button_frame,
text="开始查询",
command=self.start_query_thread,
bg="#4CAF50",
fg="white",
width=15
)
self.query_button.pack(side=tk.LEFT, padx=5)
clear_button = tk.Button(
button_frame,
text="清空结果",
command=self.clear_results,
bg="#f44336",
fg="white",
width=15
)
clear_button.pack(side=tk.LEFT, padx=5)
# 结果区域
result_frame = tk.Frame(self.root)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True, padx=10)
tk.Label(result_frame, text="查询结果:").pack(anchor=tk.W)
self.result_text = scrolledtext.ScrolledText(result_frame, height=20, width=70)
self.result_text.pack(pady=5, fill=tk.BOTH, expand=True)
# 状态栏
self.status_var = tk.StringVar()
self.status_var.set("就绪")
status_bar = tk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def start_query_thread(self):
"""在新线程中启动查询"""
self.query_button.config(state=tk.DISABLED)
self.status_var.set("查询中...")
thread = threading.Thread(target=self.query_ip_addresses)
thread.daemon = True
thread.start()
def query_ip_addresses(self):
"""查询IP地址信息"""
ip_text = self.ip_entry.get(1.0, tk.END).strip()
ip_list = [ip.strip() for ip in ip_text.splitlines() if ip.strip()]
if not ip_list:
self.queue.put(("error", "请输入至少一个IP地址"))
return
valid_ips = []
for ip in ip_list:
if self.is_valid_ip(ip):
valid_ips.append(ip)
else:
self.queue.put(("result", f"无效的IP地址格式: {ip}\n"))
if not valid_ips:
self.queue.put(("error", "没有有效的IP地址可查询"))
return
success_count = 0
total = len(valid_ips)
for i, ip in enumerate(valid_ips, 1):
try:
self.queue.put(("status", f"正在查询 {i}/{total}: {ip}"))
response = requests.get(self.API_URL.format(ip), timeout=10)
if response.status_code == 200:
data = response.json()
result = self.format_result(ip, data)
self.queue.put(("result", result))
success_count += 1
else:
error_msg = f"查询 IP {ip} 失败,状态码: {response.status_code}\n"
self.queue.put(("result", error_msg))
except requests.exceptions.Timeout:
self.queue.put(("result", f"查询 IP {ip} 超时\n"))
except requests.exceptions.ConnectionError:
self.queue.put(("result", f"网络连接错误,无法查询 IP {ip}\n"))
except Exception as e:
self.queue.put(("result", f"查询 IP {ip} 时出错: {str(e)}\n"))
# 查询完成
self.queue.put(("status", f"查询完成!成功: {success_count}/{total}"))
self.queue.put(("complete", ""))
def is_valid_ip(self, ip):
"""简单的IP地址格式验证"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
if not 0 <= int(part) <= 255:
return False
return True
def format_result(self, ip, data):
"""格式化查询结果"""
result = f"{'=' * 50}\n"
result += f"IP地址: {ip}\n"
result += f"国家: {data.get('country_name', 'N/A')} ({data.get('country', 'N/A')})\n"
result += f"地区: {data.get('region', 'N/A')}\n"
result += f"城市: {data.get('city', 'N/A')}\n"
result += f"ISP: {data.get('org', 'N/A')}\n"
result += f"时区: {data.get('timezone', 'N/A')}\n"
result += f"经纬度: {data.get('latitude', 'N/A')}, {data.get('longitude', 'N/A')}\n"
result += "\n"
return result
def process_queue(self):
"""处理队列中的消息(在主线程中执行)"""
try:
while True:
msg_type, content = self.queue.get_nowait()
if msg_type == "result":
self.result_text.insert(tk.END, content)
self.result_text.see(tk.END)
elif msg_type == "status":
self.status_var.set(content)
elif msg_type == "error":
messagebox.showerror("错误", content)
self.status_var.set("错误")
elif msg_type == "complete":
self.query_button.config(state=tk.NORMAL)
self.status_var.set("查询完成")
self.root.update_idletasks()
except:
pass
# 每100毫秒检查一次队列
self.root.after(100, self.process_queue)
def clear_results(self):
"""清空结果文本框"""
self.result_text.delete(1.0, tk.END)
def main():
root = tk.Tk()
app = IPQueryTool(root)
root.mainloop()
if __name__ == "__main__":
main()代码二
解读以下代码:
import tkinter as tk
from tkinter import scrolledtext, messagebox
import requests
import threading
from queue import Queue
import time
import random
class IPQueryTool:
def __init__(self, root):
self.root = root
self.root.title("批量查询 IP 地址归属地")
self.root.geometry("600x700")
# 设置多个API端点作为备用
self.apis = [
"https://ipapi.co/{}/json/",
"https://ipapi.com/ip_api.php?ip={}",
"http://ip-api.com/json/{}"
]
self.current_api_index = 0
# 请求配置
self.delay_between_requests = 1.0 # 请求间隔(秒)
self.max_retries = 3 # 最大重试次数
self.queue = Queue()
self.setup_ui()
self.process_queue()
def setup_ui(self):
"""设置用户界面"""
# 输入区域
input_frame = tk.Frame(self.root)
input_frame.pack(pady=10, fill=tk.X, padx=10)
tk.Label(input_frame, text="请输入要查询的 IP 地址(每行一个):").pack(anchor=tk.W)
self.ip_entry = scrolledtext.ScrolledText(input_frame, height=8, width=70)
self.ip_entry.pack(pady=5)
# 配置区域
config_frame = tk.Frame(self.root)
config_frame.pack(pady=5, fill=tk.X, padx=10)
tk.Label(config_frame, text="请求间隔(秒):").pack(side=tk.LEFT)
self.delay_var = tk.StringVar(value="1.0")
delay_entry = tk.Entry(config_frame, textvariable=self.delay_var, width=5)
delay_entry.pack(side=tk.LEFT, padx=5)
# 按钮区域
button_frame = tk.Frame(self.root)
button_frame.pack(pady=10)
self.query_button = tk.Button(
button_frame,
text="开始查询",
command=self.start_query_thread,
bg="#4CAF50",
fg="white",
width=15
)
self.query_button.pack(side=tk.LEFT, padx=5)
clear_button = tk.Button(
button_frame,
text="清空结果",
command=self.clear_results,
bg="#f44336",
fg="white",
width=15
)
clear_button.pack(side=tk.LEFT, padx=5)
# 结果区域
result_frame = tk.Frame(self.root)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True, padx=10)
tk.Label(result_frame, text="查询结果:").pack(anchor=tk.W)
self.result_text = scrolledtext.ScrolledText(result_frame, height=20, width=70)
self.result_text.pack(pady=5, fill=tk.BOTH, expand=True)
# 状态栏
self.status_var = tk.StringVar()
self.status_var.set("就绪")
status_bar = tk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def get_current_api(self):
"""获取当前API端点"""
return self.apis[self.current_api_index]
def switch_to_next_api(self):
"""切换到下一个API端点"""
self.current_api_index = (self.current_api_index + 1) % len(self.apis)
return self.get_current_api()
def start_query_thread(self):
"""在新线程中启动查询"""
try:
self.delay_between_requests = float(self.delay_var.get())
if self.delay_between_requests < 0.5:
messagebox.showwarning("警告", "请求间隔太短可能导致限流,建议至少0.5秒")
self.delay_between_requests = 0.5
self.delay_var.set("0.5")
except ValueError:
messagebox.showerror("错误", "请输入有效的数字")
return
self.query_button.config(state=tk.DISABLED)
self.status_var.set("查询中...")
thread = threading.Thread(target=self.query_ip_addresses)
thread.daemon = True
thread.start()
def query_ip_addresses(self):
"""查询IP地址信息"""
ip_text = self.ip_entry.get(1.0, tk.END).strip()
ip_list = [ip.strip() for ip in ip_text.splitlines() if ip.strip()]
if not ip_list:
self.queue.put(("error", "请输入至少一个IP地址"))
return
valid_ips = []
for ip in ip_list:
if self.is_valid_ip(ip):
valid_ips.append(ip)
else:
self.queue.put(("result", f"无效的IP地址格式: {ip}\n"))
if not valid_ips:
self.queue.put(("error", "没有有效的IP地址可查询"))
return
success_count = 0
total = len(valid_ips)
api_switched = False
for i, ip in enumerate(valid_ips, 1):
# 添加请求间隔
if i > 1:
time.sleep(self.delay_between_requests + random.uniform(0, 0.5))
try:
self.queue.put(("status", f"正在查询 {i}/{total}: {ip}"))
result = self.query_single_ip_with_retry(ip)
if result:
self.queue.put(("result", result))
success_count += 1
else:
self.queue.put(("result", f"查询 IP {ip} 失败\n"))
except Exception as e:
self.queue.put(("result", f"查询 IP {ip} 时出错: {str(e)}\n"))
# 查询完成
self.queue.put(("status", f"查询完成!成功: {success_count}/{total}"))
self.queue.put(("complete", ""))
def query_single_ip_with_retry(self, ip):
"""带重试机制的单个IP查询"""
for retry in range(self.max_retries):
try:
api_url = self.get_current_api().format(ip)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code == 200:
return self.process_successful_response(ip, response.json())
elif response.status_code == 429:
self.queue.put(("status", f"IP {ip} 被限流,等待重试..."))
if retry < self.max_retries - 1:
# 指数退避
wait_time = (2 ** retry) + random.uniform(0, 1)
time.sleep(wait_time)
continue
else:
self.queue.put(("status", "切换API端点..."))
self.switch_to_next_api()
else:
self.queue.put(("status", f"API错误,状态码: {response.status_code}"))
except requests.exceptions.Timeout:
self.queue.put(("status", f"查询 IP {ip} 超时,重试 {retry + 1}/{self.max_retries}"))
except requests.exceptions.ConnectionError:
self.queue.put(("status", f"网络连接错误,重试 {retry + 1}/{self.max_retries}"))
except Exception as e:
self.queue.put(("status", f"错误: {str(e)},重试 {retry + 1}/{self.max_retries}"))
if retry < self.max_retries - 1:
time.sleep(1 + retry) # 重试前等待
return None
def process_successful_response(self, ip, data):
"""处理成功的API响应"""
# 不同API的响应格式可能不同,需要适配
if 'ipapi.co' in self.get_current_api():
return self.format_ipapi_result(ip, data)
elif 'ip-api.com' in self.get_current_api():
return self.format_ip-api_result(ip, data)
else:
return self.format_generic_result(ip, data)
def format_ipapi_result(self, ip, data):
"""格式化ipapi.co的结果"""
result = f"{'='*50}\n"
result += f"IP地址: {ip}\n"
result += f"国家: {data.get('country_name', 'N/A')} ({data.get('country_code', 'N/A')})\n"
result += f"地区: {data.get('region', 'N/A')}\n"
result += f"城市: {data.get('city', 'N/A')}\n"
result += f"ISP: {data.get('org', 'N/A')}\n"
result += f"时区: {data.get('timezone', 'N/A')}\n"
result += f"经纬度: {data.get('latitude', 'N/A')}, {data.get('longitude', 'N/A')}\n"
result += "\n"
return result
def format_ip-api_result(self, ip, data):
"""格式化ip-api.com的结果"""
if data.get('status') == 'success':
result = f"{'='*50}\n"
result += f"IP地址: {ip}\n"
result += f"国家: {data.get('country', 'N/A')}\n"
result += f"地区: {data.get('regionName', 'N/A')}\n"
result += f"城市: {data.get('city', 'N/A')}\n"
result += f"ISP: {data.get('isp', 'N/A')}\n"
result += f"时区: {data.get('timezone', 'N/A')}\n"
result += f"经纬度: {data.get('lat', 'N/A')}, {data.get('lon', 'N/A')}\n"
result += "\n"
return result
else:
return f"查询 IP {ip} 失败: {data.get('message', '未知错误')}\n"
def format_generic_result(self, ip, data):
"""通用结果格式化"""
result = f"{'='*50}\n"
result += f"IP地址: {ip}\n"
# 尝试提取常见字段
mapping = {
'country': ['country', 'country_name', 'countryName'],
'region': ['region', 'regionName', 'state'],
'city': ['city'],
'isp': ['isp', 'org', 'asn'],
'timezone': ['timezone'],
}
for field, keys in mapping.items():
value = 'N/A'
for key in keys:
if key in data:
value = data[key]
break
result += f"{field}: {value}\n"
result += "\n"
return result
def is_valid_ip(self, ip):
"""简单的IP地址格式验证"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
if not 0 <= int(part) <= 255:
return False
return True
def process_queue(self):
"""处理队列中的消息"""
try:
while True:
msg_type, content = self.queue.get_nowait()
if msg_type == "result":
self.result_text.insert(tk.END, content)
self.result_text.see(tk.END)
elif msg_type == "status":
self.status_var.set(content)
elif msg_type == "error":
messagebox.showerror("错误", content)
self.status_var.set("错误")
elif msg_type == "complete":
self.query_button.config(state=tk.NORMAL)
self.status_var.set("查询完成")
self.root.update_idletasks()
except:
pass
self.root.after(100, self.process_queue)
def clear_results(self):
"""清空结果文本框"""
self.result_text.delete(1.0, tk.END)
def main():
root = tk.Tk()
app = IPQueryTool(root)
root.mainloop()
if __name__ == "__main__":
main()到此这篇关于Python+Tkinter编写一个批量IP地址归属地查询的文章就介绍到这了,更多相关Python查询IP地址归属地内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
