python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python监测APP流量

使用Python实现监测APP的使用流量

作者:lsp84ch80

这篇文章主要为大家详细介绍了如何使用Python实现监测APP的使用流量,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

本文介绍了一种用于监测单个APK包上下行及总流量的方法,同时实现了定时备份与邮件提醒功能。通过Python脚本,可以获取设备UID,计算设备流量,并将数据保存至Excel表格中。定时备份功能确保数据安全,而邮件提醒则方便远程监控。 

实现代码如下:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
'''
# @Time    : 2018/1/17 21:54
# @Author  : Soner
# @version : V1.0
# @license : Copyright(C), Your Company
# @实现的功能:
            1.单APK包的上下行及总流量监测
            2.定时备份
            3.定时发送邮件提醒
'''
import time
import subprocess
import xlwt
import os
import shutil
import datetime
# ----------------- 获取设备UID -----------------
def getUid(package_name):#获取UID
    cmd = 'adb shell dumpsys package ' + package_name + ' | findstr userId'
    p1 = subprocess.Popen(cmd, shell = True,
    stdout=subprocess.PIPE, stderr = subprocess.PIPE)#用adb获取信息
    uidLongString = str(p1.stdout.read().strip(), encoding="utf-8")
    uidLongList = uidLongString.split("=")
    uid = uidLongList[1]
    return uid[0:5]
# ----------------- 计算设备流量 -----------------
def getFlowFromUid(packagename, uid=None):
    '''
    # 通过应用uid,获取应用当前消耗的流量
    # 获取PID adb shell ps | findstr 包名   如果是LINUx 系统 findstr 换成 grep
    # 获取网络标识及流量 “adb shell cat /proc/应用PID码/net/dev”
    # return (rcv,snd)
    '''
    if uid is None:
        uid = getUid(packagename)
    cmd = 'adb shell cat /proc/net/xt_qtaguid/stats | findstr %s' % uid
    std = os.popen(cmd)
    net_rcv_bck = []
    net_rcv_front = []
    net_snd_bck = []
    net_snd_front = []
    lo_rcv_bck = []
    lo_rcv_front = []
    lo_snd_bck = []
    lo_snd_front = []
    for line in std:        # if 判断里的 “ccmni0”根据使用的SIM卡变换,具体的需要先去查询改掉这里,不然会出错“adb shell cat /proc/应用PID码/net/dev”
        data = line.split()
        if 'ccmni0' in line:
            background_flow = int(data[4]) == 0
            if background_flow:
                net_rcv_bck.append(int(data[5]))
                net_snd_bck.append(int(data[7]))
            else:
                net_rcv_front.append(int(data[5]))
                net_snd_front.append(int(data[7]))
        elif 'lo' in line:
            background_flow = int(data[4]) == 0
            if background_flow:
                lo_rcv_bck.append(int(data[5]))
                lo_snd_bck.append(int(data[7]))
            else:
                lo_rcv_front.append(int(data[5]))
                lo_snd_front.append(int(data[7]))
    return sum(net_rcv_bck), sum(net_rcv_front), sum(net_snd_bck), sum(net_snd_front), \
            sum(lo_rcv_bck), sum(lo_rcv_front), sum(lo_snd_bck), sum(lo_snd_front)
