Python - 飞书webhook

2024-06-28
from flask import Flask, request, make_response
import requests
from retry import retry
import sys

app = Flask(__name__)

feishu_webhook_prefix = "https://open.feishu.cn/open-apis/bot/v2/hook/"
# 定义一个字典,存储 namespace 和对应的飞书 token
namespace_tokens = {
    "ai-cloud": "ce8b95d3-2232-4779-1234-c00105f22cc9",
    "other": "token2",
    # 添加更多的 namespace 和对应的 token
}

# 定义一个发送飞书消息的函数,并添加重试逻辑
@retry(tries=3, delay=1)
def send_feishu_message(message, token):
    payload = {
        "msg_type": "text",
        "content": {
            "text": message
        }
    }
    headers = {
        "Authorization": f"Bearer {token}"
    }
    feishu_webhook_url=feishu_webhook_prefix + token
    response = requests.post(feishu_webhook_url, json=payload, headers=headers)

    # 检查飞书消息发送是否成功
    if response.status_code!= 200:
        raise Exception(response.status_code)

@app.route('/gitlab-webhook', methods=['POST'])
def gitlab_webhook():
    # 解析 GitLab Webhook 数据
    response = request.json
    project_data = response['project']
    commits_data = response['commits']
    event_name = response['event_name']

    namespace = project_data['namespace']
    branch_name = response['ref']
    project_name = project_data['name']
    project_url = project_data['homepage']

    # 根据 namespace 获取对应的飞书 token
    token = namespace_tokens.get(namespace)

    if token is None:
        response = make_response("无匹配的飞书Token,请联系管理员。", 401)
        return response

    print(token)
    # 构建飞书消息内容
    message = f"{project_name} is updated\nProjectUrl: {project_url}\nTrigger: {event_name}\nBranch: {branch_name} \nCommit Info:\n"

    # 遍历 commits_data 列表,收集所有的 commit 信息
    for commit in commits_data:
        commit_message = "Commit:" + commit['title'] + "\tsha1: " + commit['id'][:8] + "\tAuthor: " + commit['author']['email']
        message += f"- {commit_message}\n"

    print(message)
    # 发送飞书消息
    try:
        send_feishu_message(message, token)
        return "飞书消息发送成功"
    except Exception as e:
        # 创建一个包含错误信息的响应
        response = make_response("飞书消息发送失败: " + str(e), str(e))
        return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

image.png