# -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = r""" name: netgo-hcloud plugin_type: inventory author: - netgo short_description: Ansible dynamic inventory plugin for Hetzner hcloud servers. requirements: - python >= 2.10 description: - Reads inventory data from the Hetzner Cloud API. extends_documentation_fragment: - constructed - inventory_cache options: plugin: description: Marks this as an instance of the "netgo-hcloud" plugin required: true choices: ["netgo-hcloud"] api_token: description: The Hetzner Cloud API token. required: true env: - name: HETZNER_CLOUD_TOKEN stage: description: The Hetzner Cloud stage. required: true env: - name: HETZNER_CLOUD_STAGE """ import json import os from ansible.errors import AnsibleError from ansible.module_utils.urls import open_url from ansible.module_utils._text import to_native from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable from ansible.release import __version__ class MyHcloudAPI: BASE = "https://api.hetzner.cloud" def __init__(self, token, stage): self.token = token self.stage = stage def get_servers(self): api_url = "%s/v1/servers?label_selector=stage=" % self.BASE + self.stage try: response = open_url( api_url, headers={"Authorization": "Bearer " + self.token}, ) result = json.loads(response.read()) return result["servers"] except ValueError: raise AnsibleError("Incorrect JSON payload") except Exception as e: raise AnsibleError("Error while fetching %s: %s" % (api_url, to_native(e))) class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): NAME = "netgo-hcloud" def verify_file(self, path): return super(InventoryModule, self).verify_file(path) and path.endswith( (self.NAME + ".yaml", self.NAME + ".yml") ) def _read_servers_from_API(self): servers = MyHcloudAPI( self.get_option("api_token"), self.get_option("stage") ).get_servers() return servers def parse(self, inventory, loader, path, cache=True): super(InventoryModule, self).parse(inventory, loader, path, cache) config = self._read_config_data(path) cache_key = self.get_cache_key(path) # `cache` may be True or False at this point to indicate if the # inventory is being refreshed. Get the user's cache option too # to see if we should save the cache when it is changing. user_cache_setting = self.get_option("cache") # Read if the user has caching enabled and the cache isn't being # refreshed. attempt_to_read_cache = user_cache_setting and cache # Update if the user has caching enabled and the cache is being # refreshed; update this value to True if the cache has expired below. cache_needs_update = user_cache_setting and not cache # Attempt to read the cache if inventory isn't being refreshed and # the user has caching enabled. if attempt_to_read_cache: try: servers = self._cache[cache_key] except KeyError: # This occurs if the cache_key is not in the cache or if # the cache_key expired, so the cache needs to be updated. servers = self._read_servers_from_API() cache_needs_update = True else: servers = self._read_servers_from_API() if cache_needs_update: self._cache[cache_key] = servers self.populate(self.get_option("stage"), servers) def populate(self, stage, servers): # Add a default top group 'hcloud' self.inventory.add_group(group="hcloud") # Add a default top group 'stage_XYZ' self.inventory.add_group(group="stage_" + stage) for server in servers: serverName = server["name"] serverLabels = server["labels"] serverService = serverLabels["service"] self.inventory.add_group(group=serverService) self.inventory.add_host(serverName, group="hcloud") self.inventory.add_host(serverName, group=serverService) self.inventory.add_host(serverName, group="stage_" + stage)