# ----------------- 发送邮件 -----------------
''''' 
函数说明:Send_email_text() 函数实现发送带有附件的邮件,可以群发,附件格式包括:xlsx,pdf,txt,jpg,mp3等 
参数说明: 
    1. subject:邮件主题 
    2. content:邮件正文 
    3. filepath:附件的地址, 输入格式为["","",...] 
    4. receive_email:收件人地址, 输入格式为["","",...] 
'''  
def Send_email_text(subject,content,filepath,receive_email):  
    import smtplib  
    from email.mime.multipart import MIMEMultipart   
    from email.mime.text import MIMEText   
    from email.mime.application import MIMEApplication  
    sender = "Youn Email"  # 发送人的账号, 已配置为 163 邮箱服务,如果使用其它的邮箱,可在方法下面修改
    passwd = "PASSWORD"     # 发送人的密码  
    receivers = receive_email   #收件人邮箱  
    msgRoot = MIMEMultipart()   
    msgRoot['Subject'] = subject  
    msgRoot['From'] = sender  
    if len(receivers)>1:  
        msgRoot['To'] = ','.join(receivers) #群发邮件  
    else:  
        msgRoot['To'] = receivers[0]  
    part = MIMEText(content)   
    msgRoot.attach(part)  
    ##添加附件部分--可以自己追加类型,或者全部打开自动判断  
    for path in filepath:
        # if ".jpg" in path:
        #     #jpg类型附件
        #     jpg_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=jpg_name)
        #     msgRoot.attach(part)
        #
        # if ".pdf" in path:
        #     #pdf类型附件
        #     pdf_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=pdf_name)
        #     msgRoot.attach(part)
        if ".xls" in path:  
            #xlsx类型附件  
            xlsx_name = path.split("\\")[-1]  
            part = MIMEApplication(open(path,'rb').read())   
            part.add_header('Content-Disposition', 'attachment', filename=xlsx_name)  
            msgRoot.attach(part)  
        # if ".txt" in path:
        #     #txt类型附件
        #     txt_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=txt_name)
        #     msgRoot.attach(part)
        #
        # if ".mp3" in path:
        #     #mp3类型附件
        #     mp3_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=mp3_name)
        #     msgRoot.attach(part)
    try:
        global m
        m = smtplib.SMTP()  
        m.connect("smtp.163.com") #这里我使用的是163邮箱,也可以使用其它邮箱
        m.login(sender, passwd)  
        m.sendmail(sender, receivers, msgRoot.as_string())  
        print("邮件发送成功")  
    except smtplib.SMTPException as e:  
        print("Error, 发送失败")  
    finally:  
        m.quit()
# ----------------- 创建EXCEL表格 -----------------
col =0
row =0
book_Mirror = xlwt.Workbook(encoding='utf-8', style_compression=0)  # 创建新的工作簿Mirror
sheet_load_Mirror = book_Mirror.add_sheet('流量', cell_overwrite_ok=True)  # 创建新的sheet,并命名为流量
sheet_load_Mirror.write(row, col, "时间")
sheet_load_Mirror.write(row, col + 1, "网络下行(KB)")
sheet_load_Mirror.write(row, col + 2, "网络上行(KB)")
sheet_load_Mirror.write(row, col + 3, "网络总流量(KB)")
sheet_load_Mirror.write(row, col + 4, "本地下行(KB)")
sheet_load_Mirror.write(row, col + 5, "本地上行(KB)")
sheet_load_Mirror.write(row, col + 6, "本地总流量(KB)")
# ----------------- 需要监测的包名 -----------------
time_end =0 # 监测初始时间,单位秒
package_name_Mirror= "You 的APK名称"  # 改成自己需要测试的APk包名
uid = (getUid(package_name_Mirror))[0:5]
try:
    uid_sdk = getUid(package_name_Mirror)
    print(time.strftime('%Y-%m-%d   %H:%M:%S',time.localtime(time.time())) +'  uid =  '+str(uid_sdk))
except:
    print('获取Mirror-uid失败')
