Python - consul批量注销 & ssl check
2024-08-27
获取 Consul 中指定 service 的所有实例,并逐个进行 SSL(HTTPS)检查;如检查异常,则将该实例注销(deregister)。
import requests
import sys
import consul
def validate_https(service_address, port):
    """Probe ``https://service_address:port`` and report whether the request succeeded.

    A single GET with a 3-second timeout is issued. The HTTP status code is
    not inspected: any response that arrives without ``requests`` raising
    counts as success.

    Args:
        service_address: Host name or IP address to probe.
        port: TCP port to probe.

    Returns:
        True if the request completed, False if ``requests`` raised a
        ``RequestException`` (the reason is printed before returning).
    """
    target = f"https://{service_address}:{port}"
    exc = requests.exceptions
    try:
        requests.get(target, timeout=3)
    except exc.RequestException as err:
        # The checks run from most specific to most general: SSLError and
        # ConnectTimeout are subclasses of ConnectionError, so they must be
        # tested before it to keep their own messages.
        if isinstance(err, exc.SSLError):
            message = f"不支持https,原因:SSLError:{service_address} "
        elif isinstance(err, exc.InvalidURL):
            message = f"不支持https,原因:InvalidURL:{service_address} "
        elif isinstance(err, exc.ConnectTimeout):
            message = f"不支持https,原因:ConnectTimeout:{service_address} "
        elif isinstance(err, exc.ReadTimeout):
            message = f"不支持https,原因:ReadTimeout:{service_address} "
        elif isinstance(err, exc.ConnectionError):
            message = f"不支持https,原因:ConnectionError:{service_address} "
        else:
            # Any other requests failure: include the exception text itself.
            message = f"不支持https:{service_address} {err}"
        print(message)
        return False
    return True
def get_consul_service_info(consul_host, consul_port, consul_service, timeout=10):
    """HTTPS-check every catalog instance of a service and deregister the failures.

    Queries ``/v1/catalog/service/<consul_service>`` on the given Consul
    endpoint, runs ``validate_https`` against each returned instance, and
    calls ``deregister_service`` for every instance whose check fails.

    Args:
        consul_host: Host name of the Consul HTTP API.
        consul_port: Port of the Consul HTTP API.
        consul_service: Name of the service to look up in the catalog.
        timeout: Seconds to wait for the catalog query before giving up.

    Returns:
        The decoded catalog response (a list of instance records) as it was
        fetched, i.e. before any deregistration performed by this call.

    Raises:
        requests.exceptions.RequestException: If the catalog query fails,
            times out, or returns an HTTP error status.
    """
    url = f"http://{consul_host}:{consul_port}/v1/catalog/service/{consul_service}"
    # Bound the catalog query so an unresponsive Consul cannot hang the script.
    response = requests.get(url, timeout=timeout)
    # Fail with the HTTP error itself instead of a confusing JSON decode error.
    response.raise_for_status()
    # Decode the body once; the same list is iterated and returned.
    records = response.json()
    for record in records:
        # Consul reports an empty ServiceAddress when the service is reachable
        # at the node address; fall back to it so such instances are probed at
        # a real host instead of failing with InvalidURL and being deregistered.
        service_address = record["ServiceAddress"] or record["Address"]
        port = record["ServicePort"]
        service_id = record["ServiceID"]
        if not validate_https(service_address, port):
            deregister_service(consul_host, consul_port, service_id)
    return records
def deregister_service(consul_host, consul_port, service_id):
    """Deregister ``service_id`` via the Consul agent API and print a log line.

    Args:
        consul_host: Host name of the Consul agent to talk to.
        consul_port: Port of the Consul agent's HTTP API.
        service_id: ID of the service instance to deregister.
    """
    agent_services = consul.Consul(host=consul_host, port=consul_port).agent.service
    agent_services.deregister(service_id=service_id)
    print(f"{consul_host}:{consul_port} deregister: {service_id}")
# Consul HTTP API endpoint this script operates on.
consul_host = "consul.paigod.work"
consul_port = "8500"

# The service name is the one required command-line argument; print a usage
# hint and exit with status 1 when it is missing.
cli_args = sys.argv[1:]
if not cli_args:
    print("请输入 Consul 服务名称作为参数,例如 Domain-Ipv4")
    sys.exit(1)
consul_service = cli_args[0]

# Check every instance of the service and deregister those failing HTTPS.
get_consul_service_info(consul_host, consul_port, consul_service)
标题:Python - consul批量注销 & ssl check
地址:https://blog.njqhome.com:8443/articles/2024/08/27/1724751757746.html