腾讯云 - cos私有
2024-09-07
# -*- coding=utf-8
import os
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
from qcloud_cos import CosServiceError
from qcloud_cos import CosClientError
import subprocess
import configparser
import time
from datetime import datetime
# Load the COS credentials and region from the local config file.
config = configparser.ConfigParser()
config.read('/root/.cos.conf')
# Tencent Cloud COS settings, read from the [common] section of that file.
secret_id = config.get('common', 'secret_id')
secret_key = config.get('common', 'secret_key')
region = config.get('common', 'region')
# Bucket used by the functions below.
bucket = 'infra-devops-prod-1312767721'
# NOTE: `config` is rebound here, from the ConfigParser to the SDK config object.
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key) # build the SDK config object
# Shared client used by the functions below.
client = CosS3Client(config)
## 获取所有.iso结尾的文件
#def get_iso_files():
# #cmd = "cd /cos/infra-devops-prod-1312767721 && find iso pai-iso pi-miner salt-source -type f -name *.iso "
# #cmd = "cd /cos/infra-devops-prod-1312767721 && find temp/iso -type f -name *.iso "
# cmd = "cd /cos/infra-devops-prod-1312767721 && find iso/dahe -type f -name *.iso "
# result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# if result.returncode == 0:
# iso_files = result.stdout.splitlines()
# iso_list = []
# for file in iso_files:
# iso_list.append(file)
# return iso_list
# else:
# print("执行命令时出错:", result.stderr)
# 获取所有.iso结尾的文件
def get_iso_files(cos_client=None, bucket_name=None):
    """Return the keys of all objects in the bucket whose key ends with ``.iso``.

    The bucket is listed page by page (1000 keys per request) until the
    service reports that the listing is no longer truncated.

    Args:
        cos_client: Object exposing ``list_objects(Bucket=, MaxKeys=, Marker=)``.
            Defaults to the module-level ``client``.
        bucket_name: Bucket to list. Defaults to the module-level ``bucket``.

    Returns:
        A list of object keys, in listing order. The suffix match is
        case-sensitive. An empty list is returned for an empty bucket.
    """
    cos_client = client if cos_client is None else cos_client
    bucket_name = bucket if bucket_name is None else bucket_name

    iso_list = []
    marker = ''
    while True:
        response = cos_client.list_objects(
            Bucket=bucket_name,
            MaxKeys=1000,
            Marker=marker,
        )
        # 'Contents' is absent when a page holds no objects (e.g. an empty
        # bucket), so it must not be indexed directly.
        contents = response.get('Contents') or []
        iso_list.extend(
            obj['Key'] for obj in contents if obj['Key'].endswith('.iso')
        )

        # The listing reports truncation as the string 'true'.
        if response.get('IsTruncated') != 'true':
            break

        # Prefer the marker supplied by the service; fall back to the last key
        # of this page when it is missing.
        next_marker = response.get('NextMarker')
        if not next_marker and contents:
            next_marker = contents[-1]['Key']
        # Stop instead of looping forever if the listing cannot advance.
        if not next_marker or next_marker == marker:
            break
        marker = next_marker
    return iso_list
# 变更文件权限为私有读写
def change_permission(file_key):
    """Set the ACL of object ``file_key`` to ``private`` unless it already is.

    Reads the object's ACL from the module-level ``bucket`` via the
    module-level ``client``. When the reported canned ACL is not ``private``,
    the current ACL response is printed and the ACL is overwritten.

    Args:
        file_key: Key of the object inside the bucket.
    """
    acl_info = client.get_object_acl(Bucket=bucket, Key=file_key)

    # Nothing to do when the object is already private.
    if acl_info['CannedACL'] == 'private':
        print("{} 已经是私有读写".format(file_key))
        return

    # Dump the previous ACL before it is overwritten.
    print(acl_info)
    client.put_object_acl(Bucket=bucket, Key=file_key, ACL='private')
    print("{} todo权限变更为私有读写".format(file_key))
# 追加Cache-Control:no-cache
def append_cache_control(file_key):
    """Add ``Cache-Control: no-cache`` to object ``file_key`` if it has no Cache-Control.

    The header is set by copying the object onto itself with the metadata
    directive ``Replaced``. With that directive the destination's metadata is
    taken from the copy request only, so the standard content headers and the
    user metadata (``x-cos-meta-*``) reported by HEAD are sent again;
    otherwise they would be dropped by the rewrite.

    Objects that already carry any Cache-Control value are left untouched.

    Args:
        file_key: Key of the object inside the module-level ``bucket``.
    """
    head = client.head_object(Bucket=bucket, Key=file_key)

    if head.get('Cache-Control') is not None:
        print("{} 已经包含no-cache".format(file_key))
        return

    # Dump the current headers before the object is rewritten.
    print(head)

    # HEAD response header -> keyword argument understood by the SDK copy call.
    carry_over = {
        'Content-Type': 'ContentType',
        'Content-Disposition': 'ContentDisposition',
        'Content-Encoding': 'ContentEncoding',
        'Content-Language': 'ContentLanguage',
        'Expires': 'Expires',
    }
    preserved = {
        keyword: head[header]
        for header, keyword in carry_over.items()
        if head.get(header)
    }
    # User-defined metadata is passed back under its full header name.
    user_meta = {
        name: value
        for name, value in head.items()
        if name.lower().startswith('x-cos-meta-')
    }
    if user_meta:
        preserved['Metadata'] = user_meta

    client.copy(
        Bucket=bucket,
        Key=file_key,
        CopySource={'Bucket': bucket, 'Key': file_key, 'Region': region},
        CopyStatus='Replaced',
        CacheControl='no-cache',
        **preserved
    )
    print("{} todo追加no-cache".format(file_key))
if __name__ == '__main__':
    # Print the start time of this run.
    print("当前时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # For every .iso object: add the Cache-Control header, then make it private.
    # NOTE(review): the ACL is changed after the Cache-Control rewrite;
    # presumably because the copy may reset the object's ACL. Confirm before
    # reordering these two calls.
    for iso_key in get_iso_files():
        append_cache_control(iso_key)
        change_permission(iso_key)