腾讯云 - cos私有

2024-09-07
# -*- coding=utf-8

import os
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
from qcloud_cos import CosServiceError
from qcloud_cos import CosClientError
import subprocess
import configparser
import time
from datetime import datetime

# Load the COS credentials from the INI file in root's home directory.
# NOTE: ConfigParser.read() does not raise when the file is missing; in that
# case the config.get() calls below are what fail.
config = configparser.ConfigParser()
config.read('/root/.cos.conf')

# Tencent Cloud COS settings: credentials and region come from the [common]
# section of the file above, the target bucket is hard-coded.
secret_id = config.get('common', 'secret_id')
secret_key = config.get('common', 'secret_key')
region = config.get('common', 'region')
bucket = 'infra-devops-prod-1312767721'

# `config` is rebound here: from this line on it is the SDK configuration
# object, no longer the ConfigParser instance.
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)  # build the SDK configuration object
client = CosS3Client(config)


## 获取所有.iso结尾的文件
#def get_iso_files():
#    #cmd = "cd /cos/infra-devops-prod-1312767721 && find iso pai-iso pi-miner salt-source -type f -name *.iso "
#    #cmd = "cd /cos/infra-devops-prod-1312767721 && find temp/iso -type f -name *.iso "
#    cmd = "cd /cos/infra-devops-prod-1312767721 && find iso/dahe -type f -name *.iso "
#    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
#    if result.returncode == 0:
#        iso_files = result.stdout.splitlines()
#        iso_list = []
#        for file in iso_files:
#            iso_list.append(file)
#        return iso_list
#    else:
#        print("执行命令时出错:", result.stderr)

def get_iso_files():
    """Return the keys of all objects in ``bucket`` whose key ends with ``.iso``.

    Pages through ``client.list_objects`` 1000 keys at a time, passing the
    marker of the previous page until the listing is no longer truncated.

    Returns:
        list: matching object keys in listing order; empty when the bucket
        has no objects or none of them end with ``.iso``.
    """
    iso_list = []
    marker = ''
    while True:
        response = client.list_objects(
            Bucket=bucket,
            MaxKeys=1000,
            Marker=marker,
        )
        # A page without objects (e.g. an empty bucket) may carry no
        # 'Contents' entry at all; treat that as an empty page instead of
        # raising KeyError.
        contents = response.get('Contents') or []
        iso_list.extend(
            obj['Key'] for obj in contents if obj['Key'].endswith('.iso')
        )

        # The truncation flag is compared as the string 'true', as before.
        if response.get('IsTruncated') != 'true':
            break

        # Prefer the marker supplied with the response; fall back to the last
        # key of this page when it is absent.
        next_marker = response.get('NextMarker')
        if not next_marker and contents:
            next_marker = contents[-1]['Key']
        if not next_marker or next_marker == marker:
            # No way to advance: stop rather than request the same page forever.
            break
        marker = next_marker
    return iso_list

def change_permission(file_key):
    """Set the object's ACL to ``private`` unless it already is.

    Fetches the current ACL with ``client.get_object_acl`` and inspects its
    ``CannedACL`` entry. Anything other than ``'private'`` is logged and then
    overwritten via ``client.put_object_acl``.

    Args:
        file_key: key of the object inside ``bucket``.
    """
    acl_info = client.get_object_acl(Bucket=bucket, Key=file_key)

    # Nothing to do when the object is already private.
    if acl_info['CannedACL'] == 'private':
        print("{} 已经是私有读写".format(file_key))
        return

    # Dump the previous ACL before replacing it so the change can be audited.
    print(acl_info)
    client.put_object_acl(Bucket=bucket, Key=file_key, ACL='private')
    print("{} todo权限变更为私有读写".format(file_key))

def append_cache_control(file_key):
    """Ensure the object's ``Cache-Control`` header contains ``no-cache``.

    Reads the object's headers with ``client.head_object``. When the header is
    missing, or present without the ``no-cache`` directive (for example
    ``max-age=3600``), the object is copied onto itself with
    ``CopyStatus='Replaced'`` and ``CacheControl='no-cache'``. An object whose
    header already contains ``no-cache`` is left untouched.

    Args:
        file_key: key of the object inside ``bucket``.
    """
    response = client.head_object(Bucket=bucket, Key=file_key)

    # Look at the directive itself rather than mere presence of the header, so
    # an unrelated value is not mistaken for "already no-cache".
    cache_control = response.get('Cache-Control') or ''
    if 'no-cache' in cache_control.lower():
        print("{} 已经包含no-cache".format(file_key))
        return

    # Dump the previous headers before rewriting the object.
    print(response)
    # NOTE(review): CopyStatus='Replaced' presumably makes the copy take its
    # metadata from this request only, so other stored headers may be dropped
    # - confirm against the SDK documentation.
    client.copy(
        Bucket=bucket,
        Key=file_key,
        CopyStatus='Replaced',
        CacheControl='no-cache',
        CopySource={'Bucket': bucket, 'Key': file_key, 'Region': region},
    )
    print("{} todo追加no-cache".format(file_key))

if __name__ == '__main__':
    # Stamp the run with the local wall-clock time so log output can be dated.
    print("当前时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # For every .iso object: make sure Cache-Control is set, then make sure the
    # ACL is private.
    # NOTE(review): the order looks deliberate - the self-copy done for
    # Cache-Control may reset the object's ACL; confirm before reordering.
    for iso_key in get_iso_files():
        append_cache_control(iso_key)
        change_permission(iso_key)