Python - consul批量注销 & ssl check

2024-08-27

获取 Consul 中指定 service 的全部实例,逐个尝试 HTTPS(SSL)检查;如检查异常,则将该实例从 Consul 中注销(deregister)。

import requests
import sys
import consul

def validate_https(service_address, port):
    """Report whether an HTTPS GET to ``service_address:port`` completes.

    The response itself is never inspected, so any HTTP status counts as
    success; only an exception raised by ``requests`` counts as failure.

    Args:
        service_address: Host name or IP address to probe.
        port: TCP port to probe.

    Returns:
        True if the request finished without a ``requests`` exception,
        False otherwise (a diagnostic line is printed in that case).
    """
    target = f"https://{service_address}:{port}"
    try:
        requests.get(target, timeout=3)
    except requests.exceptions.RequestException as err:
        # Order matters: SSLError and ConnectTimeout are subclasses of
        # ConnectionError, so the specific types are tested first.
        if isinstance(err, requests.exceptions.SSLError):
            message = f"不支持https,原因:SSLError:{service_address} "
        elif isinstance(err, requests.exceptions.InvalidURL):
            message = f"不支持https,原因:InvalidURL:{service_address} "
        elif isinstance(err, requests.exceptions.ConnectTimeout):
            message = f"不支持https,原因:ConnectTimeout:{service_address} "
        elif isinstance(err, requests.exceptions.ReadTimeout):
            message = f"不支持https,原因:ReadTimeout:{service_address} "
        elif isinstance(err, requests.exceptions.ConnectionError):
            message = f"不支持https,原因:ConnectionError:{service_address} "
        else:
            # Any other requests failure: include the exception text.
            message = f"不支持https:{service_address}  {err}"
        print(message)
        return False
    return True


def get_consul_service_info(consul_host, consul_port, consul_service, timeout=10):
    """HTTPS-check every catalog instance of a service; deregister failures.

    Fetches ``/v1/catalog/service/<consul_service>`` from the given Consul
    endpoint, probes each returned instance with ``validate_https`` and calls
    ``deregister_service`` for every instance whose probe fails.

    Args:
        consul_host: Host name of the Consul HTTP API.
        consul_port: Port of the Consul HTTP API.
        consul_service: Name of the service to look up in the catalog.
        timeout: Seconds to wait for the catalog request before giving up.

    Returns:
        The decoded JSON list of catalog records for the service.

    Raises:
        requests.exceptions.RequestException: If the catalog request times
            out, cannot connect, or returns an HTTP error status.
    """
    url = f"http://{consul_host}:{consul_port}/v1/catalog/service/{consul_service}"
    # Without a timeout, requests may block indefinitely on a stalled server.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on an HTTP error instead of feeding an error page to the
    # JSON decoder.
    response.raise_for_status()
    records = response.json()  # decode once and reuse for the return value
    for record in records:
        # Consul leaves ServiceAddress empty when the service uses its node's
        # address; fall back to the node Address so such instances are probed
        # at a real host instead of failing with InvalidURL.
        service_address = record["ServiceAddress"] or record.get("Address", "")
        port = record["ServicePort"]
        service_id = record["ServiceID"]
        if not validate_https(service_address, port):
            deregister_service(consul_host, consul_port, service_id)
    return records


def deregister_service(consul_host, consul_port, service_id):
    """Deregister ``service_id`` through the Consul agent API and log it.

    Args:
        consul_host: Host name of the Consul agent to contact.
        consul_port: Port of the Consul agent to contact.
        service_id: ID of the service instance to remove.
    """
    # NOTE(review): agent-level deregistration presumably only affects
    # services registered on the agent being contacted - confirm this works
    # for instances registered via other agents.
    agent_services = consul.Consul(host=consul_host, port=consul_port).agent.service
    agent_services.deregister(service_id=service_id)
    print(f"{consul_host}:{consul_port} deregister: {service_id}")


# Consul HTTP API endpoint that is queried and used for deregistration.
consul_host = "consul.paigod.work"
consul_port = "8500"

# The service name is the single required command-line argument.
try:
    consul_service = sys.argv[1]
except IndexError:
    print("请输入 Consul 服务名称作为参数,例如 Domain-Ipv4")
    sys.exit(1)

# Check every instance of the service and deregister those failing HTTPS.
get_consul_service_info(consul_host, consul_port, consul_service)

标题:Python - consul批量注销 & ssl check
地址:https://blog.njqhome.com:8443/articles/2024/08/27/1724751757746.html