# -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = r""" name: netgo-hcloud plugin_type: inventory author: - netgo short_description: Ansible dynamic inventory plugin for Hetzner hcloud servers. requirements: - python >= 2.10 description: - Reads inventory data from the Hetzner Cloud API. Server Groups are given by the label. Servers need two labels: service and stage. extends_documentation_fragment: - constructed - inventory_cache options: plugin: description: Marks this as an instance of the "netgo-hcloud" plugin required: true choices: ["netgo-hcloud"] api_token: description: The Hetzner Cloud API token. required: true env: - name: HETZNER_CLOUD_TOKEN stage: description: Filter servers by this stage. required: true env: - name: HETZNER_STAGE stage_kube: description: Filter servers by this stage. required: true env: - name: HETZNER_STAGE_KUBE label_selector: description: Filter servers by this label selector. required: true env: - name: HETZNER_LABEL_SELECTOR """ import json import os import re from ansible.errors import AnsibleError from ansible.module_utils.urls import open_url from ansible.module_utils._text import to_native from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable from ansible.release import __version__ from ansible.utils.display import Display class MyHcloudAPI: BASE = "https://api.hetzner.cloud" def __init__(self, token, label_selector=None): self.token = token self.label_selector = label_selector def get_values(self, api_path, response_values_field_name): display = Display() try: response_values = [] # pagination with page_size per window, repeat until last page is reached page = 1 page_size = 20 while page > 0: api_url = "" if not self.label_selector: api_url = "{}/{}?per_page={}&page={}".format(self.BASE, api_path, str(page_size), str(page)) else: api_url = "{}/{}?label_selector={}&per_page={}&page={}".format(self.BASE, api_path, self.label_selector, str(page_size), str(page)) display.display(api_url) response = open_url( api_url, headers={"Authorization": "Bearer " + self.token}, ) json_response = json.loads(response.read()) response_values += json_response[response_values_field_name] if json_response["meta"]["pagination"]["page"] == json_response["meta"]["pagination"]["last_page"]: break page += 1 return response_values except ValueError as e: raise AnsibleError("Incorrect JSON payload: " + str(e)) except Exception as e: raise AnsibleError("Error while fetching %s: %s" % (api_url, to_native(e))) def get_servers(self): return self.get_values("v1/servers", "servers") def get_networks(self): return self.get_values("v1/networks", "networks") def get_loadbalancers(self): return self.get_values("v1/load_balancers", "load_balancers") class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): NAME = "netgo-hcloud" def verify_file(self, path): return super(InventoryModule, self).verify_file(path) and path.endswith( (self.NAME + ".yaml", self.NAME + ".yml") ) def _read_servers_from_API(self): servers = MyHcloudAPI( self.get_option("api_token"), self.get_option("label_selector") ).get_servers() return servers def _read_networks_from_API(self): networks = MyHcloudAPI( self.get_option("api_token"), self.get_option("label_selector") ).get_networks() return networks def _read_loadbalancers_from_API(self): loadbalancers = MyHcloudAPI( self.get_option("api_token") ).get_loadbalancers() return loadbalancers def parse(self, inventory, loader, path, cache=True): super(InventoryModule, self).parse(inventory, loader, path, cache) config = self._read_config_data(path) cache_key = self.get_cache_key(path) # `cache` may be True or False at this point to indicate if the # inventory is being refreshed. Get the user's cache option too # to see if we should save the cache when it is changing. user_cache_setting = self.get_option("cache") # Read if the user has caching enabled and the cache isn't being # refreshed. attempt_to_read_cache = user_cache_setting and cache # Update if the user has caching enabled and the cache is being # refreshed; update this value to True if the cache has expired below. cache_needs_update = user_cache_setting and not cache # Attempt to read the cache if inventory isn't being refreshed and # the user has caching enabled. if attempt_to_read_cache: try: servers = self._cache[cache_key] except KeyError: # This occurs if the cache_key is not in the cache or if # the cache_key expired, so the cache needs to be updated. servers = self._read_servers_from_API() cache_needs_update = True else: servers = self._read_servers_from_API() if cache_needs_update: self._cache[cache_key] = servers networks = self._read_networks_from_API() loadbalancers = self._read_loadbalancers_from_API() self.populate(servers, networks, loadbalancers) def populate(self, servers, networks, loadbalancers): display = Display() temp_stage = self.get_option("stage_kube") # Add a default top group 'hcloud' self.inventory.add_group(group="hcloud") self.inventory.add_group(group="etcd") self.inventory.add_group(group="k8s_cluster") loadbalancerPublicIp = "-" loadbalancerPrivateIp = "-" extraLoadbalancers = [] # filter all loadbalancers by naming convention -> {{ stage_kube }}-ingress #loadbalancers = [x for x in loadbalancers if x["name"] == self.get_option("stage_kube") + "-ingress"] pattern = rf'{temp_stage}-.*' loadbalancers = [x for x in loadbalancers if re.match(pattern, x["name"])] loadbalancers.sort(key=lambda x: x.get('name')) for loadbalancer in loadbalancers: loadbalancerId = loadbalancer["id"] loadbalancerName = loadbalancer["name"] loadbalancerLabels = loadbalancer["labels"] if loadbalancerName == self.get_option("stage_kube") + "-ingress": loadbalancerPublicIp = loadbalancer["public_net"]["ipv4"]["ip"] if len(loadbalancer["private_net"]) > 0 : loadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"] else: loadbalancerPrivateIp = '-' display.display("loadbalancer:<" + loadbalancerName + ">, publicIp=<" + loadbalancerPublicIp + ">, privateIp=<" + loadbalancerPrivateIp + ">") else: extraLoadbalancers.append(loadbalancer) if len(loadbalancer["private_net"]) > 0 : extraLoadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"] else: extraLoadbalancerPrivateIp = '-' display.display("loadbalancer:<" + loadbalancerName + ">, publicIp=<" + loadbalancerPublicIp + ">, privateIp=<" + extraLoadbalancerPrivateIp + ">") # due to a hetzner api bug for label selector: only last given selector is used - label_selector=stage=XXX,!manual not working correctly servers = [x for x in servers if 'manual' not in x["labels"]] servers.sort(key=lambda x: x.get('name')) for server in servers: serverId = server["id"] serverName = server["name"] serverLabels = server["labels"] serverStage = serverLabels.get("stage", "this_stage_does_not_exist") serverService = serverLabels.get("service", "this_group_does_not_exist") serverPublicIp = server["public_net"]["ipv4"]["ip"] serverPrivateIp = '-' for network in networks: networkId = network["id"] networkName = network["name"] if serverId in network["servers"]: for privateNet in server["private_net"]: if networkId == privateNet["network"]: serverPrivateIp = privateNet["ip"] display.display("id: <" + str(serverId) + ">, server:<" + serverName + ">, stage=<" + serverStage + ">, service=<" + serverService + ">, publicIp=<" + serverPublicIp + ">, privateIp=<" + serverPrivateIp + ">, publicIngressLBIp=<" + loadbalancerPublicIp + ">, privateIngressLBIp=<" + loadbalancerPrivateIp + ">") if len(serverService) > 0: self.inventory.add_group(group=serverService) self.inventory.add_group(group="stage_" + serverStage) self.inventory.add_host(serverName, group="hcloud") self.inventory.add_host(serverName, group=serverService) self.inventory.add_host(serverName, group="stage_" + serverStage) # should be configurable and not hard coded if serverService == "kube_control_plane": self.inventory.add_host(serverName, group="etcd") if serverService == "kube_control_plane" or serverService == "kube_node": self.inventory.add_host(serverName, group="k8s_cluster") self.inventory.set_variable(serverName, 'stage_server_id', serverId) self.inventory.set_variable(serverName, 'stage_server_ip', serverPublicIp) self.inventory.set_variable(serverName, 'ansible_ssh_host', serverPublicIp) self.inventory.set_variable(serverName, 'stage_private_server_ip', serverPrivateIp) self.inventory.set_variable(serverName, 'stage_public_ingress_loadbalancer_ip', loadbalancerPublicIp) self.inventory.set_variable(serverName, 'stage_private_ingress_loadbalancer_ip', loadbalancerPrivateIp) for extraLoadbalancer in extraLoadbalancers: self.inventory.set_variable(serverName, f'stage_public_{extraLoadbalancer["name"].replace(f"{temp_stage}-", "")}_loadbalancer_ip', extraLoadbalancer["public_net"]["ipv4"]["ip"]) if len(loadbalancer["private_net"]) > 0 : extraLoadbalancerPrivateIp = loadbalancer["private_net"][0]["ip"] else: extraLoadbalancerPrivateIp = '-' self.inventory.set_variable(serverName, f'stage_private_{extraLoadbalancer["name"].replace(f"{temp_stage}-", "")}_loadbalancer_ip', extraLoadbalancerPrivateIp)