python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python端口扫描器

基于Python编写一个简单的端口扫描器

作者:闲人编程

在当今数字化时代,网络安全已成为个人和企业不可忽视的重要议题,端口扫描作为网络侦察的基础技术,是网络安全领域中的一个核心概念,下面我们就来看看如何基于Python编写一个简单的端口扫描器

1. 引言

在当今数字化时代,网络安全已成为个人和企业不可忽视的重要议题。随着网络攻击手段的不断升级,了解基本的网络安全原理和工具变得尤为重要。端口扫描作为网络侦察的基础技术,是网络安全领域中的一个核心概念。

1.1 端口扫描的重要性

端口扫描器是一种用于探测目标主机开放端口的工具,它可以帮助网络管理员:

1.2 Python在网络安全中的优势

Python因其简洁的语法、丰富的库生态系统和强大的社区支持,已成为网络安全领域的首选编程语言之一。在端口扫描器开发方面,Python提供了socketthreading等标准库,使得网络编程变得简单高效。

本文将详细介绍如何使用Python编写一个功能完整的端口扫描器,涵盖从基础原理到高级优化的各个方面。

2. 端口扫描基础

2.1 网络端口概念

在网络通信中,端口是计算机与外部世界通信的虚拟端点。每个端口都有一个唯一的数字标识,范围从0到65535。根据IANA(互联网号码分配机构)的规定,端口可分为三类:

2.2 TCP/IP握手过程

TCP(传输控制协议)是一种面向连接的协议,它通过三次握手建立可靠连接:

端口扫描技术正是基于对这一过程的巧妙利用。

2.3 常见的端口扫描技术

扫描类型原理优点缺点
TCP Connect扫描完成完整的TCP三次握手准确可靠,不需要特殊权限容易被检测,速度较慢
SYN扫描(半开放扫描)只发送SYN包,不完成握手速度快,相对隐蔽需要原始套接字权限
UDP扫描发送UDP包并分析响应可检测UDP服务不可靠,速度慢
FIN扫描发送FIN包绕过防火墙可绕过某些防火墙结果可能不准确

3. 端口扫描器设计

3.1 功能需求分析

我们的端口扫描器需要具备以下核心功能:

3.2 系统架构设计

4. 数学原理与性能优化

4.1 扫描时间估算

端口扫描的总时间可以通过以下公式估算:

Ttotal=Nports×Ttimeout×(1/Nthreads)+Toverhead

其中:

4.2 最优线程数计算

根据Amdahl定律,并行程序的加速比受限于其串行部分。对于端口扫描器,最优线程数可以通过以下方式估算:

Noptimal=(Tserial​​/Tparallel)×Ncores

在实际应用中,考虑到网络延迟和系统资源限制,我们通常设置线程数为CPU核心数的2-4倍。

5. Python实现细节

5.1 核心模块介绍

我们的端口扫描器将包含以下核心模块:

5.2 关键技术实现

5.2.1 Socket编程基础

Python的socket模块提供了底层的网络通信功能。创建TCP连接的基本流程如下:

import socket

# 创建socket对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时时间
sock.settimeout(5)
try:
    # 尝试连接
    result = sock.connect_ex((host, port))
    if result == 0:
        print(f"端口 {port} 开放")
    else:
        print(f"端口 {port} 关闭")
except socket.error as e:
    print(f"扫描端口 {port} 时出错: {e}")
finally:
    sock.close()

5.2.2 多线程实现

为了提高扫描效率,我们使用concurrent.futures模块实现线程池:

from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_ports(host, ports, max_threads=100):
    open_ports = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # 提交扫描任务
        future_to_port = {
            executor.submit(scan_port, host, port, timeout): port 
            for port in ports
        }
        
        # 收集结果
        for future in as_completed(future_to_port):
            port = future_to_port[future]
            try:
                result = future.result()
                if result:
                    open_ports.append(port)
            except Exception as e:
                print(f"端口 {port} 扫描异常: {e}")
    
    return sorted(open_ports)

6. 完整代码实现

6.1 端口扫描器完整代码

#!/usr/bin/env python3
"""
简单的多线程端口扫描器
作者: 网络安全爱好者
版本: 1.0
"""

import socket
import argparse
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import sys
from typing import List, Tuple, Union

