python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python socket多进程端口扫描

Python利用socket实现多进程的端口扫描器

作者:Sir 老王

作为开发人员经常需要查看服务的端口开启状态判断服务是否宕机。特别是部署的服务比较多的情况下,可能存在几个甚至几十个服务端口的占用。所以本文将利用socket实现多进程的端口扫描器,需要的可以参考一下

作为开发人员经常需要查看服务的端口开启状态判断服务是否宕机。

特别是部署的服务比较多的情况下,可能存在几个甚至几十个服务端口的占用,于是我利用socket不断向服务发送请求的方式来判断端口服务是否已经完成开启。

其中加入多进程的调用方式来提高端口扫描的效率,供大家参考!

首先,我们将需要的python模块全部导入到我们的代码块中,若是没有安装的模块使用pip的当时安装一下即可。

# Importing the socket module.
import socket

# Importing the datetime module from the datetime package.
from datetime import datetime

# It's a shortcut for `from multiprocessing import Pool`
from multiprocessing.dummy import Pool

# It's a shortcut for `from loguru import logger`
from loguru import logger

然后,创建一个端口扫描类PortScanner来完成对整个业务逻辑的处理,另外,封装到类中也便于大家参考和修改。

class PortScanner:
    def __init__(self):
        """
        A constructor. It is called when an object is created from a class and it allows the class to initialize the
        attributes of a class.
        """
        super(PortScanner, self).__init__()
        self.remote_ip = None
        self.ports = []

    def scanner(self, port):
        """
        It scans the port.

        :param port: The port you want to scan
        """
        try:
            socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result_ = socket_.connect_ex((self.remote_ip, port))
            if result_ == 0:
                logger.info('地址:{} 端口:{} 已成功开启!'.format(self.remote_ip, port))
            else:
                logger.info('地址:{} 端口:{} 未开启!'.format(self.remote_ip, port))
        except Exception as e:
            logger.error('端口扫描出现异常!')
        finally:
            socket_.close()

    def start(self):
        """
        It starts the game.
        """
        remote_server = input("输入要扫描的主机地址(127.0.0.1):")
        if remote_server.strip() == '':
            remote_server = '127.0.0.1'
        self.remote_ip = socket.gethostbyname(remote_server)
        port_range = input("输入要扫描的端口范围(1,50000):")
        scanner_ports = []
        if port_range.strip() == '':
            port_range = '1,50000'
        scanner_ports = [n for n in range(int(port_range.split(',')[0]), int(port_range.split(',')[1]))]
        socket.setdefaulttimeout(0.5)
        start_ = datetime.now()
        pool = Pool(processes=10)
        pool.map(self.scanner, scanner_ports)
        pool.close()
        pool.join()
        end_ = datetime.now()
        logger.info('所有端口扫描已完成,总共耗时:{}'.format(str(end_ - start_)))

使用python模块中的main函数调用整个端口扫描器执行扫描任务。

# It's a common idiom to determine if the script is being run directly or being imported.
if __name__ == '__main__':
    scanner_ = PortScanner()
    scanner_.start()

到此这篇关于Python利用socket实现多进程的端口扫描器的文章就介绍到这了,更多相关Python socket多进程端口扫描内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文