You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
hetzner-ansible/inventory_plugins/netgo-hcloud.py

267 lines
11 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r"""
name: netgo-hcloud
plugin_type: inventory
author:
- netgo
short_description: Ansible dynamic inventory plugin for Hetzner hcloud servers.
requirements:
- python >= 2.10
description:
- Reads inventory data from the Hetzner Cloud API. Server Groups are given by the <service> label. Servers need two labels: service and stage.
extends_documentation_fragment:
- constructed
- inventory_cache
options:
plugin:
description: Marks this as an instance of the "netgo-hcloud" plugin
required: true
choices: ["netgo-hcloud"]
api_token:
description: The Hetzner Cloud API token.
required: true
env:
- name: HETZNER_CLOUD_TOKEN
stage:
description: Filter servers by this stage.
required: true
env:
- name: HETZNER_STAGE
stage_kube:
description: Filter servers by this stage.
required: true
env:
- name: HETZNER_STAGE_KUBE
label_selector:
description: Filter servers by this label selector.
required: true
env:
- name: HETZNER_LABEL_SELECTOR
"""
import json
import os
import re
from ansible.errors import AnsibleError
from ansible.module_utils.urls import open_url
from ansible.module_utils._text import to_native
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
from ansible.release import __version__
from ansible.utils.display import Display
class MyHcloudAPI:
BASE = "https://api.hetzner.cloud"
def __init__(self, token, label_selector=None):
self.token = token
self.label_selector = label_selector
def get_values(self, api_path, response_values_field_name):
display = Display()
try:
response_values = []
# pagination with page_size per window, repeat until last page is reached
page = 1
page_size = 20
while page > 0:
api_url = ""
if not self.label_selector:
api_url = "{}/{}?per_page={}&page={}".format(self.BASE, api_path, str(page_size), str(page))
else:
api_url = "{}/{}?label_selector={}&per_page={}&page={}".format(self.BASE, api_path, self.label_selector, str(page_size), str(page))
display.display(api_url)
response = open_url(
api_url,
headers={"Authorization": "Bearer " + self.token},
)
json_response = json.loads(response.read())
response_values += json_response[response_values_field_name]
if json_response["meta"]["pagination"]["page"] == json_response["meta"]["pagination"]["last_page"]:
break
page += 1
return response_values
except ValueError:
raise AnsibleError("Incorrect JSON payload")
except Exception as e:
raise AnsibleError("Error while fetching %s: %s" % (api_url, to_native(e)))
def get_servers(self):
return self.get_values("v1/servers", "servers")
def get_networks(self):
return self.get_values("v1/networks", "networks")
def get_loadbalancers(self):
return self.get_values("v1/load_balancers", "load_balancers")
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
NAME = "netgo-hcloud"
def verify_file(self, path):
return super(InventoryModule, self).verify_file(path) and path.endswith(
(self.NAME + ".yaml", self.NAME + ".yml")
)
def _read_servers_from_API(self):
servers = MyHcloudAPI(
self.get_option("api_token"), self.get_option("label_selector")
).get_servers()
return servers
def _read_networks_from_API(self):
networks = MyHcloudAPI(
self.get_option("api_token"), self.get_option("label_selector")
).get_networks()
return networks
def _read_loadbalancers_from_API(self):
loadbalancers = MyHcloudAPI(
self.get_option("api_token")
).get_loadbalancers()
return loadbalancers
def parse(self, inventory, loader, path, cache=True):
super(InventoryModule, self).parse(inventory, loader, path, cache)
config = self._read_config_data(path)
cache_key = self.get_cache_key(path)
# `cache` may be True or False at this point to indicate if the
# inventory is being refreshed. Get the user's cache option too
# to see if we should save the cache when it is changing.
user_cache_setting = self.get_option("cache")
# Read if the user has caching enabled and the cache isn't being
# refreshed.
attempt_to_read_cache = user_cache_setting and cache
# Update if the user has caching enabled and the cache is being
# refreshed; update this value to True if the cache has expired below.
cache_needs_update = user_cache_setting and not cache
# Attempt to read the cache if inventory isn't being refreshed and
# the user has caching enabled.
if attempt_to_read_cache:
try:
servers = self._cache[cache_key]
except KeyError:
# This occurs if the cache_key is not in the cache or if
# the cache_key expired, so the cache needs to be updated.
servers = self._read_servers_from_API()
cache_needs_update = True
else:
servers = self._read_servers_from_API()
if cache_needs_update:
self._cache[cache_key] = servers
networks = self._read_networks_from_API()
loadbalancers = self._read_loadbalancers_from_API()
self.populate(servers, networks, loadbalancers)
def populate(self, servers, networks, loadbalancers):
display = Display()
temp_stage = self.get_option("stage_kube")
# Add a default top group 'hcloud'
self.inventory.add_group(group="hcloud")
self.inventory.add_group(group="etcd")
self.inventory.add_group(group="k8s_cluster")
loadbalancerPublicIp = "-"
loadbalancerPrivateIp = "-"
extraLoadbalancers = []
# filter all loadbalancers by naming convention -> {{ stage_kube }}-ingress
#loadbalancers = [x for x in loadbalancers if x["name"] == self.get_option("stage_kube") + "-ingress"]
pattern = rf'{temp_stage}-.*'
loadbalancers = [x for x in loadbalancers if re.match(pattern, x["name"])]
loadbalancers.sort(key=lambda x: x.get('name'))
for loadbalancer in loadbalancers:
loadbalancerId = loadbalancer["id"]
loadbalancerName = loadbalancer["name"]
loadbalancerLabels = loadbalancer["labels"]
if loadbalancerName == self.get_option("stage_kube") + "-ingress":
loadbalancerPublicIp = loadbalancer["public_net"]["ipv4"]["ip"]
if len(loadbalancer["private_net"]) > 0 :
loadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"]
else:
loadbalancerPrivateIp = '-'
display.display("loadbalancer:<" + loadbalancerName + ">, publicIp=<" + loadbalancerPublicIp + ">, privateIp=<" + loadbalancerPrivateIp + ">")
else:
extraLoadbalancers.append(loadbalancer)
if len(loadbalancer["private_net"]) > 0 :
extraLoadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"]
else:
extraLoadbalancerPrivateIp = '-'
display.display("loadbalancer:<" + loadbalancerName + ">, publicIp=<" + loadbalancerPublicIp + ">, privateIp=<" + extraLoadbalancerPrivateIp + ">")
# due to a hetzner api bug for label selector: only last given selector is used - label_selector=stage=XXX,!manual not working correctly
servers = [x for x in servers if 'manual' not in x["labels"]]
servers.sort(key=lambda x: x.get('name'))
for server in servers:
serverId = server["id"]
serverName = server["name"]
serverLabels = server["labels"]
serverStage = serverLabels.get("stage", "this_stage_does_not_exist")
serverService = serverLabels.get("service", "this_group_does_not_exist")
serverPublicIp = server["public_net"]["ipv4"]["ip"]
serverPrivateIp = '-'
for network in networks:
networkId = network["id"]
networkName = network["name"]
if serverId in network["servers"]:
for privateNet in server["private_net"]:
if networkId == privateNet["network"]:
serverPrivateIp = privateNet["ip"]
display.display("id: <" + str(serverId) + ">, server:<" + serverName + ">, stage=<" + serverStage + ">, service=<" + serverService + ">, publicIp=<" + serverPublicIp + ">, privateIp=<" + serverPrivateIp + ">, publicIngressLBIp=<" + loadbalancerPublicIp + ">, privateIngressLBIp=<" + loadbalancerPrivateIp + ">")
if len(serverService) > 0:
self.inventory.add_group(group=serverService)
self.inventory.add_group(group="stage_" + serverStage)
self.inventory.add_host(serverName, group="hcloud")
self.inventory.add_host(serverName, group=serverService)
self.inventory.add_host(serverName, group="stage_" + serverStage)
# should be configurable and not hard coded
if serverService == "kube_control_plane":
self.inventory.add_host(serverName, group="etcd")
if serverService == "kube_control_plane" or serverService == "kube_node":
self.inventory.add_host(serverName, group="k8s_cluster")
self.inventory.set_variable(serverName, 'stage_server_id', serverId)
self.inventory.set_variable(serverName, 'stage_server_ip', serverPublicIp)
self.inventory.set_variable(serverName, 'ansible_ssh_host', serverPublicIp)
self.inventory.set_variable(serverName, 'stage_private_server_ip', serverPrivateIp)
self.inventory.set_variable(serverName, 'stage_public_ingress_loadbalancer_ip', loadbalancerPublicIp)
self.inventory.set_variable(serverName, 'stage_private_ingress_loadbalancer_ip', loadbalancerPrivateIp)
for extraLoadbalancer in extraLoadbalancers:
self.inventory.set_variable(serverName, f'stage_public_{extraLoadbalancer["name"].replace(f"{temp_stage}-", "")}_loadbalancer_ip', extraLoadbalancer["public_net"]["ipv4"]["ip"])
if len(loadbalancer["private_net"]) > 0 :
extraLoadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"]
else:
extraLoadbalancerPrivateIp = '-'
self.inventory.set_variable(serverName, f'stage_private_{extraLoadbalancer["name"].replace(f"{temp_stage}-", "")}_loadbalancer_ip', extraLoadbalancerPrivateIp)