class PortScanner:
    """
    端口扫描器类
    实现多线程TCP Connect扫描功能
    """
    
    def __init__(self, timeout: float = 1.0, max_threads: int = 100):
        """
        初始化端口扫描器
        
        Args:
            timeout: 连接超时时间(秒)
            max_threads: 最大线程数
        """
        self.timeout = timeout
        self.max_threads = max_threads
        self.lock = threading.Lock()
        self.scan_results = []
        
    def resolve_host(self, host: str) -> str:
        """
        解析主机名或IP地址
        
        Args:
            host: 主机名或IP地址
            
        Returns:
            解析后的IP地址
            
        Raises:
            socket.gaierror: 当主机名无法解析时
        """
        try:
            # 尝试解析主机名
            ip = socket.gethostbyname(host)
            return ip
        except socket.gaierror as e:
            raise socket.gaierror(f"无法解析主机: {host}, 错误: {e}")
    
    def scan_port(self, host: str, port: int) -> Tuple[int, bool, str]:
        """
        扫描单个端口
        
        Args:
            host: 目标主机
            port: 目标端口
            
        Returns:
            (端口, 是否开放, 服务名称)
        """
        try:
            # 创建socket对象
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            
            # 尝试连接
            result = sock.connect_ex((host, port))
            
            if result == 0:
                # 端口开放,尝试获取服务名称
                try:
                    service = socket.getservbyport(port, 'tcp')
                except OSError:
                    service = "unknown"
                return (port, True, service)
            else:
                return (port, False, "")
                
        except socket.timeout:
            return (port, False, "timeout")
        except socket.error as e:
            return (port, False, f"error: {e}")
        finally:
            try:
                sock.close()
            except:
                pass
    
    def parse_port_range(self, port_range: str) -> List[int]:
        """
        解析端口范围字符串
        
        Args:
            port_range: 端口范围字符串,如 "80", "1-100", "22,80,443"
            
        Returns:
            端口列表
            
        Raises:
            ValueError: 当端口范围格式错误时
        """
        ports = []
        
        # 处理逗号分隔的端口列表
        if ',' in port_range:
            parts = port_range.split(',')
            for part in parts:
                if '-' in part:
                    # 处理范围
                    start, end = part.split('-')
                    try:
                        start_port = int(start.strip())
                        end_port = int(end.strip())
                        if 1 <= start_port <= end_port <= 65535:
                            ports.extend(range(start_port, end_port + 1))
                        else:
                            raise ValueError(f"端口范围 {part} 无效")
                    except ValueError as e:
                        raise ValueError(f"无效的端口范围: {part}, 错误: {e}")
                else:
                    # 处理单个端口
                    try:
                        port = int(part.strip())
                        if 1 <= port <= 65535:
                            ports.append(port)
                        else:
                            raise ValueError(f"端口 {port} 超出范围")
                    except ValueError as e:
                        raise ValueError(f"无效的端口: {part}, 错误: {e}")
        
        # 处理端口范围
        elif '-' in port_range:
            start, end = port_range.split('-')
            try:
                start_port = int(start.strip())
                end_port = int(end.strip())
                if 1 <= start_port <= end_port <= 65535:
                    ports = list(range(start_port, end_port + 1))
                else:
                    raise ValueError(f"端口范围 {port_range} 无效")
            except ValueError as e:
                raise ValueError(f"无效的端口范围: {port_range}, 错误: {e}")
        
        # 处理单个端口
        else:
            try:
                port = int(port_range)
                if 1 <= port <= 65535:
                    ports = [port]
                else:
                    raise ValueError(f"端口 {port} 超出范围")
            except ValueError as e:
                raise ValueError(f"无效的端口: {port_range}, 错误: {e}")
        
        return sorted(set(ports))  # 去重并排序
    
    def scan_ports(self, host: str, ports: Union[str, List[int]], 
                   show_closed: bool = False) -> List[Tuple[int, bool, str]]:
        """
        扫描指定主机的端口
        
        Args:
            host: 目标主机
            ports: 端口范围或列表
            show_closed: 是否显示关闭的端口
            
        Returns:
            扫描结果列表,每个元素为 (端口, 是否开放, 服务名称)
        """
        # 解析主机名
        try:
            ip = self.resolve_host(host)
            print(f"扫描主机: {host} ({ip})")
        except socket.gaierror as e:
            print(f"错误: {e}")
            return []
        
        # 解析端口
        if isinstance(ports, str):
            try:
                port_list = self.parse_port_range(ports)
            except ValueError as e:
                print(f"错误: {e}")
                return []
        else:
            port_list = ports
        
        print(f"扫描端口数量: {len(port_list)}")
        print(f"使用线程数: {self.max_threads}")
        print(f"超时时间: {self.timeout}秒")
        print("-" * 50)
        
        start_time = time.time()
        open_count = 0
        
        # 使用线程池执行扫描
        with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
            # 提交所有扫描任务
            future_to_port = {
                executor.submit(self.scan_port, ip, port): port 
                for port in port_list
            }
            
            # 处理完成的任务
            for future in as_completed(future_to_port):
                port = future_to_port[future]
                try:
                    result = future.result()
                    port_num, is_open, service = result
                    
                    with self.lock:
                        if is_open:
                            print(f"端口 {port_num:5d} 开放 - 服务: {service}")
                            open_count += 1
                            self.scan_results.append(result)
                        elif show_closed:
                            print(f"端口 {port_num:5d} 关闭")
                            self.scan_results.append(result)
                            
                except Exception as e:
                    print(f"扫描端口 {port} 时发生异常: {e}")
        
        # 输出统计信息
        elapsed_time = time.time() - start_time
        print("-" * 50)
        print(f"扫描完成!")
        print(f"开放端口数量: {open_count}/{len(port_list)}")
        print(f"扫描耗时: {elapsed_time:.2f} 秒")
        
        return self.scan_results