# ----------------- 定位文件目录 -----------------
file_dir = "D:\\"   # 可改成自己的目录,要与下边对称
os.chdir(file_dir)
row =1
col =0
s = 1
net_bck_start_rx, net_front_start_rx, net_bck_start_tx, net_front_start_tx, \
lo_bck_start_rx, lo_front_start_rx, lo_bck_start_tx, lo_front_start_tx = getFlowFromUid(package_name_Mirror, uid)
net_start_rx = net_bck_start_rx + net_front_start_rx
net_start_tx = net_bck_start_tx + net_front_start_tx
lo_start_rx = lo_bck_start_rx + lo_front_start_rx
lo_start_tx = lo_bck_start_tx + lo_front_start_tx
i = 1
time_k,time_s = 0,0
while   time_end <= 259200:
    net_bck_end_rx, net_front_end_rx, net_bck_end_tx, net_front_end_tx, \
    lo_bck_end_rx, lo_front_end_rx, lo_bck_end_tx, lo_front_end_tx = getFlowFromUid(package_name_Mirror, uid)
    net_end_rx = net_bck_end_rx + net_front_end_rx
    net_end_tx = net_bck_end_tx + net_front_end_tx
    lo_end_rx = lo_bck_end_rx + lo_front_end_rx
    lo_end_tx = lo_bck_end_tx + lo_front_end_tx
    net_flow_rx, net_flow_tx = net_end_rx - net_start_rx, net_end_tx - net_start_tx
    lo_flow_rx, lo_flow_tx = lo_end_rx - lo_start_rx, lo_end_tx - lo_start_tx
    net_rx_kb, net_tx_kb = round(net_flow_rx / 1024, 3), round(net_flow_tx / 1024, 3)
    lo_rx_kb, lo_tx_kb = round(lo_flow_rx / 1024, 3), round(lo_flow_tx / 1024, 3)
    timeNow = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))  # 获取当前时间用于输出
    timenew = time.strftime('%Y-%m-%d %H-%M', time.localtime(time.time()))  # 获取当前时间用户备份时的命名
    sheet_load_Mirror.write(row, col, timeNow)  # 写入时间
    sheet_load_Mirror.write(row, col + 1, net_rx_kb)  # 写入网络下行(KB)
    sheet_load_Mirror.write(row, col + 2, net_tx_kb)  # 写入网络上行(KB)
    sheet_load_Mirror.write(row, col + 3, round(net_rx_kb + net_tx_kb, 3))  # 写入网络总流量(KB)
    sheet_load_Mirror.write(row, col + 4, lo_rx_kb)  # 写入本地上行(KB)
    sheet_load_Mirror.write(row, col + 5, lo_tx_kb)  # 写入本地下行(KB)
    sheet_load_Mirror.write(row, col + 6, round(lo_rx_kb + lo_tx_kb, 3))  # 写入本地总流量(KB)
    book_Mirror.save("d:\Mirror_Folw.xls")  # 保存EXCEL表
    print(" %s ---------- %s %s ----------" % (row, package_name_Mirror, timeNow))
    print(
          '网络下行:', net_rx_kb, 'KB\t',
          '网络上行:', net_tx_kb, 'KB\t',
          '网络总流量', round(net_rx_kb + net_tx_kb, 3), 'KB\t\t',
          '本地下行:', lo_rx_kb, 'KB\t',
          '本地上行:', lo_tx_kb, 'KB\t',
          '本地总流量', round(lo_rx_kb + lo_tx_kb, 3), 'KB\t\n'
          )
    row = row + 1   # EXCEL表格追加下一行写入
    time.sleep(10)  # 控制监测频率
    time_end += 10  # 监测计时器
    time_s += 10    # 备份计时器
    time_k += 10    # 邮件计时器
    # 定时备份
    if time_s == 300:
        shutil.copy("Mirror_Folw.xls", "d:\\test\\Mirror_Folw_%s.xls" % timenew)    # 改成自己需要复制的文件(此处的路径为上面定位目录),和复制后的文件路径以及名称
        time_s = 0
        print('备份成功~!')
    try:
        # 定时发送邮件
        if time_end == (1800 * i):
            subject = "邮件标题"
            content = "邮件正文"
            Mirror_path = "d:\\test\\Mirror_Folw_%s.xls" % timenew # 附件地址
            file_path = [Mirror_path]  # 可添加多个附件到邮箱
            receive_email = ["接收人的邮箱"]
            Send_email_text(subject,content,file_path,receive_email)
            i += 1
    except:
        continue
print("---------- END  统计时长:%s----------" % str(time_k))

知识扩展:

下面我们就来看看如何使用 Python 获取 Android App 的网络数据,核心思路主要有两种:

