python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python接受接口回调信息

使用Python接受接口回调信息的完整流程

作者:snow_to

本文主要为大家详细介绍了搭建Python Flask服务接收WCS回调数据的完整流程,文内涵盖从安装Python到执行回调脚本的全过程,适合初学者快速上手

步骤一

电脑安装Python,下载链接:下载Python |Python.org

步骤二

安装依赖环境

cmd下,执行以下命令

pip install flask

出现以下结果,代表安装成功

步骤三

创建回调脚本 agv_recv.py

CMD 输入:notepad agv_recv.py,弹出记事本后粘贴下面代码

from flask import Flask, request
app = Flask(__name__)

# 完全匹配WCS原回调路径:/wcs/services/rest/cms/agvCallback
@app.route("/wcs/services/rest/cms/agvCallback", methods=["POST","GET"])
def agv_callback():
    print("========== 收到WCS回调数据 ==========")
    print("请求头:", dict(request.headers))
    # 解析JSON报文
    if request.is_json:
        req_data = request.get_json()
        print("回调JSON报文:", req_data)
    else:
        print("表单参数:", request.form)
    # 返回WCS需要的成功应答
    return {"code":0,"msg":"success"}

if __name__ == '__main__':
    # 端口使用23600,和WCS服务同端口
    app.run(host="0.0.0.0", port=23600, debug=False)

Ctrl+S 保存,关闭记事本。

步骤四

CMD 直接执行启动命令:

python agv_recv.py

步骤五

执行 curl 命令模拟推送消息:

curl -X POST "http://127.0.0.1:23600/wcs/services/rest/cms/agvCallback" -H "Content-Type:application/json" -d "{\"agvCode\":\"AGV001\",\"taskNo\":\"T2026060301\"}"

执行后回到运行服务的 CMD,即可打印出收到的回调 JSON 数据。

补充说明

将打印数据保存至文件中

from flask import Flask, request
import datetime
app = Flask(__name__)

@app.route("/wcs/services/rest/cms/agvCallback", methods=["POST","GET"])
def agv_callback():
    print("========== 收到WCS回调数据 ==========")
    headers = dict(request.headers)
    print("请求头:", headers)
    if request.is_json:
        req_data = request.get_json()
        print("回调JSON报文:", req_data)
        # ==========新增:落地文件==========
        with open("agv_data.log","a",encoding="utf-8") as f:
            time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            f.write(f"【{time_str}】报文:{req_data}\n")
    else:
        print("表单参数:", request.form)
    return {"code":0,"msg":"success"}

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=23600, debug=False)

按日期自动生成日志文件

from flask import Flask, request
import datetime, os
app = Flask(__name__)

# 日志存放文件夹
LOG_DIR = "agv_logs"
# 文件夹不存在自动创建
if not os.path.exists(LOG_DIR):
    os.mkdir(LOG_DIR)

@app.route("/wcs/services/rest/cms/agvCallback", methods=["POST","GET"])
def agv_callback():
    now = datetime.datetime.now()
    time_full = now.strftime("%Y-%m-%d %H:%M:%S")
    date_str = now.strftime("%Y%m%d") # 日期作为日志文件名
    
    print(f"==========【{time_full}】收到WCS回调数据 ==========")
    headers = dict(request.headers)
    print("请求头:", headers)

    # 拼接当日日志完整路径
    log_file = os.path.join(LOG_DIR, f"agv_{date_str}.log")

    if request.is_json:
        req_data = request.get_json()
        print("回调JSON报文:", req_data)
        # 写入当天的日志文件
        with open(log_file,"a",encoding="utf-8") as f:
            f.write(f"【{time_full}】请求头:{headers}\n【报文内容】:{req_data}\n-------------------------\n")
    else:
        form_data = request.form
        print("表单参数:", form_data)
        with open(log_file,"a",encoding="utf-8") as f:
            f.write(f"【{time_full}】请求头:{headers}\n【表单内容】:{form_data}\n-------------------------\n")

    return {"code":0,"msg":"success"}

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=23600, debug=False)

知识扩展

在开发中,“接口回调”通常指 Webhook:一方(回调提供方)在某个事件发生时,主动向一个预先配置的 URL(你的服务器端点)发送 HTTP 请求,携带事件数据。你需要做的就是搭建一个 HTTP 服务器,监听这些请求并处理。

下面以 Flask 为例,演示如何安全、稳定地接收和处理回调。

1. 最简单的 Flask 回调接收端

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook():
    # 获取请求体数据(假设为 JSON)
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Invalid data'}), 400

    # 打印或处理数据
    print(f"Received webhook: {data}")

    # 可以在这里触发异步任务,避免阻塞响应
    # process_webhook_async(data)

    # 必须返回 2xx 状态码,否则回调方可能重试
    return jsonify({'status': 'ok'}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

2. 处理不同类型的数据

回调可以是 application/jsonapplication/x-www-form-urlencoded 或 multipart/form-data

# 兼容多种 Content-Type
if request.is_json:
    data = request.get_json()
else:
    data = request.form.to_dict()   # 或 request.data 原始数据

3. 验证签名(安全性)—— 防止伪造回调

很多服务(如 Stripe、GitHub、支付宝)会在 HTTP 头中附带签名,你需要验证该签名以确保请求确实来自对方。

示例:Stripe 风格签名验证(HMAC-SHA256)

import hmac
import hashlib

SECRET = "your_webhook_secret"   # 从回调提供方获取

def verify_signature(payload, signature_header):
    # 假设签名头格式:t=timestamp,v1=signature
    # 实际根据文档提取 timestamp 和 signature
    expected = hmac.new(
        SECRET.encode('utf-8'),
        msg=payload,
        digestmod=hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header)

在 Flask 中:

@app.route('/webhook', methods=['POST'])
def webhook():
    payload = request.get_data()   # 原始字节数据
    sig_header = request.headers.get('X-Signature')
    if not verify_signature(payload, sig_header):
        return '', 401
    # 验证通过后再处理

4. 异步处理任务 —— 防止超时

回调通常要求快速响应(比如 10 秒内),如果处理逻辑耗时(写数据库、发邮件等),应该立即返回 200,然后放入后台队列处理。

使用 Celery 或 Redis Queue,最简单的做法是启动一个线程:

from threading import Thread

def process_webhook(data):
    # 耗时操作
    pass

@app.route('/webhook', methods=['POST'])
def webhook():
    data = request.get_json()
    Thread(target=process_webhook, args=(data,)).start()
    return '', 200

注意:生产环境建议使用 Celery + Redis/RabbitMQ 来管理任务。

5. 断连与重试机制

回调方通常有重试策略(如指数退避)。你的服务应保证幂等性:即使收到两次相同的事件,处理结果也应一致(例如通过 event_id 去重)。

processed_ids = set()   # 实际应存到 Redis 或数据库
def handle_event(event_id, data):
    if event_id in processed_ids:
        return
    processed_ids.add(event_id)
    # 实际处理

6. 使用 FastAPI 编写异步回调接收

FastAPI 天然支持异步,性能更好:

from fastapi import FastAPI, Request, BackgroundTasks

app = FastAPI()

@app.post('/webhook')
async def webhook(request: Request, background_tasks: BackgroundTasks):
    data = await request.json()
    background_tasks.add_task(process_webhook, data)
    return {'status': 'ok'}

到此这篇关于使用Python接受接口回调信息的完整流程的文章就介绍到这了,更多相关Python接受接口回调信息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文