def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="简单的多线程端口扫描器",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s 192.168.1.1 80
  %(prog)s example.com 1-100
  %(prog)s localhost 22,80,443,8000-9000
  %(prog)s 10.0.0.1 1-1000 -t 5 -T 2.0 -v
        """
    )
    
    parser.add_argument("host", help="要扫描的目标主机名或IP地址")
    parser.add_argument("ports", help="要扫描的端口范围,如: 80, 1-100, 22,80,443")
    parser.add_argument("-t", "--threads", type=int, default=100,
                       help="最大线程数 (默认: 100)")
    parser.add_argument("-T", "--timeout", type=float, default=1.0,
                       help="连接超时时间,单位秒 (默认: 1.0)")
    parser.add_argument("-v", "--verbose", action="store_true",
                       help="显示所有端口(包括关闭的端口)")
    
    args = parser.parse_args()
    
    # 创建扫描器实例
    scanner = PortScanner(timeout=args.timeout, max_threads=args.threads)
    
    try:
        # 执行扫描
        results = scanner.scan_ports(args.host, args.ports, args.verbose)
        
        # 输出开放端口摘要
        if results:
            open_ports = [(port, service) for port, is_open, service in results if is_open]
            if open_ports:
                print("\n开放端口摘要:")
                for port, service in sorted(open_ports):
                    print(f"  {port}/tcp - {service}")
            else:
                print("\n未发现开放端口")
                
    except KeyboardInterrupt:
        print("\n扫描被用户中断")
        sys.exit(1)
    except Exception as e:
        print(f"扫描过程中发生错误: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

6.2 代码说明与使用示例

6.2.1 代码结构说明

6.2.2 使用示例

# 扫描单个端口
python port_scanner.py example.com 80

# 扫描端口范围
python port_scanner.py 192.168.1.1 1-100

# 扫描多个指定端口
python port_scanner.py localhost 22,80,443,8000

# 使用更多线程和更长超时时间
python port_scanner.py target.com 1-1000 -t 200 -T 2.0

# 显示所有端口(包括关闭的)
python port_scanner.py 10.0.0.1 20-30 -v

7. 高级功能与优化

7.1 性能优化策略

7.1.1 动态超时调整

根据网络状况动态调整超时时间可以显著提高扫描效率:

def adaptive_timeout(self, host: str, sample_ports: List[int] = [80, 443, 22]) -> float:
    """
    根据样本端口响应时间计算合适的超时时间
    
    Args:
        host: 目标主机
        sample_ports: 用于测试的样本端口
        
    Returns:
        计算得到的超时时间
    """
    response_times = []
    
    for port in sample_ports:
        try:
            start = time.time()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(2.0)
            result = sock.connect_ex((host, port))
            elapsed = time.time() - start
            
            if result == 0:  # 端口开放
                response_times.append(elapsed)
            sock.close()
        except:
            pass
    
    if response_times:
        avg_time = sum(response_times) / len(response_times)
        return min(max(avg_time * 3, 0.5), 5.0)  # 限制在0.5-5秒之间
    else:
        return 2.0  # 默认超时时间

7.1.2 连接池优化

对于大量端口的扫描,可以复用socket连接以减少资源开销:

class ConnectionPool:
    """连接池管理类"""
    
    def __init__(self, max_size=10):
        self.max_size = max_size
        self.pool = []
        self.lock = threading.Lock()
    
    def get_connection(self, host, port, timeout):
        """从池中获取或创建连接"""
        with self.lock:
            if self.pool:
                sock = self.pool.pop()
                try:
                    # 测试连接是否仍然有效
                    sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                    return sock
                except socket.error:
                    sock.close()
            
        # 创建新连接
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        return sock
    
    def return_connection(self, sock):
        """将连接返回池中"""
        with self.lock:
            if len(self.pool) < self.max_size:
                self.pool.append(sock)
            else:
                sock.close()

7.2 安全与隐蔽性考虑

7.2.1 随机化扫描顺序

按顺序扫描端口容易被入侵检测系统识别,随机化扫描顺序可以提高隐蔽性:

import random

def randomize_scan_order(self, ports: List[int]) -> List[int]:
    """
    随机化端口扫描顺序
    
    Args:
        ports: 原始端口列表
        
    Returns:
        随机排序后的端口列表
    """
    randomized_ports = ports.copy()
    random.shuffle(randomized_ports)
    return randomized_ports

7.2.2 速率限制

为了避免被目标主机的防火墙封锁,可以实施速率限制:

import time

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, max_requests_per_second):
        self.max_requests = max_requests_per_second
        self.interval = 1.0 / max_requests_per_second
        self.last_request_time = 0
        self.lock = threading.Lock()
    
    def acquire(self):
        """获取执行许可"""
        with self.lock:
            current_time = time.time()
            elapsed = current_time - self.last_request_time
            
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
            
            self.last_request_time = time.time()

8. 代码测试与验证

8.1 单元测试

为了确保代码质量,我们需要编写单元测试:

import unittest
from unittest.mock import patch, MagicMock

class TestPortScanner(unittest.TestCase):
    """端口扫描器测试类"""
    
    def setUp(self):
        self.scanner = PortScanner(timeout=0.1, max_threads=10)
    
    def test_resolve_host_valid(self):
        """测试有效主机名解析"""
        ip = self.scanner.resolve_host("localhost")
        self.assertEqual(ip, "127.0.0.1")
    
    def test_resolve_host_invalid(self):
        """测试无效主机名解析"""
        with self.assertRaises(socket.gaierror):
            self.scanner.resolve_host("invalid-hostname-that-should-not-exist")
    
    def test_parse_port_range_single(self):
        """测试单个端口解析"""
        ports = self.scanner.parse_port_range("80")
        self.assertEqual(ports, [80])
    
    def test_parse_port_range_range(self):
        """测试端口范围解析"""
        ports = self.scanner.parse_port_range("1-3")
        self.assertEqual(ports, [1, 2, 3])
    
    def test_parse_port_range_mixed(self):
        """测试混合端口解析"""
        ports = self.scanner.parse_port_range("80,443,1-3")
        self.assertEqual(ports, [1, 2, 3, 80, 443])
    
    @patch('socket.socket')
    def test_scan_port_open(self, mock_socket):
        """测试开放端口扫描"""
        mock_sock = MagicMock()
        mock_socket.return_value = mock_sock
        mock_sock.connect_ex.return_value = 0
        
        port, is_open, service = self.scanner.scan_port("localhost", 80)
        
        self.assertEqual(port, 80)
        self.assertTrue(is_open)
        mock_sock.connect_ex.assert_called_with(("localhost", 80))

if __name__ == '__main__':
    unittest.main()

8.2 集成测试

在实际环境中测试扫描器的功能:

def integration_test():
    """集成测试函数"""
    scanner = PortScanner(timeout=1.0, max_threads=50)
    
    # 测试本地主机
    print("测试本地主机扫描...")
    results = scanner.scan_ports("localhost", "80,443,22,8080")
    
    # 验证结果
    open_ports = [port for port, is_open, service in results if is_open]
    print(f"发现的开放端口: {open_ports}")
    
    # 预期至少本地回环地址应该有某些端口开放
    assert len(open_ports) >= 0, "应该能够扫描本地主机"

if __name__ == "__main__":
    integration_test()

9. 法律与道德考量

9.1 合法使用原则

端口扫描器是一把双刃剑,既可以用于安全评估,也可能被恶意使用。在使用本工具时,请务必遵守以下原则:

9.2 负责任披露

如果发现安全漏洞,应遵循负责任披露原则:

10. 总结与扩展

10.1 项目总结

通过本文,我们实现了一个功能完整的Python端口扫描器,具备以下特点:

10.2 进一步扩展

这个基础扫描器还可以进一步扩展:

免责声明:本文介绍的端口扫描技术仅用于教育目的和授权的安全测试。未经授权扫描他人网络可能违反法律,请务必在合法合规的前提下使用这些技术。

代码自查说明:本文提供的代码已经过基本测试,但在生产环境中使用前仍需进行充分的安全测试和代码审查。特别注意线程安全、资源管理和异常处理等方面。

到此这篇关于基于Python编写一个简单的端口扫描器的文章就介绍到这了,更多相关Python端口扫描器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文