1) 直接模拟 App 请求,这通常是绕过 App 复杂逻辑进行高效数据采集的最终目的;

2) 进行实时的网络流量捕获与分析。最终采用哪种路径,取决于你的技术栈和目标 App 的防护强度。

核心方法一:App 请求模拟(直接数据采集)

这是指通过分析 App 的网络请求,找到规律后,直接用 Python 脚本(如 requests 库)构造请求来模拟 App 的行为,直接获取数据。它的目标是实现高效、长期稳定的数据采集。

实践流程:使用抓包工具分析 → 定位关键接口 → 分析参数规律 → 使用 requests 库等编写 Python 脚本模拟请求。

核心方法二:网络流量捕获与解析

通过设置代理或在设备上抓包,实时拦截 App 产生的网络数据包,再进行解析。

核心方法三:深度分析辅助

实战指南

以下是三种主流方案的详细操作指南,请根据你的技术背景和目标 App 的防护强度选择。

方案一:mitmproxy 实时代理(适合实时介入与自动化)

该方案的核心原理是在电脑上启动 mitmproxy 服务,并将手机的网络代理指向电脑,从而让所有流量都经过代理,实现拦截、修改或自动化处理。该方案无法绕过证书绑定(SSL Pinning)

安装与启动:通过 pip install mitmproxy 安装。之后,可使用 mitmweb(Web界面)或 mitmdump -s your_script.py(加载脚本)来启动。

配置手机:确保手机与电脑在同一 Wi-Fi,在手机 Wi-Fi 设置中开启代理,填入电脑的局域网 IP 和 mitmproxy 端口(默认 8080)。首次使用需访问 mitm.it 下载并安装证书。

编写 Python 脚本:新建 add_header.py 文件,编写如下示例脚本,用于在请求头中添加自定义字段:

# add_header.py
def request(flow):
    flow.request.headers["X-Custom-Header"] = "Value"
    print(f"Modified headers for {flow.request.url}")

运行并查看:执行 mitmdump -s add_header.py,手机上的流量便会实时流经代理。你添加的请求头会出现在每一条请求中,其效果等同于 curl -H

方案二:r0capture + Frida 应用层抓包(最强方案,可绕过 SSL Pinning)

该方案利用 Frida 将 JavaScript 代码注入到目标 App 进程,直接从其内存中 Hook 加密函数,在数据加密前捕获明文内容。它能绕过 SSL Pinning,是对付高防护 App 的“大杀器”

环境准备:电脑端安装 frida-tools (pip install frida-tools)。准备一台已 Root 的安卓手机或模拟器,下载对应架构的 frida-server并推送到手机运行。

克隆与运行git clone https://github.com/r0ysue/r0capture.git。通过 adb devices 确认设备已连接。最后,使用以下命令启动抓包(-f 后跟目标应用包名,-p 指定输出文件):

python3 r0capture.py -U -f com.example.app -v -p captured.pcap

分析数据:该命令会生成 captured.pcap 文件。你可以用 Wireshark 打开它进行深度分析,也可以配合下文提到的 Scapy 进行自动化解析。

方案三:Scapy 离线解析 PCAP 文件(适合离线深度分析)

当完成抓包并得到 .pcap 文件后,Scapy 是解析它并提取业务数据的利器。

安装 Scapypip install scapy

解析代码示例:编写以下 Python 脚本,从 .pcap 文件中提取 HTTP 请求信息。

from scapy.all import rdpcap, TCP, IP, Raw
packets = rdpcap('captured.pcap') # 加载 PCAP 文件
for pkt in packets:
    if IP in pkt and TCP in pkt and Raw in pkt:
        payload = pkt[Raw].load
        try:
            # 简单解码 HTTP 请求行
            http_text = payload.decode('utf-8', errors='ignore')
            if http_text.startswith('GET') or http_text.startswith('POST'):
                print(f"发现 HTTP 请求: {http_text.splitlines()[0]}")
        except:
            pass

到此这篇关于使用Python实现监测APP的使用流量的文章就介绍到这了,更多相关Python监测APP流量内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文