# -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = r""" name: netgo-hcloud plugin_type: inventory author: - netgo short_description: Ansible dynamic inventory plugin for Hetzner hcloud servers. requirements: - python >= 2.10 description: - Reads inventory data from the Hetzner Cloud API. Server Groups are given by the label. Servers need two labels: service and stage. extends_documentation_fragment: - constructed - inventory_cache options: plugin: description: Marks this as an instance of the "netgo-hcloud" plugin required: true choices: ["netgo-hcloud"] api_token: description: The Hetzner Cloud API token. required: true env: - name: HETZNER_CLOUD_TOKEN label_selector: description: Filter servers by this label selector. required: true env: - name: HETZNER_LABEL_SELECTOR """ import json import os from ansible.errors import AnsibleError from ansible.module_utils.urls import open_url from ansible.module_utils._text import to_native from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable from ansible.release import __version__ from ansible.utils.display import Display class MyHcloudAPI: BASE = "https://api.hetzner.cloud" def __init__(self, token, label_selector): self.token = token self.label_selector = label_selector def get_values(self, api_path, response_values_field_name): display = Display() try: response_values = [] # pagination with page_size per window, repeat until last page is reached page = 1 page_size = 20 while page > 0: api_url = "{}/{}?label_selector={}&per_page={}&page={}".format(self.BASE, api_path, self.label_selector, str(page_size), str(page)) display.display(api_url) response = open_url( api_url, headers={"Authorization": "Bearer " + self.token}, ) json_response = json.loads(response.read()) response_values += json_response[response_values_field_name] if json_response["meta"]["pagination"]["page"] == json_response["meta"]["pagination"]["last_page"]: break page += 1 return response_values except ValueError: raise AnsibleError("Incorrect JSON payload") except Exception as e: raise AnsibleError("Error while fetching %s: %s" % (api_url, to_native(e))) def get_servers(self): return self.get_values("v1/servers", "servers") def get_networks(self): return self.get_values("v1/networks", "networks") class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): NAME = "netgo-hcloud" def verify_file(self, path): return super(InventoryModule, self).verify_file(path) and path.endswith( (self.NAME + ".yaml", self.NAME + ".yml") ) def _read_servers_from_API(self): servers = MyHcloudAPI( self.get_option("api_token"), self.get_option("label_selector") ).get_servers() return servers def _read_networks_from_API(self): networks = MyHcloudAPI( self.get_option("api_token"), self.get_option("label_selector") ).get_networks() return networks def parse(self, inventory, loader, path, cache=True): super(InventoryModule, self).parse(inventory, loader, path, cache) config = self._read_config_data(path) cache_key = self.get_cache_key(path) # `cache` may be True or False at this point to indicate if the # inventory is being refreshed. Get the user's cache option too # to see if we should save the cache when it is changing. user_cache_setting = self.get_option("cache") # Read if the user has caching enabled and the cache isn't being # refreshed. attempt_to_read_cache = user_cache_setting and cache # Update if the user has caching enabled and the cache is being # refreshed; update this value to True if the cache has expired below. cache_needs_update = user_cache_setting and not cache # Attempt to read the cache if inventory isn't being refreshed and # the user has caching enabled. if attempt_to_read_cache: try: servers = self._cache[cache_key] except KeyError: # This occurs if the cache_key is not in the cache or if # the cache_key expired, so the cache needs to be updated. servers = self._read_servers_from_API() cache_needs_update = True else: servers = self._read_servers_from_API() if cache_needs_update: self._cache[cache_key] = servers networks = self._read_networks_from_API() self.populate(servers, networks) def populate(self, servers, networks): display = Display() # Add a default top group 'hcloud' self.inventory.add_group(group="hcloud") self.inventory.add_group(group="etcd") self.inventory.add_group(group="k8s_cluster") for server in servers: serverId = server["id"] serverName = server["name"] serverLabels = server["labels"] serverStage = serverLabels["stage"] serverService = serverLabels["service"] serverPublicIp = server["public_net"]["ipv4"]["ip"] serverPrivateIp = '-' for network in networks: networkId = network["id"] networkName = network["name"] if serverId in network["servers"]: for privateNet in server["private_net"]: if networkId == privateNet["network"]: serverPrivateIp = privateNet["ip"] display.display("server:<" + serverName + ">, stage=<" + serverStage + ">, service=<" + serverService + ">, publicIp=<" + serverPublicIp + ">, privateIp=<" + serverPrivateIp + ">") self.inventory.add_group(group=serverService) self.inventory.add_group(group="stage_" + serverStage) self.inventory.add_host(serverName, group="hcloud") self.inventory.add_host(serverName, group=serverService) self.inventory.add_host(serverName, group="stage_" + serverStage) # should be configurable and not hard coded if serverService == "kube_control_plane": self.inventory.add_host(serverName, group="etcd") if serverService == "kube_control_plane" or serverService == "kube_node": self.inventory.add_host(serverName, group="k8s_cluster") self.inventory.set_variable(serverName, 'stage_server_ip', serverPublicIp) self.inventory.set_variable(serverName, 'ansible_ssh_host', serverPublicIp) self.inventory.set_variable(serverName, 'stage_private_server_ip', serverPrivateIp)