Python - 飞书webhook
2024-06-28
from flask import Flask, request, make_response
import requests
from retry import retry
import sys
app = Flask(__name__)
feishu_webhook_prefix = "https://open.feishu.cn/open-apis/bot/v2/hook/"
# 定义一个字典,存储 namespace 和对应的飞书 token
namespace_tokens = {
"ai-cloud": "ce8b95d3-2232-4779-1234-c00105f22cc9",
"other": "token2",
# 添加更多的 namespace 和对应的 token
}
# 定义一个发送飞书消息的函数,并添加重试逻辑
@retry(tries=3, delay=1)
def send_feishu_message(message, token):
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
headers = {
"Authorization": f"Bearer {token}"
}
feishu_webhook_url=feishu_webhook_prefix + token
response = requests.post(feishu_webhook_url, json=payload, headers=headers)
# 检查飞书消息发送是否成功
if response.status_code!= 200:
raise Exception(response.status_code)
@app.route('/gitlab-webhook', methods=['POST'])
def gitlab_webhook():
# 解析 GitLab Webhook 数据
response = request.json
project_data = response['project']
commits_data = response['commits']
event_name = response['event_name']
namespace = project_data['namespace']
branch_name = response['ref']
project_name = project_data['name']
project_url = project_data['homepage']
# 根据 namespace 获取对应的飞书 token
token = namespace_tokens.get(namespace)
if token is None:
response = make_response("无匹配的飞书Token,请联系管理员。", 401)
return response
print(token)
# 构建飞书消息内容
message = f"{project_name} is updated\nProjectUrl: {project_url}\nTrigger: {event_name}\nBranch: {branch_name} \nCommit Info:\n"
# 遍历 commits_data 列表,收集所有的 commit 信息
for commit in commits_data:
commit_message = "Commit:" + commit['title'] + "\tsha1: " + commit['id'][:8] + "\tAuthor: " + commit['author']['email']
message += f"- {commit_message}\n"
print(message)
# 发送飞书消息
try:
send_feishu_message(message, token)
return "飞书消息发送成功"
except Exception as e:
# 创建一个包含错误信息的响应
response = make_response("飞书消息发送失败: " + str(e), str(e))
return response
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)