title: Investigate adding support for a second cloud platform
state: OPEN
author: dboreham
labels:
comments: 0
assignees:
projects:
milestone:
number: 43
--
Currently `machine` only supports DigitalOcean as a hosting provider, although the tool was written with multi-provider use cases in mind. Investigate what it would take to add support for a second provider. The most promising candidate is Vultr, since it appears to have "feature parity" with DigitalOcean, a similar pricing structure, and an API.
{
"subagent_type": "Explore",
"prompt": "Explore this codebase thoroughly to understand how the DigitalOcean provider is integrated and what abstractions exist (or don't) for supporting multiple cloud providers. Look at:\n\n1. The main entry point and how DigitalOcean is used\n2. factory.py - how VMs are created\n3. di.py - dependency injection / globals\n4. config.py - configuration loading\n5. cloud_config.py - cloud-init config\n6. All subcommands in machine/subcommands/\n7. Any existing provider abstraction or interface\n\nI want to understand:\n- How tightly coupled is the code to DigitalOcean's API?\n- What abstractions already exist that could support multiple providers?\n- What would need to change to add Vultr support?\n- How is the config structured?\n\nBe very thorough - read all relevant files."
}

Based on my exploration, here is an organized summary of the findings:
## Comprehensive Analysis: DigitalOcean Integration and Provider Abstraction in `machine` CLI
### 1. TIGHT COUPLING ASSESSMENT
The code is **deeply and directly coupled to DigitalOcean's API** through the `python-digitalocean` library (version 1.17.0). There is **no existing abstraction layer** for multiple cloud providers. Here's the extent of coupling:
#### Import Statements (Direct Dependencies)
- `digitalocean.Manager` - used in eight subcommands (create, destroy, list, list-domain, ssh_keys, domains, projects, status)
- `digitalocean.Droplet` - VM object model in create.py
- `digitalocean.Domain` - DNS management in create.py and destroy.py
- `digitalocean.Project` - Project assignment in create.py
- `digitalocean.SSHKey` - SSH key retrieval in create.py
- `digitalocean.NotFoundError` - Exception handling in create.py and destroy.py
#### Hardcoded DigitalOcean Specifics
**In `/home/david/projects/stirlingbridge/machine/machine/subcommands/create.py` (lines 14-36):**
- Hardcoded valid regions list: `["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]`
- Hardcoded valid images list: `["almalinux-8-x64", "almalinux-9-x64", "centos-stream-9-x64", ...]`
- These are DigitalOcean-specific image slugs and regions
**In `/home/david/projects/stirlingbridge/machine/machine/config.py` (lines 60-71):**
```python
if "digital-ocean" not in config:
fatal_error("Required 'digital-ocean' section not found in config file")
config_do = config["digital-ocean"]
return Config(
_require_key(config_do, "access-token", "digital-ocean"),
_require_key(config_do, "ssh-key", "digital-ocean"),
...
)
```
Configuration is hardcoded to expect `digital-ocean` section only. No provider selection mechanism.
**Error Messages:**
- cloud_config.py line 18: "SSH key '...' not found in **DigitalOcean**" (explicitly names provider)
- create.py line 76: "SSH key '...' not found in **DigitalOcean**"
#### DigitalOcean-Specific Logic
**In `/home/david/projects/stirlingbridge/machine/machine/util.py`:**
- `projectFromName()` - queries DigitalOcean projects via `manager.get_all_projects()`
- `sshKeyFromName()` - queries DigitalOcean SSH keys via `manager.get_all_sshkeys()`
- `dnsRecordIdFromName()` - queries DigitalOcean domain records
- Helpers work directly with DO object attributes (e.g., `droplet.region["slug"]`, `droplet.ip_address`, `domain.get_records()`)
**DNS Implementation:**
- Creates A records specifically (line 139 in create.py: `type="A"`)
- Uses DigitalOcean Domain API exclusively
- 5-minute TTL hardcoded (line 139: `ttl=60 * 5`)
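A provider-neutral version of this DNS behavior could be a small helper. A sketch, assuming a future provider interface with hypothetical `find_dns_record`/`delete_dns_record`/`create_dns_record` methods (none of these exist in the codebase today):

```python
# Hypothetical provider-neutral DNS helper mirroring what create.py does
# today: an A record with a 5-minute TTL. The provider method names are
# assumptions about a future CloudProvider interface, not existing code.
def upsert_a_record(provider, zone: str, name: str, ip: str, ttl: int = 60 * 5):
    """Create (or replace) the A record for name.zone pointing at ip."""
    existing = provider.find_dns_record(zone, name)  # hypothetical method
    if existing is not None:
        provider.delete_dns_record(zone, existing)
    return provider.create_dns_record(zone, name, ip, record_type="A", ttl=ttl)
```

The upsert shape (delete-then-create) sidesteps the fact that DO and Vultr expose different "update record" semantics.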
**Droplet/VM Model:**
- `digitalocean.Droplet` construction has DO-specific parameters:
- `region` - expects DO region slug
- `image` - expects DO image slug
- `size_slug` - expects DO size slug
- `ssh_keys` - expects DO SSHKey objects
- `tags` - DigitalOcean feature for resource organization
- `backups=False` - DigitalOcean-specific feature
---
### 2. EXISTING ABSTRACTIONS (OR LACK THEREOF)
#### What EXISTS (Limited):
1. **Cloud-init Config Generation** (`cloud_config.py`):
- `get_user_data()` generates cloud-init scripts
- **Provider-agnostic** - cloud-init is a standard and works across all cloud providers
- Generates YAML that can be used by any provider supporting cloud-init
- **This is the one truly portable component**
2. **Session/Tagging System** (`types.py`, `util.py`):
- `TAG_MACHINE_CREATED`, `TAG_MACHINE_SESSION_PREFIX`, `TAG_MACHINE_TYPE_PREFIX`
- Uses tags to identify machines
- **Problem**: Tags are stored as a list, relying on DigitalOcean's tagging feature
- Would need to be rethought for providers without native tagging
3. **Data Structures** (`types.py`):
```python
@dataclass
class Config:
access_token: str # DO-specific
ssh_key: str # DO-specific
dns_zone: str
machine_size: str # DO size_slug format
image: str # DO image slug
region: str # DO region slug
project: str # DO-specific (project assignment)
@dataclass
class MachineConfig:
new_user_name: str
script_url: str
script_dir: str
script_path: str
script_args: str
```
The `Config` class bundles DO-specific fields with generic ones.
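On the tagging concern in item 2, one possible fallback for providers without native tags (a sketch, not existing code) is to pack the machine tags into a single sanitized label string, since provider label/hostname fields are usually restricted to a narrow character set. The delimiter and helper names are assumptions:

```python
# Hypothetical tag-encoding fallback for providers that lack
# DigitalOcean-style native tagging. Delimiter choice is an assumption.
TAG_DELIMITER = "--"

def encode_tags(tags: list) -> str:
    """Pack tags into one label string; ':' is not usually allowed in
    label fields, so sanitize it to '-' first."""
    sanitized = [t.replace(":", "-").lower() for t in tags]
    return TAG_DELIMITER.join(sanitized)

def decode_tags(label: str) -> list:
    """Recover the (sanitized) tag list from an encoded label."""
    return label.split(TAG_DELIMITER) if label else []
```

Note the round trip is lossy (`machine:created` comes back as `machine-created`), so the tag constants in `types.py` would need sanitized equivalents.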
#### What DOES NOT EXIST:
1. **No Provider Interface/Abstract Base Class** - No abstraction like:
```python
class CloudProvider(ABC):
@abstractmethod
def create_vm(self, params): pass
@abstractmethod
def destroy_vm(self, vm_id): pass
@abstractmethod
def list_vms(self): pass
```
2. **No Provider Factory** - The provider choice is hardwired at config load time; there is no factory that selects an implementation at runtime
3. **No Capability Matrix** - No mapping of:
- Which regions each provider supports
- Which images each provider offers
- Which machine sizes each provider has
- Whether provider supports native tagging, DNS, projects, etc.
4. **No Adapter Pattern** - Commands directly use `digitalocean.*` classes without wrapping them in provider-agnostic adapters
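The missing adapter layer could start small. A sketch modeled on `util.droplet_to_json_obj`, where the `VM` dataclass and `vm_from_droplet` are assumptions rather than existing code:

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Sketch of the missing adapter layer: wrap provider objects in a
# vendor-neutral VM record so subcommands never touch digitalocean.*
# types directly. Field set mirrors util.droplet_to_json_obj.

@dataclass
class VM:
    id: str
    name: str
    ip: Optional[str]
    region: Optional[str]
    tags: List[str] = field(default_factory=list)

def vm_from_droplet(droplet) -> VM:
    """Adapt a digitalocean.Droplet (duck-typed here) to the neutral
    record, reading the same attributes droplet_to_json_obj reads."""
    return VM(
        id=str(droplet.id),
        name=droplet.name,
        ip=droplet.ip_address,
        region=droplet.region["slug"] if droplet.region else None,
        tags=list(droplet.tags or []),
    )
```

A `vm_from_vultr_instance` counterpart would then be the only place Vultr's attribute names need to be known.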
---
### 3. CONFIGURATION STRUCTURE
**Location:** `~/.machine/config.yml`
**Current Structure:**
```yaml
digital-ocean:
access-token: dop_v1_... # Secrets
ssh-key: my-ssh-key-name # Name in DO account
dns-zone: example.com # Optional
machine-size: s-4vcpu-8gb # DO-specific size slug
image: ubuntu-22-04-x64 # DO-specific image slug
region: nyc3 # DO-specific region
project: Infrastructure # DO project name (optional)
machines:
example: # Machine type
new-user-name: alice # User for cloud-init
script-dir: /opt/setup-scripts
script-url: https://...
script-path: /opt/setup-scripts/setup.sh
script-args: "-y"
```
**Key Characteristics:**
- Provider name hardcoded in config section (`digital-ocean`)
- All top-level config values are provider-specific
- Config loading in `config.py` has no abstraction - directly maps to `Config` dataclass
- Environment variable substitution uses `${VAR}` (and `${VAR:-default}`) syntax and is implemented at parse time
**Processing:**
```python
# config.py line 38-49
config = _load_config_data(config_file_name) # Loads YAML
config = _expand_env_vars(config) # Expands ${VAR} in place
# Then directly accesses config["digital-ocean"]
```
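For reference, the expansion also honors shell-style defaults (`${VAR:-default}`), as `_expand_env_vars` shows. A minimal standalone equivalent of the string case (the real code additionally recurses into dicts and lists and calls `fatal_error` on unset variables):

```python
import os
import re

_env_var_pattern = re.compile(r"\$\{([^}]+)\}")

def expand_env_vars(text: str) -> str:
    """Standalone sketch of config._expand_env_vars for strings:
    ${VAR} substitutes the environment value, ${VAR:-default} falls
    back to the default when VAR is unset."""
    def _replace(match):
        expr = match.group(1)
        if ":-" in expr:
            var_name, default = expr.split(":-", 1)
            return os.environ.get(var_name, default)
        return os.environ[expr]  # the real code calls fatal_error if unset
    return _env_var_pattern.sub(_replace, text)
```

This is relevant to multi-provider support because secrets like `access-token: ${DO_TOKEN}` and a future `api-key: ${VULTR_API_KEY}` would both flow through the same mechanism.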
---
### 4. COMMAND ARCHITECTURE
Each subcommand follows a similar pattern (exemplified by `create.py`):
```python
@click.command()
@click.option(...)
@click.pass_context
def command(context, ...):
command_context: MainCmdCtx = context.obj # From main.py
config = command_context.config # Type: Config
# Direct use of python-digitalocean:
manager = digitalocean.Manager(token=config.access_token)
# Droplet manipulation:
droplet = digitalocean.Droplet(
token=config.access_token,
name=name,
region=config.region,
image=config.image,
size_slug=config.machine_size,
...
)
droplet.create()
```
**Subcommands and their DigitalOcean usage:**
| Subcommand | DO Classes Used | Key Operations |
|---|---|---|
| `create` | Manager, Droplet, Domain, Project, SSHKey | Create VM, DNS record, project assignment |
| `destroy` | Manager, Droplet, Domain | Destroy VM, delete DNS record |
| `list` | Manager (get_all_droplets) | Query VMs, tag-based filtering |
| `status` | Droplet (via list), HTTP (custom) | Query VM status + custom endpoint |
| `ssh-keys` | Manager | List SSH keys |
| `domains` | Manager | List DNS domains |
| `list-domain` | Domain, Manager | List DNS records, correlate with VMs |
| `projects` | Manager | List projects |
| `types` | None (only config) | List machine types from config |
---
### 5. DEPENDENCY INJECTION
**Location:** `/home/david/projects/stirlingbridge/machine/machine/di.py`
Very minimal - just a holder for CLI options:
```python
class d:
opt: CliOptions = None
```
This is set in `main.py` line 28:
```python
d.opt = options
```
Used by subcommands to check debug, quiet, verbose, dry_run flags. No abstraction layer injected here.
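If a provider abstraction were added, the simplest injection point would be to widen this holder. A sketch (an assumption, not current code):

```python
# One low-friction injection point: widen the `d` holder so a provider
# instance is set once at startup, and subcommands stop constructing
# digitalocean.Manager themselves. `provider` is a hypothetical field.
class d:
    opt = None        # CliOptions, set by main() today
    provider = None   # a CloudProvider instance, set after config load

# main() would then do roughly:
#   d.opt = options
#   d.provider = ProviderFactory.create(raw_config)
# and a subcommand reads d.provider.list_vms() instead of building a Manager.
```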
---
### 6. KEY FILES AND THEIR ROLES
| File | Purpose | DigitalOcean Coupling |
|---|---|---|
| `main.py` | Click group, config loading, subcommand registration | Minimal - just orchestration |
| `config.py` | YAML parsing, env var expansion, validation | **HIGH** - hardcoded "digital-ocean" section |
| `di.py` | Minimal DI for CLI options | None |
| `types.py` | Data structures (Config, MachineConfig, Tags) | **MEDIUM** - Config contains DO fields, tags rely on DO |
| `cloud_config.py` | Cloud-init generation | **LOW** - Actually provider-agnostic |
| `util.py` | Helper functions for DO API | **CRITICAL** - All helpers work with DO objects |
| `factory.py` | YAML factory function | None - just creates YAML reader |
| All subcommands | Entry points for CLI actions | **CRITICAL** - Direct DO API usage throughout (except `types`, which only reads config) |
---
### 7. WHAT WOULD NEED TO CHANGE FOR VULTR SUPPORT
To add Vultr support alongside DigitalOcean would require:
#### 1. **Configuration Layer**
- Extend config.py to support `vultr:` section alongside `digital-ocean:`
- Create provider detection/selection logic
- Map provider-specific parameters (Vultr uses different region/size/image formats)
- Example: Vultr's API v2 uses short region codes such as `ewr` (New Jersey), while DO uses slugs like `nyc3`
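A mapping layer could start as a static table. All equivalences below are illustrative assumptions and would need verification against each provider's catalog:

```python
# Illustrative (unverified) equivalence tables between DO and Vultr
# identifiers; the real values must be checked against each provider.
REGION_MAP = {
    # neutral name: {provider: provider-specific id}
    "new-york":  {"digital-ocean": "nyc3", "vultr": "ewr"},
    "london":    {"digital-ocean": "lon1", "vultr": "lhr"},
    "frankfurt": {"digital-ocean": "fra1", "vultr": "fra"},
}

SIZE_MAP = {
    "small-4cpu-8gb": {"digital-ocean": "s-4vcpu-8gb", "vultr": "vc2-4c-8gb"},
}

def resolve(mapping: dict, neutral: str, provider: str) -> str:
    """Translate a neutral name to a provider-specific identifier,
    failing loudly when no equivalence is recorded."""
    try:
        return mapping[neutral][provider]
    except KeyError:
        raise ValueError(f"No {provider} equivalent for '{neutral}'")
```

Whether config files should hold neutral names (translated per provider) or raw provider identifiers is a design decision this mapping makes explicit.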
#### 2. **Provider Abstraction (NEW)**
Create a provider interface:
```python
# machine/providers/base.py
from abc import ABC, abstractmethod
from typing import List
class CloudProvider(ABC):
@abstractmethod
def create_vm(self, name, size, image, region, user_data, tags) -> str: pass
@abstractmethod
def destroy_vm(self, vm_id) -> bool: pass
@abstractmethod
def list_vms(self, tag=None) -> List[VM]: pass
@abstractmethod
def get_ssh_key(self, name) -> SSHKey: pass
@abstractmethod
def create_dns_record(self, zone, name, ip) -> DNSRecord: pass
@abstractmethod
def delete_dns_record(self, zone, record_id) -> bool: pass
# ... etc
# machine/providers/digitalocean.py
class DigitalOceanProvider(CloudProvider):
def __init__(self, token: str):
self.manager = digitalocean.Manager(token=token)
def create_vm(self, ...): pass
# ... implementations
# machine/providers/vultr.py
class VultrProvider(CloudProvider):
    def __init__(self, api_key: str):
        # hypothetical client construction; the actual Vultr SDK API differs
        self.client = vultr.VultrClient(api_key)
    def create_vm(self, ...): pass
    # ... implementations
```
#### 3. **Provider Factory**
```python
# machine/provider_factory.py
class ProviderFactory:
    @staticmethod
    def create(raw_config: dict) -> CloudProvider:
        # operate on the parsed YAML dict (not the Config dataclass) so
        # membership tests on provider section names work
        if "digital-ocean" in raw_config:
            return DigitalOceanProvider(raw_config["digital-ocean"]["access-token"])
        elif "vultr" in raw_config:
            return VultrProvider(raw_config["vultr"]["api-key"])
        else:
            raise ValueError("No provider configured")
```
#### 4. **Config Structure Changes**
```yaml
# Support both (or select one):
digital-ocean:
access-token: ...
# DO-specific settings
vultr:
api-key: ...
# Vultr-specific settings
# Provider-agnostic settings:
defaults:
machine-size: c1
image: ubuntu-22-04-x64
region: ewr
ssh-key: my-key
machines:
# ... unchanged
```
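The provider selection implied by this layout could be a small validation helper in `config.py`. A sketch (assumed, not current code):

```python
# Sketch of provider-section detection replacing the hardcoded
# "digital-ocean" check in config.get(). Names are assumptions.
KNOWN_PROVIDERS = ("digital-ocean", "vultr")

def detect_provider(raw_config: dict) -> str:
    """Return the single configured provider section, or fail loudly
    when zero or multiple sections are present."""
    present = [p for p in KNOWN_PROVIDERS if p in raw_config]
    if len(present) != 1:
        raise ValueError(
            f"Expected exactly one provider section of {KNOWN_PROVIDERS}, "
            f"found: {present or 'none'}"
        )
    return present[0]
```

Requiring exactly one section keeps the initial refactor simple; allowing both (with an explicit `provider:` selector key) could come later.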
#### 5. **Commands Refactoring**
Every subcommand would need refactoring from:
```python
# Current (tightly coupled):
manager = digitalocean.Manager(token=config.access_token)
droplets = manager.get_all_droplets()
# To (provider-agnostic):
provider = get_provider(config)
vms = provider.list_vms()
```
#### 6. **Type System Updates**
- Create vendor-neutral types: `VM`, `DNSRecord`, `SSHKey` instead of DO's `Droplet`, `Domain`, `Record`
- These would wrap provider-specific objects or be translated from them
- Add region/image/size mapping layer since each provider uses different names
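These vendor-neutral types could begin as small dataclasses. The field sets below are assumptions based on what the subcommands currently read from DO objects (note the 300-second default matching the TTL `create.py` hardcodes today):

```python
from dataclasses import dataclass

# Minimal vendor-neutral records (a sketch; field sets are assumptions
# derived from what the subcommands read off DO objects today).

@dataclass
class SSHKey:
    name: str
    public_key: str

@dataclass
class DNSRecord:
    id: str
    name: str       # host label within the zone
    zone: str
    data: str       # record target, e.g. an IPv4 address for an A record
    type: str = "A"
    ttl: int = 300  # matches the 5-minute TTL create.py hardcodes
```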
#### 7. **Capability Detection**
Some operations wouldn't work uniformly:
- **Tagging**: DO has native tags, Vultr may not - would need workaround (metadata API, naming convention)
- **Projects**: DO has projects, Vultr doesn't - need feature detection
- **DNS Management**: Both support it but API differs
- **Status Checks**: Custom endpoint at `:4242/cgi-bin/cloud-init-status` would still work (cloud-init is universal)
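Capability detection could be as simple as class-level flags on the assumed provider base class, letting commands degrade gracefully instead of presuming every feature exists. A sketch (flag names and the Vultr values are assumptions to be confirmed against its API):

```python
# Sketch of capability flags on the hypothetical provider base class.
class CloudProvider:
    supports_native_tags = False
    supports_projects = False
    supports_dns = False

class DigitalOceanProvider(CloudProvider):
    supports_native_tags = True
    supports_projects = True
    supports_dns = True

class VultrProvider(CloudProvider):
    supports_native_tags = False  # to be confirmed against the Vultr API
    supports_projects = False     # Vultr has no direct "projects" analogue
    supports_dns = True

def assign_to_project(provider, vm_id: str, project_name: str) -> bool:
    """Skip cleanly on providers that lack project support."""
    if not provider.supports_projects:
        return False
    # ... provider-specific assignment would go here
    return True
```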
---
### 8. MIGRATION COMPLEXITY SUMMARY
| Area | Complexity | Notes |
|---|---|---|
| Configuration | **HIGH** | Provider detection, parameter mapping |
| Subcommands | **VERY HIGH** | All 9 subcommands need refactoring |
| Provider Abstraction | **HIGH** | Create base class, 2 implementations (DO, Vultr) |
| Type System | **MEDIUM** | Map provider models to vendor-neutral types |
| Tagging/Session Management | **HIGH** | DO tags are native; Vultr alternative needed |
| DNS Management | **MEDIUM** | Different API but both support it |
| Testing | **HIGH** | Would need mocks/adapters for both providers |
| **Overall Effort** | **VERY HIGH** | Estimated: 2-3 weeks for an experienced developer |
---
### 9. CURRENT ARCHITECTURE DIAGRAM
```
main.py (Click group)
↓
config.py (loads config.yml)
↓ hardcoded to expect "digital-ocean" section
types.py (Config, MachineConfig - with DO fields)
↓
subcommands/* (create, destroy, list, etc.)
↓ each imports and uses directly
python-digitalocean library
↓
DigitalOcean API
util.py (helpers)
↓ works with DO objects
cloud_config.py (generates cloud-init - provider-agnostic!)
↓
cloud-init (on spawned VMs)
```
**Missing:** Any abstraction between subcommands and `python-digitalocean`. Direct dependency.
---
### 10. WHAT'S PROVIDER-AGNOSTIC
- **cloud_config.py** - Cloud-init script generation works with any provider
- **cloud-init execution** - Standard across all major cloud providers
- **Session management logic** - Tag-based filtering approach (though needs adaptation for Vultr)
- **Machine type definitions** - Config-driven machine profiles
- **CLI interface** - Click framework is provider-agnostic
- **Custom status checks** - The HTTP endpoint polling would work for any provider
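The status check's portability is easy to demonstrate: it talks to the VM itself, not any cloud API. A sketch polling the `:4242/cgi-bin/cloud-init-status` endpoint the status subcommand already uses (stdlib only; the helper name is an assumption):

```python
import urllib.error
import urllib.request

def cloud_init_status(ip: str, timeout: float = 5.0):
    """Fetch the machine's cloud-init status from the :4242 endpoint.
    Provider-independent: it contacts the VM directly. Returns the
    status text, or None if the machine is unreachable / still booting."""
    url = f"http://{ip}:4242/cgi-bin/cloud-init-status"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.read().decode().strip()
    except (urllib.error.URLError, OSError):
        return None
```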
---
### Files Summary
**All files read during analysis:**
1. `/home/david/projects/stirlingbridge/machine/machine/main.py` - Entry point (58 lines)
2. `/home/david/projects/stirlingbridge/machine/machine/di.py` - Minimal DI (8 lines)
3. `/home/david/projects/stirlingbridge/machine/machine/config.py` - **HIGH coupling** (104 lines)
4. `/home/david/projects/stirlingbridge/machine/machine/factory.py` - YAML factory (9 lines)
5. `/home/david/projects/stirlingbridge/machine/machine/types.py` - **MEDIUM coupling** (40 lines)
6. `/home/david/projects/stirlingbridge/machine/machine/cloud_config.py` - **LOW coupling** (48 lines)
7. `/home/david/projects/stirlingbridge/machine/machine/util.py` - **CRITICAL coupling** (89 lines)
8. `/home/david/projects/stirlingbridge/machine/machine/constants.py` - Constants (6 lines)
9. `/home/david/projects/stirlingbridge/machine/machine/defaults.py` - Empty (3 lines)
10. `/home/david/projects/stirlingbridge/machine/machine/log.py` - Logging (19 lines)
11. Subcommands (create, destroy, list, status, ssh_keys, domains, projects, list_domain, types) - **HIGH coupling** in all except `types`, which only reads config
12. Test files (config, input_validation) - Show validation and env expansion
13. `pyproject.toml` - Dependencies (56 lines)
14. `README.md` - Documentation with config reference (328 lines)
**Total codebase size:** ~900 lines of actual code + ~330 lines of config/structure
Here's a summary of what it would take to add Vultr support (issue #43):

**Current state.** The codebase is tightly coupled to DigitalOcean — there's no provider abstraction layer. Every subcommand directly imports and uses python-digitalocean classes (Manager, Droplet, Domain, etc.). The config loader is hardcoded to expect a digital-ocean section. One bright spot: cloud_config.py is already provider-agnostic since cloud-init is a cross-provider standard.

**What would need to change:**

1. **Provider abstraction (new):** Create a CloudProvider ABC with methods like create_vm(), destroy_vm(), list_vms(), get_ssh_key(), create_dns_record(), etc. Then implement DigitalOceanProvider and VultrProvider.
2. **Configuration (config.py, types.py):** Support a vultr: section alongside digital-ocean:, with provider selection logic.
3. **Subcommands:** Every subcommand (create, destroy, list, status, ssh-keys, domains, list-domain, projects, types) currently calls DO APIs directly and would need refactoring to use the provider abstraction.
4. **Helpers (util.py):** All helpers (projectFromName, sshKeyFromName, dnsRecordIdFromName) work directly with DO objects and would need provider-neutral equivalents.
5. **Tagging/session tracking:** DO has native tags used for session tracking. Vultr may need an alternative mechanism (metadata, naming conventions).
6. **Validation lists:** Hardcoded DO-specific lists in create.py (e.g., ["NYC1", "NYC3", ...], ["ubuntu-22-04-x64", ...]) would need per-provider equivalents.

**Vultr feasibility.** Vultr is a reasonable choice — it supports cloud-init, has a REST API, offers DNS management, and has similar pricing. A vultr Python library exists. Key differences to handle:
- Different auth (API key vs DO access token)
- Different resource naming (instances vs droplets, plans vs size slugs)
- No native "projects" concept
- Different region/image identifiers

**Suggested approach.** First introduce the CloudProvider interface with DigitalOceanProvider as the sole implementation, then add VultrProvider as a second implementation. This is a significant refactor (~900 lines of code, all subcommands affected) but the codebase is small enough that it's very manageable.

**`machine/types.py`:**
1→from dataclasses import dataclass
2→
3→TAG_MACHINE_CREATED = "machine:created"
4→TAG_MACHINE_TYPE_PREFIX = "machine:type:"
5→TAG_MACHINE_SESSION_PREFIX = "machine:session:"
6→
7→
8→@dataclass
9→class CliOptions:
10→ debug: bool
11→ quiet: bool
12→ verbose: bool
13→ dry_run: bool
14→
15→
16→@dataclass
17→class Config:
18→ access_token: str
19→ ssh_key: str
20→ dns_zone: str
21→ machine_size: str
22→ image: str
23→ region: str
24→ project: str
25→
26→
27→@dataclass
28→class MainCmdCtx:
29→ config: Config
30→ session_id: str
31→
32→
33→@dataclass
34→class MachineConfig:
35→ new_user_name: str
36→ script_url: str
37→ script_dir: str
38→ script_path: str
39→ script_args: str
40→

**`machine/config.py`:**

 1→import os
2→import re
3→from pathlib import Path
4→from machine.di import d
5→from machine.factory import yaml
6→from machine.log import fatal_error, debug
7→from machine.types import Config, MachineConfig
8→from machine import constants
9→
10→_env_var_pattern = re.compile(r"\$\{([^}]+)\}")
11→
12→
13→def _expand_env_vars(value):
14→ if isinstance(value, str):
15→
16→ def _replace(match):
17→ expr = match.group(1)
18→ if ":-" in expr:
19→ var_name, default = expr.split(":-", 1)
20→ return os.environ.get(var_name, default)
21→ else:
22→ if expr not in os.environ:
23→ fatal_error(f"Environment variable '{expr}' referenced in config is not set")
24→ return os.environ[expr]
25→
26→ return _env_var_pattern.sub(_replace, value)
27→ elif isinstance(value, dict):
28→ return {k: _expand_env_vars(v) for k, v in value.items()}
29→ elif isinstance(value, list):
30→ return [_expand_env_vars(item) for item in value]
31→ return value
32→
33→
34→class _loaded_config:
35→ c: any = None
36→
37→
38→def _load_config_data(config_file_name: str):
39→ if not config_file_name:
40→ config_file_name = constants.default_config_file_path
41→ config_path = Path(os.path.expanduser(config_file_name))
42→ if not config_path.exists():
43→ fatal_error(f"Error: Config file: {config_path} not found")
44→ config = _expand_env_vars(yaml().load(open(config_path, "r")))
45→ if d.opt.debug:
46→ debug(f"Loaded config file: {config_path}")
47→ debug(f"Parsed config: {config}")
48→ _loaded_config.c = config
49→ return config
50→
51→
52→def _require_key(d, key, section_name):
53→ if key not in d:
54→ fatal_error(f"Required key '{key}' not found in '{section_name}' section of config file")
55→ return d[key]
56→
57→
58→def get(config_file_name: str) -> Config:
59→ config = _load_config_data(config_file_name)
60→ if "digital-ocean" not in config:
61→ fatal_error("Required 'digital-ocean' section not found in config file")
62→ config_do = config["digital-ocean"]
63→ return Config(
64→ _require_key(config_do, "access-token", "digital-ocean"),
65→ _require_key(config_do, "ssh-key", "digital-ocean"),
66→ config_do.get("dns-zone"),
67→ _require_key(config_do, "machine-size", "digital-ocean"),
68→ _require_key(config_do, "image", "digital-ocean"),
69→ _require_key(config_do, "region", "digital-ocean"),
70→ _require_key(config_do, "project", "digital-ocean"),
71→ )
72→
73→
74→def get_machine(name: str) -> MachineConfig:
75→ if not _loaded_config.c:
76→ fatal_error("Attempt to fetch machine data before config loaded")
77→ config = _loaded_config.c
78→ if "machines" not in config:
79→ fatal_error("Required 'machines' section not found in config file")
80→ config_machines = config["machines"]
81→ if name not in config_machines:
82→ fatal_error(f"Machine type '{name}' not found in config file. Available types: {', '.join(config_machines.keys())}")
83→ target_config = config_machines[name]
84→ return MachineConfig(
85→ _require_key(target_config, "new-user-name", f"machines.{name}"),
86→ target_config.get("script-url"),
87→ target_config.get("script-dir"),
88→ target_config.get("script-path"),
89→ target_config.get("script-args"),
90→ )
91→
92→
93→def get_machines():
94→ if not _loaded_config.c:
95→ fatal_error("Attempt to fetch machine data before config loaded")
96→ config = _loaded_config.c
97→
98→ if "machines" not in config:
99→ fatal_error("Required 'machines' section not found in config file")
100→ ret = {}
101→ for name in config["machines"]:
102→ ret[name] = get_machine(name)
103→ return ret
104→

**`machine/util.py`:**

 1→import os
2→import uuid
3→
4→from digitalocean import Domain, Manager, Project, SSHKey
5→
6→from machine.factory import yaml
7→from machine.constants import default_config_dir_path, default_session_id_file_path
8→from machine.types import TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_CREATED
9→
10→from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
11→
12→
13→def projectFromName(manager: Manager, name: str) -> Project:
14→ projects = manager.get_all_projects()
15→ for project in projects:
16→ if project.name == name:
17→ return project
18→ return None
19→
20→
21→def sshKeyFromName(manager: Manager, name: str) -> SSHKey:
22→ keys = manager.get_all_sshkeys()
23→ for key in keys:
24→ if key.name == name:
25→ return key
26→ return None
27→
28→
29→def dnsRecordIdFromName(domain: Domain, name: str) -> str:
30→ records = domain.get_records()
31→ for record in records:
32→ if record.name == name:
33→ return record.id
34→ return None
35→
36→
37→def get_machine_type(droplet):
38→ type = next((t for t in droplet.tags if TAG_MACHINE_TYPE_PREFIX in t), "").replace(TAG_MACHINE_TYPE_PREFIX, "")
39→ if not type:
40→ return None
41→ return type
42→
43→
44→def is_machine_created(droplet):
45→ return TAG_MACHINE_CREATED in droplet.tags
46→
47→
48→def is_same_session(command_context: MainCmdCtx, droplet):
49→ return TAG_MACHINE_SESSION_PREFIX + command_context.session_id in droplet.tags
50→
51→
52→def load_session_id():
53→ if not os.path.exists(default_config_dir_path):
54→ os.mkdir(default_config_dir_path)
55→
56→ if not os.path.exists(default_session_id_file_path):
57→ with open(default_session_id_file_path, "w") as f:
58→ f.write("id: " + str(uuid.uuid4()).replace("-", "")[0:8])
59→
60→ sessionid_config = yaml().load(open(default_session_id_file_path, "r"))
61→ return sessionid_config["id"]
62→
63→
64→def droplet_to_json_obj(droplet):
65→ return {
66→ "id": droplet.id,
67→ "name": droplet.name,
68→ "tags": droplet.tags,
69→ "region": droplet.region["slug"] if droplet.region else None,
70→ "ip": droplet.ip_address,
71→ "type": get_machine_type(droplet),
72→ }
73→
74→
75→def dns_record_to_json_obj(dns_record, zone, droplet):
76→ if droplet:
77→ droplet = droplet_to_json_obj(droplet)
78→
79→ return {
80→ "id": dns_record.id,
81→ "droplet": droplet,
82→ "name": dns_record.name,
83→ "fqdn": f"{dns_record.name}.{zone}",
84→ "zone": zone,
85→ "data": dns_record.data,
86→ "ttl": dns_record.ttl,
87→ "type": dns_record.type,
88→ }
89→

**`machine/main.py`:**

 1→import os
2→
3→import click
4→
5→from machine import config
6→from machine import constants
7→from machine.di import d
8→from machine.log import output
9→from machine.subcommands import create, destroy, list, projects, ssh_keys, domains, list_domain, types, status
10→from machine.types import CliOptions, MainCmdCtx
11→from machine.util import load_session_id
12→
13→CLICK_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
14→
15→
16→@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
17→@click.option("--debug", is_flag=True, default=False, help="Enable debug output")
18→@click.option("--quiet", is_flag=True, default=False, help="Suppress all non-essential output")
19→@click.option("--verbose", is_flag=True, default=False, help="Enable verbose output")
20→@click.option("--dry-run", is_flag=True, default=False, help="Run but do not do anything")
21→@click.option("--config-file", metavar="<PATH>", help=f"Specify the config file (default {constants.default_config_file_path})")
22→@click.option(
23→ "--session-id", metavar="<ID>", default=load_session_id(), help=f"Override the default session ID (default {load_session_id()})"
24→)
25→@click.pass_context
26→def main(context, debug, quiet, verbose, dry_run, config_file, session_id):
27→ options = CliOptions(debug, quiet, verbose, dry_run)
28→ d.opt = options
29→ # Skip config loading for version subcommand since it doesn't need it
30→ # and should work even when no config file exists (#25)
31→ if context.invoked_subcommand == "version":
32→ return
33→ main_context = MainCmdCtx(config.get(config_file), session_id)
34→ context.obj = main_context
35→
36→
37→@main.command()
38→@click.pass_context
39→def version(context):
40→ try:
41→ version_file = os.path.join(os.path.dirname(__file__), "version.txt")
42→ with open(version_file) as f:
43→ version_string = f.read().strip()
44→ except FileNotFoundError:
45→ version_string = "dev"
46→ output(version_string)
47→
48→
49→main.add_command(create.command, "create")
50→main.add_command(destroy.command, "destroy")
51→main.add_command(domains.command, "domains")
52→main.add_command(list.command, "list")
53→main.add_command(list_domain.command, "list-domain")
54→main.add_command(projects.command, "projects")
55→main.add_command(ssh_keys.command, "ssh-keys")
56→main.add_command(types.command, "types")
57→main.add_command(status.command, "status")
58→

**`machine/cloud_config.py`:**

 1→from expandvars import expand
2→
3→from machine.log import fatal_error
4→from machine.types import MachineConfig
5→from machine.util import Manager, sshKeyFromName
6→
7→
8→def get_user_data(manager: Manager, ssh_key_name: str, fqdn: str, machine_config: MachineConfig):
9→ if not fqdn:
10→ fqdn = ""
11→
12→ script_args = machine_config.script_args
13→ if not script_args:
14→ script_args = ""
15→
16→ ssh_key = sshKeyFromName(manager, ssh_key_name)
17→ if not ssh_key:
18→ fatal_error(f"Error: SSH key '{ssh_key_name}' not found in DigitalOcean")
19→ ssh_public_key = ssh_key.public_key
20→ escaped_args = script_args.replace('"', '\\"')
21→
22→ cloud_env = {
23→ "MACHINE_SCRIPT_URL": machine_config.script_url,
24→ "MACHINE_SCRIPT_DIR": machine_config.script_dir,
25→ "MACHINE_FQDN": fqdn,
26→ }
27→
28→    # Expand here because otherwise escaping the vars properly for nested scripts is a guessing game
29→ escaped_args = expand(escaped_args, environ=cloud_env)
30→ cloud_config = f"""#cloud-config
31→users:
32→ - name: {machine_config.new_user_name}
33→ groups: sudo
34→ shell: /bin/bash
35→ sudo: ['ALL=(ALL) NOPASSWD:ALL']
36→ ssh-authorized-keys:
37→ - {ssh_public_key}
38→"""
39→ if machine_config.script_url and machine_config.script_dir and machine_config.script_path:
40→ cloud_config += f"""
41→runcmd:
42→ - mkdir -p {machine_config.script_dir}
43→ - curl -L {machine_config.script_url} -o {machine_config.script_path}
44→ - chmod +x {machine_config.script_path}
45→ - [su, -c, "env {" ".join([f"{k}='{v}'" for k, v in cloud_env.items()])} {machine_config.script_path} {escaped_args}", -, {machine_config.new_user_name}]
46→"""
47→ return cloud_config
48→

**`machine/di.py`:**

 1→# Exposing the truth that dependency injection is just a fancy name for global variables
2→
3→from machine.types import CliOptions
4→
5→
6→class d:
7→ opt: CliOptions = None
8→

**`machine/subcommands/create.py`:**

 1→import click
2→import digitalocean
3→import time
4→from machine.config import get_machine
5→from machine.di import d
6→from machine.log import fatal_error, info, debug, output
7→from machine.types import MainCmdCtx, TAG_MACHINE_CREATED, TAG_MACHINE_TYPE_PREFIX
8→from machine.util import projectFromName, sshKeyFromName
9→from machine.cloud_config import get_user_data
10→
11→from machine.types import TAG_MACHINE_SESSION_PREFIX
12→
13→
14→def _validate_region(region: str):
15→ valid_regions = ["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]
16→ if region is not None and region.upper() not in valid_regions:
17→ fatal_error(f"Error: region {region} is not one of {valid_regions}")
18→
19→
20→def _validate_image(image: str):
21→ valid_images = [
22→ "almalinux-8-x64",
23→ "almalinux-9-x64",
24→ "centos-stream-9-x64",
25→ "debian-11-x64",
26→ "debian-12-x64",
27→ "fedora-39-x64",
28→ "fedora-40-x64",
29→ "rockylinux-9-x64",
30→ "rockylinux-8-x64",
31→ "ubuntu-20-04-x64",
32→ "ubuntu-22-04-x64",
33→ "ubuntu-24-04-x64",
34→ ]
35→ if image is not None and image not in valid_images:
36→ info(f"Warning: image {image} is not one of these known valid images: {valid_images}")
37→
38→
39→@click.command(help="Create a machine")
40→@click.option("--name", "-n", required=True, metavar="<MACHINE-NAME>", help="Name for new machine")
41→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="tag to be applied to new machine")
42→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="create a machine of this type")
43→@click.option("--region", "-r", metavar="<REGION-CODE>", help="create a machine in this region (overrides default from config)")
44→@click.option(
45→ "--machine-size", "-s", metavar="<MACHINE-SLUG>", help="create a machine of this size (overrides default from config)"
46→)
47→@click.option("--image", "-s", metavar="<IMAGE-NAME>", help="create a machine from this image (overrides default from config)")
48→@click.option("--wait-for-ip/--no-wait-for-ip", default=False)
49→@click.option("--update-dns/--no-update-dns", default=True)
50→@click.option("--initialize/--no-initialize", default=True)
51→@click.pass_context
52→def command(context, name, tag, type, region, machine_size, image, wait_for_ip, update_dns, initialize):
53→ command_context: MainCmdCtx = context.obj
54→ config = command_context.config
55→
56→ if update_dns and not config.dns_zone:
57→ fatal_error("Error: DNS update requested but no zone configured")
58→
59→ manager = digitalocean.Manager(token=command_context.config.access_token)
60→
61→ user_data = None
62→ if initialize:
63→ if not type:
64→ fatal_error("Error: a machine type must be supplied")
65→ machine_config = get_machine(type)
66→ if not machine_config:
67→ fatal_error(f"Error: machine type {type} is not defined")
68→ fqdn = f"{name}.{config.dns_zone}" if config.dns_zone else None
69→ user_data = get_user_data(manager, config.ssh_key, fqdn, machine_config)
70→ if d.opt.debug:
71→ info("user-data is:")
72→ info(user_data)
73→
74→ ssh_key = sshKeyFromName(manager, config.ssh_key)
75→ if not ssh_key:
76→ fatal_error(f"Error: SSH key '{config.ssh_key}' not found in DigitalOcean")
77→
78→ _validate_region(region)
79→ _validate_image(image)
80→
81→ tags = [
82→ TAG_MACHINE_SESSION_PREFIX + command_context.session_id,
83→ TAG_MACHINE_CREATED,
84→ ]
85→ if type:
86→ tags.append(TAG_MACHINE_TYPE_PREFIX + type.lower())
87→ if tag:
88→ tags.append(tag)
89→
90→ droplet = digitalocean.Droplet(
91→ token=config.access_token,
92→ name=name,
93→ region=region if region is not None else config.region,
94→ image=image if image is not None else config.image,
95→ size_slug=machine_size if machine_size is not None else config.machine_size,
96→ ssh_keys=[ssh_key],
97→ tags=tags,
98→ user_data=user_data,
99→ backups=False,
100→ )
101→ # Create the droplet
102→ # This call returns nothing, it modifies the droplet object
103→ droplet.create()
104→ if droplet.id:
105→ if d.opt.quiet:
106→ output(f"{droplet.id}")
107→ else:
108→ output(f"New droplet created with id: {droplet.id}")
109→ # If requested, assign to a specified project
110→ if config.project:
111→ project_name = config.project
112→ project = projectFromName(manager, project_name)
113→ if not project:
114→ fatal_error(f"Error: Project {project_name} does not exist, machine created but not assigned to project")
115→ project.assign_resource([f"do:droplet:{droplet.id}"])
116→ if d.opt.verbose:
117→ info(f"Assigned droplet to project: {project}")
118→ # If requested, or if we are going to set a DNS record get the droplet's IPv4 address
119→ if wait_for_ip or update_dns:
120→ ip_address = None
121→ while not ip_address:
122→ time.sleep(1)
123→ droplet.load()
124→ ip_address = droplet.ip_address
125→ if d.opt.verbose:
126→ output("Waiting for droplet IP address")
127→ if d.opt.quiet:
128→ info(f"{ip_address}")
129→ else:
130→ info(f"IP Address: {ip_address}")
131→ # If requested, and we have the IP address, create a DNS host record for the droplet
132→ if update_dns and ip_address and config.dns_zone:
133→ zone = config.dns_zone
134→ host = name
135→ if d.opt.debug:
136→ debug(f"Setting host record {host}.{zone} to {ip_address}")
137→ domain = digitalocean.Domain(token=config.access_token, name=zone)
138→ try:
139→ record = domain.create_new_domain_record(type="A", ttl=60 * 5, name=host, data=ip_address, tag=TAG_MACHINE_CREATED)
140→ except digitalocean.NotFoundError:
141→ info(f"Warning: DNS zone '{zone}' not found in DigitalOcean, DNS record not set")
142→ record = None
143→ if record:
144→ if d.opt.verbose:
145→ info(f"Created DNS record:{record}")
146→ if not d.opt.quiet:
147→ info(f"DNS: {host}.{zone}")
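The create command above reaches straight into python-digitalocean (`Manager`, `Droplet`, `Domain`). One way to decouple it is a small provider interface covering just the operations the command actually performs. The sketch below is illustrative only — none of these names exist in the codebase:

```python
from dataclasses import dataclass
from typing import List, Optional, Protocol


@dataclass
class MachineSpec:
    # Provider-neutral description of the VM to create (hypothetical type)
    name: str
    region: str
    image: str
    size: str
    ssh_key_name: str
    tags: List[str]
    user_data: Optional[str] = None


class Provider(Protocol):
    # The operations create.py currently performs via python-digitalocean
    def create_machine(self, spec: MachineSpec) -> str: ...          # returns machine id
    def get_ip_address(self, machine_id: str) -> Optional[str]: ...  # None until assigned
    def assign_to_project(self, machine_id: str, project: str) -> None: ...
    def create_dns_a_record(self, zone: str, host: str, ip: str, ttl: int = 300) -> None: ...
```

With something like this, `command()` would take a `Provider` from the command context instead of constructing a `digitalocean.Manager`, and a Vultr backend would be a second implementation of the same protocol.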
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/destroy.py
     1→import click
2→import digitalocean
3→
4→from machine.di import d
5→from machine.log import debug, fatal_error, output
6→from machine.util import dnsRecordIdFromName, is_machine_created
7→from machine.types import MainCmdCtx
8→
9→from machine.util import is_same_session
10→
11→
12→@click.command(help="Destroy one or more machines")
13→@click.option("--confirm/--no-confirm", default=True)
14→@click.option("--delete-dns/--no-delete-dns", default=True)
15→@click.option(
16→ "--all",
17→ is_flag=True,
18→ default=False,
19→ help="Include machines not created by this tool",
20→)
21→@click.argument("droplet-ids", nargs=-1)
22→@click.pass_context
23→def command(context, confirm, delete_dns, all, droplet_ids):
24→ command_context: MainCmdCtx = context.obj
25→ config = command_context.config
26→ manager = digitalocean.Manager(token=config.access_token)
27→ for droplet_id in droplet_ids:
28→ try:
29→ droplet = manager.get_droplet(droplet_id)
30→ except digitalocean.NotFoundError:
31→ fatal_error(f"Error: machine with id {droplet_id} not found")
32→ name = droplet.name
33→
34→ if not is_machine_created(droplet) and not all:
35→ fatal_error(f'ERROR: Cannot destroy droplet "{name}" (id: {droplet.id}), it was not created by machine.')
36→
37→ if not is_same_session(command_context, droplet) and not all:
38→ fatal_error(
39→ f'ERROR: Cannot destroy droplet "{name}" (id: {droplet.id}), it was created by a different session of machine.'
40→ )
41→
42→ if confirm:
43→ output(
44→ "Type YES (not y or yes or Yes) to confirm that you want to permanently"
45→ f' DELETE/DESTROY droplet "{name}" (id: {droplet.id})'
46→ )
47→ confirmation = input()
48→ if confirmation != "YES":
49→ fatal_error("Destroy operation aborted, not confirmed by user")
50→ result = droplet.destroy()
51→
52→ if result and delete_dns and config.dns_zone:
53→ zone = config.dns_zone
54→ if d.opt.debug:
55→ debug(f"Deleting host record {name}.{zone}")
56→ domain = digitalocean.Domain(token=config.access_token, name=zone)
57→ if not domain:
58→ fatal_error(f"Error: Domain {domain} does not exist, machine destroyed but DNS record not removed")
59→ record_id = dnsRecordIdFromName(domain, name)
60→ if record_id:
61→ if d.opt.debug:
62→ debug(f"Deleting dns record id={record_id}")
63→ domain.delete_domain_record(id=record_id)
64→ else:
65→ if d.opt.debug:
66→ debug(f"No dns record found for {name}.{zone}")
67→
68→ if not result:
69→ fatal_error("Error destroying machine")
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/list.py
     1→import click
2→import json
3→import digitalocean
4→
5→from machine.log import fatal_error, output
6→from machine.types import MainCmdCtx, TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_SESSION_PREFIX
7→from machine.util import get_machine_type, is_machine_created, is_same_session, droplet_to_json_obj
8→
9→
10→def print_normal(droplets):
11→ for droplet in droplets:
12→ region = droplet.region["slug"] if droplet.region else "unknown"
13→ output(f"{droplet.name} ({droplet.id}, {region}, {get_machine_type(droplet)}): {droplet.ip_address}")
14→
15→
16→def print_quiet(droplets):
17→ for droplet in droplets:
18→ output(droplet.id)
19→
20→
21→def print_json(droplets):
22→ output(json.dumps([droplet_to_json_obj(d) for d in droplets]))
23→
24→
25→def get_droplets(command_context, id=None, name=None, tag=None, type=None, region=None, all=False):
26→ manager = digitalocean.Manager(token=command_context.config.access_token)
27→ droplets = []
28→ if id:
29→ droplet = manager.get_droplet(id)
30→ if droplet:
31→ droplets.append(droplet)
32→
33→ if all:
34→ droplets = manager.get_all_droplets()
35→ else:
36→ droplets = manager.get_all_droplets(tag_name=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
37→
38→ # we can't combine most filters over the API, so we also filter ourselves
39→ if name:
40→ droplets = filter(lambda d: d.name == name, droplets)
41→
42→ if tag:
43→ droplets = filter(lambda d: tag in d.tags, droplets)
44→
45→ if type:
46→ droplets = filter(lambda d: TAG_MACHINE_TYPE_PREFIX + type.lower() in d.tags, droplets)
47→
48→ if region:
49→ droplets = filter(lambda d: d.region and region == d.region["slug"], droplets)
50→
51→ if not all:
52→ droplets = filter(lambda d: is_machine_created(d) and is_same_session(command_context, d), droplets)
53→
54→ return list(droplets)
55→
56→
57→@click.command(help="List machines")
58→@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
59→@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
60→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
61→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
62→@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
63→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
64→@click.option(
65→ "--all",
66→ is_flag=True,
67→ default=False,
68→ help="All machines, including those not created by this tool or by other sessions",
69→)
70→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
71→@click.option(
72→ "--unique",
73→ is_flag=True,
74→ default=False,
75→ help="Return an error if there is more than one match",
76→)
77→@click.pass_context
78→def command(context, id, name, tag, type, region, all, output, quiet, unique):
79→ command_context: MainCmdCtx = context.obj
80→
81→ droplets = get_droplets(command_context, id, name, tag, type, region, all)
82→ if unique and len(droplets) > 1:
83→ fatal_error(f"ERROR: --unique match required but {len(droplets)} matches found.")
84→
85→ if output == "json":
86→ print_json(droplets)
87→ elif quiet:
88→ print_quiet(droplets)
89→ else:
90→ print_normal(droplets)
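The comment in `get_droplets` notes that most filters cannot be combined over the DigitalOcean API, so they are applied client-side. That filtering is already provider-neutral and could be extracted; the sketch below uses a hypothetical `Machine` record rather than the SDK's droplet objects, and is not code from this repository:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Machine:
    # Minimal provider-neutral machine record (hypothetical)
    name: str
    region: str
    tags: List[str] = field(default_factory=list)


def filter_machines(machines: List[Machine], name: Optional[str] = None,
                    tag: Optional[str] = None, region: Optional[str] = None) -> List[Machine]:
    # Mirrors the client-side filtering in list.py, independent of any provider SDK
    result = machines
    if name:
        result = [m for m in result if m.name == name]
    if tag:
        result = [m for m in result if tag in m.tags]
    if region:
        result = [m for m in result if m.region == region]
    return result
```

Any provider backend would only need to return its machines in this shape; the filter logic (and the session/tag checks) would not have to be duplicated per provider.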
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/status.py
     1→import click
2→import json
3→import requests
4→
5→from machine.log import output
6→from machine.subcommands.list import get_droplets
7→from machine.types import MainCmdCtx
8→
9→
10→def print_normal(statuses):
11→ for status in statuses:
12→ output(
13→ f"{status['name']} ({status['id']}):\t" + "\t".join([f"{k}={v}" for k, v in status.items() if k not in ["name", "id"]])
14→ )
15→
16→
17→def print_json(statuses):
18→ output(json.dumps(statuses))
19→
20→
21→@click.command(help="Machine status")
22→@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
23→@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
24→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
25→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
26→@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
27→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
28→@click.option("--status-check", metavar="<CHECK>", default="cloud-init-status", help="Status check to perform")
29→@click.option(
30→ "--all",
31→ is_flag=True,
32→ default=False,
33→ help="All machines, including those not created by this tool or by other sessions",
34→)
35→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
36→@click.pass_context
37→def command(context, id, name, tag, type, region, all, output, quiet, status_check):
38→ command_context: MainCmdCtx = context.obj
39→
40→ droplets = get_droplets(command_context, id, name, tag, type, region, all)
41→
42→ statuses = []
43→ for d in droplets:
44→ status = {"name": d.name, "id": d.id, "droplet-status": d.status, status_check: "UNKNOWN"}
45→ try:
46→ r = requests.get(f"http://{d.ip_address}:4242/cgi-bin/{status_check}")
47→ if 200 == r.status_code:
48→ status[status_check] = r.json()["status"]
49→ except: # noqa: E722
50→ pass
51→ statuses.append(status)
52→
53→ if output == "json":
54→ print_json(statuses)
55→ else:
56→ print_normal(statuses)
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/ssh_keys.py
     1→import click
2→import digitalocean
3→from machine.log import output
4→from machine.types import MainCmdCtx
5→
6→
7→@click.command(help="List ssh keys")
8→@click.pass_context
9→def command(context):
10→ command_context: MainCmdCtx = context.obj
11→ manager = digitalocean.Manager(token=command_context.config.access_token)
12→ my_keys = manager.get_all_sshkeys()
13→ for key in my_keys:
14→ output(f"{key.id}: {key.name} ({key.fingerprint})")
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/domains.py
     1→import click
2→import digitalocean
3→from machine.log import output
4→from machine.types import MainCmdCtx
5→
6→
7→@click.command(help="List dns domains")
8→@click.pass_context
9→def command(context):
10→ command_context: MainCmdCtx = context.obj
11→ manager = digitalocean.Manager(token=command_context.config.access_token)
12→ my_domains = manager.get_all_domains()
13→ for domain in my_domains:
14→ output(f"{domain.name}")
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/list_domain.py
     1→import click
2→import digitalocean
3→import json
4→
5→from machine.log import fatal_error, output
6→from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
7→from machine.util import dns_record_to_json_obj
8→
9→
10→def print_normal(records, zone):
11→ for record in records:
12→ output(f"{record.name}\t{record.type}\t{record.data}")
13→
14→
15→def print_quiet(records):
16→ for record in records:
17→ output(record.name)
18→
19→
20→def print_json(records, droplets, zone):
21→ simplified = []
22→ for r in records:
23→ droplet = next((d for d in droplets if r.data == d.ip_address), None)
24→ simplified.append(dns_record_to_json_obj(r, zone, droplet))
25→ output(json.dumps(simplified))
26→
27→
28→@click.command(help="List domain records")
29→@click.option("--name", "-n", metavar="<RECORD-NAME>", help="Filter by name")
30→@click.option("--type", "-m", metavar="<RECORD-TYPE>", help="Filter by type (default A and AAAA)")
31→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
32→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
33→@click.option(
34→ "--all",
35→ is_flag=True,
36→ default=False,
37→ help="Include all records, even those not created by this tool or created by other sessions",
38→)
39→@click.argument("zone", required=False)
40→@click.pass_context
41→def command(context, name, type, output, quiet, all, zone):
42→ command_context: MainCmdCtx = context.obj
43→ if not zone:
44→ zone = command_context.config.dns_zone
45→ if not zone:
46→ fatal_error("Error: no DNS zone specified.")
47→ domain = digitalocean.Domain(token=command_context.config.access_token, name=zone)
48→ records = domain.get_records()
49→
50→ if type:
51→ if type != "*":
52→ records = filter(lambda r: r.type == type, records)
53→ else:
54→ records = filter(lambda r: r.type in ["A", "AAAA"], records)
55→
56→ manager = digitalocean.Manager(token=command_context.config.access_token)
57→ if all:
58→ droplets = manager.get_all_droplets()
59→ else:
60→ droplets = manager.get_all_droplets(tag_name=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
61→ droplet_ips = [d.ip_address for d in droplets]
62→ records = filter(lambda r: r.data in droplet_ips, records)
63→
64→ records = list(records)
65→ if output == "json":
66→ print_json(records, droplets, zone)
67→ elif quiet:
68→ print_quiet(records)
69→ else:
70→ print_normal(records, zone)
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/projects.py
     1→import click
2→import digitalocean
3→from machine.log import output
4→from machine.types import MainCmdCtx
5→
6→
7→@click.command(help="List projects")
8→@click.pass_context
9→def command(context):
10→ command_context: MainCmdCtx = context.obj
11→ manager = digitalocean.Manager(token=command_context.config.access_token)
12→ my_projects = manager.get_all_projects()
13→ for project in my_projects:
14→ output(f"{project.name}")
File: /home/david/projects/stirlingbridge/machine/machine/subcommands/types.py
     1→import click
2→
3→from machine.log import output
4→from machine.config import get_machines
5→
6→
7→@click.command(help="List projects")
8→@click.pass_context
9→def command(context):
10→ config_machines = get_machines()
11→ names = list(config_machines.keys())
12→ names.sort()
13→ for n in names:
14→ output(n)
File: pyproject.toml
     1→[project]
2→name = "machine"
3→version = "1.0.0"
4→description = "Utility for creating and managing VMs"
5→readme = "README.md"
6→license = "AGPL-3.0-only"
7→requires-python = ">=3.8"
8→authors = [
9→ { name = "Stirlingbridge", email = "info@stirlingbridge.website" },
10→]
11→classifiers = [
12→ "Programming Language :: Python :: 3.8",
13→ "Operating System :: OS Independent",
14→]
15→dependencies = [
16→ "click==8.1.7",
17→ "python-digitalocean==1.17.0",
18→ "ruamel.yaml>=0.17.32",
19→ "requests==2.32.3",
20→ "expandvars==1.0.0",
21→]
22→
23→[project.scripts]
24→machine = "machine.main:main"
25→
26→[project.urls]
27→Homepage = "https://github.com/stirlingbridge/machine"
28→
29→[build-system]
30→requires = ["hatchling"]
31→build-backend = "hatchling.build"
32→
33→[tool.pytest.ini_options]
34→markers = [
35→ "e2e: end-to-end tests that create real resources on DigitalOcean (require E2E_* env vars)",
36→]
37→addopts = "-m 'not e2e'"
38→
39→[dependency-groups]
40→dev = [
41→ "ruff",
42→ "pytest",
43→]
44→
45→[tool.ruff]
46→line-length = 132
47→
48→[tool.ruff.lint]
49→select = ["E", "F", "W", "C90"]
50→ignore = ["E203"]
51→
52→[tool.ruff.lint.mccabe]
53→max-complexity = 25
54→
55→[tool.ruff.lint.per-file-ignores]
56→"machine/cloud_config.py" = ["E501"]
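pyproject.toml pins `python-digitalocean==1.17.0` as a hard dependency. If Vultr support were added, its client library could become an optional extra so DigitalOcean-only users don't install it; one possible shape (the package name and version here are assumptions, not a chosen dependency):

```toml
[project.optional-dependencies]
vultr = [
    "pyvultr>=1.0",  # hypothetical: whichever Vultr API client is actually chosen
]
```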

     1→from pathlib import Path
2→
3→default_config_dir_path = str(Path.home()) + "/.machine"
4→default_config_file_path = default_config_dir_path + "/config.yml"
5→default_session_id_file_path = default_config_dir_path + "/session-id.yml"
File: machine/log.py
     1→import sys
2→
3→
4→def fatal_error(s: str):
5→ print(s, file=sys.stderr)
6→ sys.exit(1)
7→
8→
9→def debug(s: str):
10→ print(s, file=sys.stderr)
11→
12→
13→def info(s: str):
14→ print(s, file=sys.stderr)
15→
16→
17→def output(s: str):
18→ print(s)
Files in tests/:
/home/david/projects/stirlingbridge/machine/tests/test_input_validation.py
/home/david/projects/stirlingbridge/machine/tests/test_cli_integration.py
/home/david/projects/stirlingbridge/machine/tests/__init__.py
/home/david/projects/stirlingbridge/machine/tests/test_config.py
/home/david/projects/stirlingbridge/machine/tests/test_e2e.py
1→import os
2→import pytest
3→from unittest.mock import patch
4→from machine.config import _expand_env_vars
5→
6→
7→class TestExpandEnvVars:
8→ def test_plain_string_unchanged(self):
9→ assert _expand_env_vars("hello world") == "hello world"
10→
11→ def test_non_string_types_unchanged(self):
12→ assert _expand_env_vars(42) == 42
13→ assert _expand_env_vars(3.14) == 3.14
14→ assert _expand_env_vars(True) is True
15→ assert _expand_env_vars(None) is None
16→
17→ def test_simple_variable_substitution(self):
18→ with patch.dict(os.environ, {"MY_VAR": "my_value"}):
19→ assert _expand_env_vars("${MY_VAR}") == "my_value"
20→
21→ def test_variable_embedded_in_string(self):
22→ with patch.dict(os.environ, {"HOST": "example.com"}):
23→ assert _expand_env_vars("https://${HOST}/api") == "https://example.com/api"
24→
25→ def test_multiple_variables_in_string(self):
26→ with patch.dict(os.environ, {"HOST": "example.com", "PORT": "8080"}):
27→ assert _expand_env_vars("${HOST}:${PORT}") == "example.com:8080"
28→
29→ def test_default_value_when_var_unset(self):
30→ env = os.environ.copy()
31→ env.pop("UNSET_VAR", None)
32→ with patch.dict(os.environ, env, clear=True):
33→ assert _expand_env_vars("${UNSET_VAR:-fallback}") == "fallback"
34→
35→ def test_default_value_ignored_when_var_set(self):
36→ with patch.dict(os.environ, {"MY_VAR": "actual"}):
37→ assert _expand_env_vars("${MY_VAR:-fallback}") == "actual"
38→
39→ def test_default_value_empty_string(self):
40→ env = os.environ.copy()
41→ env.pop("UNSET_VAR", None)
42→ with patch.dict(os.environ, env, clear=True):
43→ assert _expand_env_vars("${UNSET_VAR:-}") == ""
44→
45→ def test_unset_variable_without_default_exits(self):
46→ env = os.environ.copy()
47→ env.pop("MISSING_VAR", None)
48→ with patch.dict(os.environ, env, clear=True):
49→ with pytest.raises(SystemExit):
50→ _expand_env_vars("${MISSING_VAR}")
51→
52→ def test_dict_values_expanded(self):
53→ with patch.dict(os.environ, {"TOKEN": "secret123"}):
54→ data = {"key": "${TOKEN}", "plain": "no-change"}
55→ result = _expand_env_vars(data)
56→ assert result == {"key": "secret123", "plain": "no-change"}
57→
58→ def test_nested_dict_expanded(self):
59→ with patch.dict(os.environ, {"VAL": "deep"}):
60→ data = {"outer": {"inner": "${VAL}"}}
61→ result = _expand_env_vars(data)
62→ assert result == {"outer": {"inner": "deep"}}
63→
64→ def test_list_values_expanded(self):
65→ with patch.dict(os.environ, {"A": "x", "B": "y"}):
66→ data = ["${A}", "literal", "${B}"]
67→ result = _expand_env_vars(data)
68→ assert result == ["x", "literal", "y"]
69→
70→ def test_mixed_nested_structure(self):
71→ with patch.dict(os.environ, {"V": "replaced"}):
72→ data = {"list": ["${V}", "fixed"], "nested": {"k": "${V}"}}
73→ result = _expand_env_vars(data)
74→ assert result == {"list": ["replaced", "fixed"], "nested": {"k": "replaced"}}
File: /home/david/projects/stirlingbridge/machine/tests/test_input_validation.py
     1→"""Tests for graceful handling of invalid user input."""
2→
3→import os
4→import subprocess
5→import textwrap
6→import pytest
7→
8→
9→def run_machine(*args, env_override=None):
10→ """Run the machine CLI as a subprocess and return the result."""
11→ env = os.environ.copy()
12→ if env_override:
13→ env.update(env_override)
14→ result = subprocess.run(
15→ ["uv", "run", "machine"] + list(args),
16→ capture_output=True,
17→ text=True,
18→ env=env,
19→ )
20→ return result
21→
22→
23→def write_config(path, content):
24→ with open(path, "w") as f:
25→ f.write(textwrap.dedent(content))
26→
27→
28→def valid_config():
29→ return """\
30→ digital-ocean:
31→ access-token: fake-token
32→ ssh-key: test-key
33→ machine-size: s-1vcpu-1gb
34→ image: ubuntu-22-04-x64
35→ region: nyc1
36→ project: test-project
37→ machines:
38→ web-server:
39→ new-user-name: admin
40→ db-server:
41→ new-user-name: dbadmin
42→ """
43→
44→
45→class TestInvalidMachineType:
46→ """Issue #29: invalid machine type should produce a graceful error, not a traceback."""
47→
48→ @pytest.fixture()
49→ def config_dir(self, tmp_path):
50→ return tmp_path
51→
52→ def test_invalid_machine_type_error_message(self, config_dir):
53→ config_file = config_dir / "config.yml"
54→ write_config(config_file, valid_config())
55→ result = run_machine(
56→ "--config-file",
57→ str(config_file),
58→ "create",
59→ "-n",
60→ "test-1",
61→ "-m",
62→ "nonexistent-type",
63→ "--no-update-dns",
64→ )
65→ assert result.returncode != 0
66→ assert "nonexistent-type" in result.stderr
67→ assert "not found" in result.stderr.lower()
68→ # Should list available types
69→ assert "web-server" in result.stderr
70→ assert "db-server" in result.stderr
71→ # Should NOT be a traceback
72→ assert "Traceback" not in result.stderr
73→ assert "KeyError" not in result.stderr
74→
75→ def test_valid_machine_type_accepted(self, config_dir):
76→ """Verify that a valid machine type doesn't produce an error about the type.
77→ (It will fail later trying to reach DigitalOcean, but not with a type error.)"""
78→ config_file = config_dir / "config.yml"
79→ write_config(config_file, valid_config())
80→ result = run_machine(
81→ "--config-file",
82→ str(config_file),
83→ "create",
84→ "-n",
85→ "test-1",
86→ "-m",
87→ "web-server",
88→ "--no-update-dns",
89→ )
90→ # It will fail (no real DO token), but NOT because of machine type
91→ assert "not found in config" not in result.stderr
92→
93→
94→class TestMissingConfigSections:
95→ """Missing required config sections/keys should produce graceful errors."""
96→
97→ @pytest.fixture()
98→ def config_dir(self, tmp_path):
99→ return tmp_path
100→
101→ def test_missing_digital_ocean_section(self, config_dir):
102→ config_file = config_dir / "config.yml"
103→ write_config(
104→ config_file,
105→ """\
106→ machines:
107→ web-server:
108→ new-user-name: admin
109→ """,
110→ )
111→ result = run_machine("--config-file", str(config_file), "types")
112→ assert result.returncode != 0
113→ assert "digital-ocean" in result.stderr
114→ assert "Traceback" not in result.stderr
115→
116→ def test_missing_access_token(self, config_dir):
117→ config_file = config_dir / "config.yml"
118→ write_config(
119→ config_file,
120→ """\
121→ digital-ocean:
122→ ssh-key: test-key
123→ machine-size: s-1vcpu-1gb
124→ image: ubuntu-22-04-x64
125→ region: nyc1
126→ project: test-project
127→ machines:
128→ web-server:
129→ new-user-name: admin
130→ """,
131→ )
132→ result = run_machine("--config-file", str(config_file), "types")
133→ assert result.returncode != 0
134→ assert "access-token" in result.stderr
135→ assert "Traceback" not in result.stderr
136→
137→ def test_missing_ssh_key(self, config_dir):
138→ config_file = config_dir / "config.yml"
139→ write_config(
140→ config_file,
141→ """\
142→ digital-ocean:
143→ access-token: fake-token
144→ machine-size: s-1vcpu-1gb
145→ image: ubuntu-22-04-x64
146→ region: nyc1
147→ project: test-project
148→ machines:
149→ web-server:
150→ new-user-name: admin
151→ """,
152→ )
153→ result = run_machine("--config-file", str(config_file), "types")
154→ assert result.returncode != 0
155→ assert "ssh-key" in result.stderr
156→ assert "Traceback" not in result.stderr
157→
158→ def test_missing_machines_section(self, config_dir):
159→ config_file = config_dir / "config.yml"
160→ write_config(
161→ config_file,
162→ """\
163→ digital-ocean:
164→ access-token: fake-token
165→ ssh-key: test-key
166→ machine-size: s-1vcpu-1gb
167→ image: ubuntu-22-04-x64
168→ region: nyc1
169→ project: test-project
170→ """,
171→ )
172→ result = run_machine("--config-file", str(config_file), "types")
173→ assert result.returncode != 0
174→ assert "machines" in result.stderr.lower()
175→ assert "Traceback" not in result.stderr
176→
177→ def test_missing_new_user_name_in_machine(self, config_dir):
178→ config_file = config_dir / "config.yml"
179→ write_config(
180→ config_file,
181→ """\
182→ digital-ocean:
183→ access-token: fake-token
184→ ssh-key: test-key
185→ machine-size: s-1vcpu-1gb
186→ image: ubuntu-22-04-x64
187→ region: nyc1
188→ project: test-project
189→ machines:
190→ broken-machine:
191→ script-url: http://example.com/setup.sh
192→ """,
193→ )
194→ result = run_machine(
195→ "--config-file",
196→ str(config_file),
197→ "create",
198→ "-n",
199→ "test-1",
200→ "-m",
201→ "broken-machine",
202→ "--no-update-dns",
203→ )
204→ assert result.returncode != 0
205→ assert "new-user-name" in result.stderr
206→ assert "Traceback" not in result.stderr
207→
208→
209→class TestCreateNoInitialize:
210→ """--no-initialize should work without a machine type and without crashing."""
211→
212→ @pytest.fixture()
213→ def config_dir(self, tmp_path):
214→ return tmp_path
215→
216→ def test_no_initialize_without_type_no_crash(self, config_dir):
217→ """Using --no-initialize without --type should not crash with AttributeError or NameError."""
218→ config_file = config_dir / "config.yml"
219→ write_config(config_file, valid_config())
220→ result = run_machine(
221→ "--config-file",
222→ str(config_file),
223→ "create",
224→ "-n",
225→ "test-1",
226→ "--no-initialize",
227→ "--no-update-dns",
228→ )
229→ # It will fail (no real DO token), but should NOT crash with AttributeError/NameError
230→ # from the bugs where user_data was undefined and type.lower() was called on None
231→ assert "AttributeError" not in result.stderr
232→ assert "NameError" not in result.stderr
File: /home/david/projects/stirlingbridge/machine/tests/test_cli_integration.py
     1→"""Integration tests that run the machine CLI as a subprocess, the same way a user would."""
2→
3→import os
4→import subprocess
5→import textwrap
6→import pytest
7→
8→
9→def run_machine(*args, env_override=None):
10→ """Run the machine CLI as a subprocess and return the result."""
11→ env = os.environ.copy()
12→ if env_override:
13→ env.update(env_override)
14→ result = subprocess.run(
15→ ["uv", "run", "machine"] + list(args),
16→ capture_output=True,
17→ text=True,
18→ env=env,
19→ )
20→ return result
21→
22→
23→def write_config(path, content):
24→ with open(path, "w") as f:
25→ f.write(textwrap.dedent(content))
26→
27→
28→class TestVersionCommand:
29→ """Smoke test: the CLI runs and the version command works without any config."""
30→
31→ def test_version_runs(self):
32→ result = run_machine("version")
33→ assert result.returncode == 0
34→ assert result.stdout.strip() # prints something
35→
36→ def test_help_runs(self):
37→ result = run_machine("--help")
38→ assert result.returncode == 0
39→ assert "Usage" in result.stdout
40→
41→
42→class TestEnvVarExpansionIntegration:
43→ """End-to-end tests that verify environment variable expansion works
44→ when the actual machine tool is invoked with a config file."""
45→
46→ @pytest.fixture()
47→ def config_dir(self, tmp_path):
48→ return tmp_path
49→
50→ def test_env_var_expanded_in_config(self, config_dir):
51→ config_file = config_dir / "config.yml"
52→ write_config(
53→ config_file,
54→ """\
55→ digital-ocean:
56→ access-token: "${TEST_DO_TOKEN}"
57→ ssh-key: test-key
58→ machine-size: s-1vcpu-1gb
59→ image: ubuntu-22-04-x64
60→ region: nyc1
61→ project: test-project
62→ machines:
63→ test-machine:
64→ new-user-name: testuser
65→ """,
66→ )
67→ result = run_machine(
68→ "--debug",
69→ "--config-file",
70→ str(config_file),
71→ "types",
72→ env_override={"TEST_DO_TOKEN": "tok_secret_12345"},
73→ )
74→ assert result.returncode == 0
75→ # The debug output on stderr should contain the expanded token value
76→ assert "tok_secret_12345" in result.stderr
77→ # And should NOT contain the unexpanded variable reference
78→ assert "${TEST_DO_TOKEN}" not in result.stderr
79→ # The types command should list the machine name on stdout
80→ assert "test-machine" in result.stdout
81→
82→ def test_multiple_env_vars_expanded(self, config_dir):
83→ config_file = config_dir / "config.yml"
84→ write_config(
85→ config_file,
86→ """\
87→ digital-ocean:
88→ access-token: "${TEST_TOKEN}"
89→ ssh-key: "${TEST_SSH_KEY}"
90→ machine-size: s-1vcpu-1gb
91→ image: ubuntu-22-04-x64
92→ region: nyc1
93→ project: test-project
94→ machines:
95→ dev:
96→ new-user-name: devuser
97→ """,
98→ )
99→ result = run_machine(
100→ "--debug",
101→ "--config-file",
102→ str(config_file),
103→ "types",
104→ env_override={"TEST_TOKEN": "expanded_token", "TEST_SSH_KEY": "expanded_key"},
105→ )
106→ assert result.returncode == 0
107→ assert "expanded_token" in result.stderr
108→ assert "expanded_key" in result.stderr
109→
110→ def test_env_var_with_default_uses_default_when_unset(self, config_dir):
111→ config_file = config_dir / "config.yml"
112→ write_config(
113→ config_file,
114→ """\
115→ digital-ocean:
116→ access-token: fake-token
117→ ssh-key: test-key
118→ machine-size: "${TEST_SIZE:-s-2vcpu-4gb}"
119→ image: ubuntu-22-04-x64
120→ region: nyc1
121→ project: test-project
122→ machines:
123→ myvm:
124→ new-user-name: admin
125→ """,
126→ )
127→ # Make sure TEST_SIZE is not in the environment
128→ clean_env = os.environ.copy()
129→ clean_env.pop("TEST_SIZE", None)
130→ result = run_machine(
131→ "--debug",
132→ "--config-file",
133→ str(config_file),
134→ "types",
135→ env_override={},
136→ )
137→ # Run with TEST_SIZE explicitly removed
138→ result = subprocess.run(
139→ ["uv", "run", "machine", "--debug", "--config-file", str(config_file), "types"],
140→ capture_output=True,
141→ text=True,
142→ env=clean_env,
143→ )
144→ assert result.returncode == 0
145→ assert "s-2vcpu-4gb" in result.stderr
146→
147→ def test_env_var_with_default_uses_value_when_set(self, config_dir):
148→ config_file = config_dir / "config.yml"
149→ write_config(
150→ config_file,
151→ """\
152→ digital-ocean:
153→ access-token: fake-token
154→ ssh-key: test-key
155→ machine-size: "${TEST_SIZE:-s-2vcpu-4gb}"
156→ image: ubuntu-22-04-x64
157→ region: nyc1
158→ project: test-project
159→ machines:
160→ myvm:
161→ new-user-name: admin
162→ """,
163→ )
164→ result = run_machine(
165→ "--debug",
166→ "--config-file",
167→ str(config_file),
168→ "types",
169→ env_override={"TEST_SIZE": "s-4vcpu-8gb"},
170→ )
171→ assert result.returncode == 0
172→ assert "s-4vcpu-8gb" in result.stderr
173→ assert "s-2vcpu-4gb" not in result.stderr
174→
175→ def test_missing_env_var_without_default_exits_with_error(self, config_dir):
176→ config_file = config_dir / "config.yml"
177→ write_config(
178→ config_file,
179→ """\
180→ digital-ocean:
181→ access-token: "${DEFINITELY_NOT_SET_VAR}"
182→ ssh-key: test-key
183→ machine-size: s-1vcpu-1gb
184→ image: ubuntu-22-04-x64
185→ region: nyc1
186→ project: test-project
187→ machines:
188→ myvm:
189→ new-user-name: admin
190→ """,
191→ )
192→ clean_env = os.environ.copy()
193→ clean_env.pop("DEFINITELY_NOT_SET_VAR", None)
194→ result = subprocess.run(
195→ ["uv", "run", "machine", "--config-file", str(config_file), "types"],
196→ capture_output=True,
197→ text=True,
198→ env=clean_env,
199→ )
200→ assert result.returncode != 0
201→ assert "DEFINITELY_NOT_SET_VAR" in result.stderr
202→
203→ def test_env_var_in_machine_config_section(self, config_dir):
204→ config_file = config_dir / "config.yml"
205→ write_config(
206→ config_file,
207→ """\
208→ digital-ocean:
209→ access-token: fake-token
210→ ssh-key: test-key
211→ machine-size: s-1vcpu-1gb
212→ image: ubuntu-22-04-x64
213→ region: nyc1
214→ project: test-project
215→ machines:
216→ web-server:
217→ new-user-name: "${TEST_USERNAME}"
218→ db-server:
219→ new-user-name: admin
220→ """,
221→ )
222→ result = run_machine(
223→ "--debug",
224→ "--config-file",
225→ str(config_file),
226→ "types",
227→ env_override={"TEST_USERNAME": "deploy_user"},
228→ )
229→ assert result.returncode == 0
230→ assert "deploy_user" in result.stderr
231→ # Both machine types should be listed
232→ assert "db-server" in result.stdout
233→ assert "web-server" in result.stdout
   234→
     1→"""End-to-end tests that create real VMs on DigitalOcean.
2→
3→These tests require a real DigitalOcean environment and are NOT run as part of
4→the normal test suite. They must be invoked explicitly:
5→
6→ uv run pytest tests/test_e2e.py -v
7→
8→Required environment variables:
9→ E2E_DO_TOKEN - DigitalOcean API token
10→ E2E_SSH_KEY - Name of an SSH key already registered in DO
11→ E2E_DNS_ZONE - DNS zone managed by DO (e.g. "test.example.com")
12→ E2E_PROJECT - DO project name to assign droplets to
13→
14→Optional environment variables:
15→ E2E_REGION - Region slug (default: nyc1)
16→ E2E_IMAGE - Image slug (default: ubuntu-24-04-x64)
17→ E2E_SIZE - Machine size slug (default: s-1vcpu-512mb-10gb)
18→"""
19→
20→import json
21→import os
22→import subprocess
23→import uuid
24→
25→import pytest
26→
27→
28→# ---------------------------------------------------------------------------
29→# Skip the entire module if credentials are not provided
30→# ---------------------------------------------------------------------------
31→
32→E2E_DO_TOKEN = os.environ.get("E2E_DO_TOKEN")
33→E2E_SSH_KEY = os.environ.get("E2E_SSH_KEY")
34→E2E_DNS_ZONE = os.environ.get("E2E_DNS_ZONE")
35→E2E_REGION = os.environ.get("E2E_REGION", "nyc1")
36→E2E_IMAGE = os.environ.get("E2E_IMAGE", "ubuntu-24-04-x64")
37→E2E_SIZE = os.environ.get("E2E_SIZE", "s-1vcpu-512mb-10gb")
38→E2E_PROJECT = os.environ.get("E2E_PROJECT")
39→
40→pytestmark = pytest.mark.e2e
41→
42→_MISSING = []
43→if not E2E_DO_TOKEN:
44→ _MISSING.append("E2E_DO_TOKEN")
45→if not E2E_SSH_KEY:
46→ _MISSING.append("E2E_SSH_KEY")
47→if not E2E_DNS_ZONE:
48→ _MISSING.append("E2E_DNS_ZONE")
49→if not E2E_PROJECT:
50→ _MISSING.append("E2E_PROJECT")
51→
52→if _MISSING:
53→ pytestmark = [
54→ pytest.mark.e2e,
55→ pytest.mark.skip(reason=f"E2E env vars not set: {', '.join(_MISSING)}"),
56→ ]
57→
58→
59→# ---------------------------------------------------------------------------
60→# Helpers
61→# ---------------------------------------------------------------------------
62→
63→
64→def _unique_name(prefix="e2etest"):
65→ """Generate a short unique droplet name safe for DNS."""
66→ return f"{prefix}-{uuid.uuid4().hex[:8]}"
67→
68→
69→def _write_config(path, **overrides):
70→ """Write a minimal config file for the e2e test run."""
71→ cfg = {
72→ "access-token": E2E_DO_TOKEN,
73→ "ssh-key": E2E_SSH_KEY,
74→ "dns-zone": E2E_DNS_ZONE,
75→ "machine-size": E2E_SIZE,
76→ "image": E2E_IMAGE,
77→ "region": E2E_REGION,
78→ }
79→ cfg["project"] = E2E_PROJECT
80→ cfg.update(overrides)
81→
82→ do_lines = "\n".join(f" {k}: {v}" for k, v in cfg.items())
83→ content = f"digital-ocean:\n{do_lines}\nmachines:\n e2e-basic:\n new-user-name: e2euser\n"
84→ with open(path, "w") as f:
85→ f.write(content)
86→
87→
88→def run_machine(*args, config_file=None, session_id=None):
89→ """Run the machine CLI as a subprocess with the given arguments."""
90→ cmd = ["uv", "run", "machine"]
91→ if config_file:
92→ cmd += ["--config-file", str(config_file)]
93→ if session_id:
94→ cmd += ["--session-id", session_id]
95→ cmd += list(args)
96→ result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
97→ return result
98→
99→
100→def _extract_droplet_id(output_text):
101→ """Extract the droplet ID from CLI output like 'New droplet created with id: 12345'."""
102→ for line in output_text.splitlines():
103→ if "id:" in line.lower():
104→ parts = line.split("id:")
105→ if len(parts) >= 2:
106→ candidate = parts[-1].strip()
107→ if candidate.isdigit():
108→ return candidate
109→ return None
110→
111→
112→# ---------------------------------------------------------------------------
113→# Fixtures
114→# ---------------------------------------------------------------------------
115→
116→
117→@pytest.fixture(scope="module")
118→def config_file(tmp_path_factory):
119→ """Write a config file that lives for the whole test module."""
120→ path = tmp_path_factory.mktemp("e2e") / "config.yml"
121→ _write_config(path)
122→ return path
123→
124→
125→@pytest.fixture(scope="module")
126→def session_id():
127→ """A unique session id shared across all tests in this module."""
128→ return uuid.uuid4().hex[:8]
129→
130→
131→@pytest.fixture(scope="class")
132→def droplet(config_file, session_id):
133→ """Create a single droplet with all features and destroy it after all tests.
134→
135→ The droplet is created with DNS, a machine type (cloud-init), a custom tag,
136→ and --wait-for-ip so that all aspects can be verified by individual tests.
137→ """
138→ name = _unique_name()
139→ custom_tag = f"e2e-tag-{uuid.uuid4().hex[:6]}"
140→
141→ # ---- CREATE with all features ------------------------------------------
142→ result = run_machine(
143→ "create",
144→ "--name",
145→ name,
146→ "--type",
147→ "e2e-basic",
148→ "--update-dns",
149→ "--tag",
150→ custom_tag,
151→ "--wait-for-ip",
152→ config_file=config_file,
153→ session_id=session_id,
154→ )
155→ assert result.returncode == 0, f"create failed: {result.stderr}"
156→ create_out = result.stdout + result.stderr
157→ droplet_id = _extract_droplet_id(create_out)
158→ assert droplet_id, f"Could not find droplet id in output:\n{create_out}"
159→
160→ info = {
161→ "name": name,
162→ "id": droplet_id,
163→ "custom_tag": custom_tag,
164→ "create_out": create_out,
165→ }
166→
167→ yield info
168→
169→ # ---- TEARDOWN: destroy with DNS cleanup --------------------------------
170→ run_machine(
171→ "--verbose",
172→ "destroy",
173→ "--no-confirm",
174→ "--delete-dns",
175→ droplet_id,
176→ config_file=config_file,
177→ session_id=session_id,
178→ )
179→
180→
181→# ---------------------------------------------------------------------------
182→# Tests — one droplet, many assertions
183→# ---------------------------------------------------------------------------
184→
185→
186→class TestDropletLifecycle:
187→ """Create one droplet with all features and verify each aspect independently.
188→
189→ A single droplet is created (via the class-scoped ``droplet`` fixture) with
190→ DNS, a machine type, and a custom tag. Each test method verifies a different
191→ aspect so that failures are reported individually. The droplet is destroyed
192→ automatically after all tests complete.
193→ """
194→
195→ def test_droplet_appears_in_list(self, droplet, config_file, session_id):
196→ """Verify the droplet shows up in ``list`` with the correct name."""
197→ result = run_machine(
198→ "list",
199→ "--output",
200→ "json",
201→ config_file=config_file,
202→ session_id=session_id,
203→ )
204→ assert result.returncode == 0, f"list failed: {result.stderr}"
205→ droplets = json.loads(result.stdout)
206→ matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
207→ assert len(matched) == 1, f"Expected 1 droplet with id {droplet['id']}, got {len(matched)}"
208→ assert matched[0]["name"] == droplet["name"]
209→
210→ def test_droplet_has_ip(self, droplet, config_file, session_id):
211→ """Verify the droplet was assigned an IP address."""
212→ result = run_machine(
213→ "list",
214→ "--output",
215→ "json",
216→ config_file=config_file,
217→ session_id=session_id,
218→ )
219→ assert result.returncode == 0
220→ droplets = json.loads(result.stdout)
221→ matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
222→ assert len(matched) == 1
223→ assert matched[0]["ip"] is not None, "Droplet has no IP address"
224→
225→ def test_dns_record_created(self, droplet, config_file, session_id):
226→ """Verify that a DNS A record was created for the droplet."""
227→ result = run_machine(
228→ "list-domain",
229→ "--name",
230→ droplet["name"],
231→ "--output",
232→ "json",
233→ E2E_DNS_ZONE,
234→ config_file=config_file,
235→ session_id=session_id,
236→ )
237→ assert result.returncode == 0, f"list-domain failed: {result.stderr}"
238→ records = json.loads(result.stdout)
239→ a_records = [r for r in records if r.get("name") == droplet["name"] and r.get("type") == "A"]
240→ assert len(a_records) >= 1, f"No A record found for {droplet['name']}.{E2E_DNS_ZONE}"
241→
242→ def test_dns_zone_in_create_output(self, droplet):
243→ """Verify that DNS zone was mentioned in the create output."""
244→ assert E2E_DNS_ZONE in droplet["create_out"], f"DNS zone not mentioned in output:\n{droplet['create_out']}"
245→
246→ def test_type_tag_applied(self, droplet, config_file, session_id):
247→ """Verify that the machine type tag was applied and is filterable."""
248→ result = run_machine(
249→ "list",
250→ "--type",
251→ "e2e-basic",
252→ "--output",
253→ "json",
254→ config_file=config_file,
255→ session_id=session_id,
256→ )
257→ assert result.returncode == 0
258→ droplets = json.loads(result.stdout)
259→ matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
260→ assert len(matched) == 1, "Droplet not found when filtering by type e2e-basic"
261→ assert matched[0]["type"] == "e2e-basic", "Type tag mismatch"
262→
263→ def test_custom_tag_applied(self, droplet, config_file, session_id):
264→ """Verify that the custom tag was applied and is filterable."""
265→ result = run_machine(
266→ "list",
267→ "--tag",
268→ droplet["custom_tag"],
269→ "--output",
270→ "json",
271→ config_file=config_file,
272→ session_id=session_id,
273→ )
274→ assert result.returncode == 0
275→ droplets = json.loads(result.stdout)
276→ matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
277→ assert len(matched) == 1, f"Droplet not found with tag {droplet['custom_tag']}"
278→Now I have a complete picture of the codebase. I'll implement the provider abstraction layer, refactor the existing DigitalOcean code into a provider implementation, and update all subcommands. Let me start.
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class VM:
    id: str
    name: str
    tags: list
    region: str
    ip_address: str
    status: str


@dataclass
class SSHKey:
    id: str
    name: str
    fingerprint: str
    public_key: str


@dataclass
class DNSRecord:
    id: str
    name: str
    type: str
    data: str
    ttl: int


class CloudProvider(ABC):
    @abstractmethod
    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM:
        pass

    @abstractmethod
    def get_vm(self, vm_id) -> VM:
        pass

    @abstractmethod
    def destroy_vm(self, vm_id) -> bool:
        pass

    @abstractmethod
    def list_vms(self, tag=None) -> list:
        pass

    @abstractmethod
    def get_ssh_key(self, name) -> SSHKey:
        pass

    @abstractmethod
    def list_ssh_keys(self) -> list:
        pass

    @abstractmethod
    def create_dns_record(self, zone, record_type, name, data, ttl, tag=None) -> str:
        pass

    @abstractmethod
    def delete_dns_record(self, zone, record_name) -> bool:
        pass

    @abstractmethod
    def get_dns_records(self, zone) -> list:
        pass

    @abstractmethod
    def list_domains(self) -> list:
        pass

    def list_projects(self) -> list:
        return []

    def assign_to_project(self, project_name, vm_id):
        pass

    def validate_region(self, region):
        pass

    def validate_image(self, image):
        pass

    @property
    def provider_name(self) -> str:
        return self.__class__.__name__
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/provider.py
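Since `CloudProvider` is a plain ABC, a second backend such as Vultr only has to subclass it; callers stay provider-agnostic. A minimal in-memory sketch illustrating that shape — the `FakeProvider` class is hypothetical (useful for unit tests, not part of the repo), and the `VM` dataclass plus two abstract methods are re-declared so the snippet runs standalone:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from itertools import count


# Re-declared subset of machine/provider.py, for illustration only.
@dataclass
class VM:
    id: str
    name: str
    tags: list
    region: str
    ip_address: str
    status: str


class CloudProvider(ABC):
    @abstractmethod
    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM: ...

    @abstractmethod
    def list_vms(self, tag=None) -> list: ...


class FakeProvider(CloudProvider):
    """In-memory provider: lets subcommand logic be tested with no API calls."""

    def __init__(self):
        self._vms = []
        self._ids = count(1)

    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM:
        vm = VM(str(next(self._ids)), name, tags, region, "192.0.2.1", "active")
        self._vms.append(vm)
        return vm

    def list_vms(self, tag=None) -> list:
        if tag:
            return [v for v in self._vms if tag in v.tags]
        return list(self._vms)


p = FakeProvider()
p.create_vm("web1", "nyc1", "ubuntu-24-04-x64", "s-1vcpu-1gb", "key", ["machine:created"], None)
print(len(p.list_vms(tag="machine:created")))  # 1
```

A real Vultr implementation would follow the same pattern as `DigitalOceanProvider`, mapping its own API objects into `VM`/`SSHKey`/`DNSRecord`.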
mkdir -p /home/david/projects/stirlingbridge/machine/machine/providers
from machine.log import fatal_error
from machine.provider import CloudProvider

KNOWN_PROVIDERS = ["digital-ocean"]


def create_provider(provider_name, provider_config) -> CloudProvider:
    if provider_name == "digital-ocean":
        from machine.providers.digitalocean import DigitalOceanProvider

        return DigitalOceanProvider(provider_config)
    else:
        fatal_error(f"Unknown provider: '{provider_name}'. Known providers: {', '.join(KNOWN_PROVIDERS)}")
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/providers/__init__.py
import digitalocean

from machine.log import fatal_error, info
from machine.provider import CloudProvider, VM, SSHKey, DNSRecord

VALID_REGIONS = ["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]
VALID_IMAGES = [
    "almalinux-8-x64",
    "almalinux-9-x64",
    "centos-stream-9-x64",
    "debian-11-x64",
    "debian-12-x64",
    "fedora-39-x64",
    "fedora-40-x64",
    "rockylinux-9-x64",
    "rockylinux-8-x64",
    "ubuntu-20-04-x64",
    "ubuntu-22-04-x64",
    "ubuntu-24-04-x64",
]


def _droplet_to_vm(droplet) -> VM:
    return VM(
        id=str(droplet.id),
        name=droplet.name,
        tags=droplet.tags,
        region=droplet.region["slug"] if droplet.region else None,
        ip_address=droplet.ip_address,
        status=droplet.status,
    )


class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)

    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM:
        ssh_key = self._get_do_ssh_key(ssh_key_name)
        if not ssh_key:
            fatal_error(f"Error: SSH key '{ssh_key_name}' not found in DigitalOcean")
        droplet = digitalocean.Droplet(
            token=self.token,
            name=name,
            region=region,
            image=image,
            size_slug=size,
            ssh_keys=[ssh_key],
            tags=tags,
            user_data=user_data,
            backups=False,
        )
        droplet.create()
        return _droplet_to_vm(droplet)

    def get_vm(self, vm_id) -> VM:
        droplet = self._manager.get_droplet(vm_id)
        return _droplet_to_vm(droplet)

    def destroy_vm(self, vm_id) -> bool:
        try:
            droplet = self._manager.get_droplet(vm_id)
        except digitalocean.NotFoundError:
            fatal_error(f"Error: machine with id {vm_id} not found")
        result = droplet.destroy()
        return result

    def list_vms(self, tag=None) -> list:
        if tag:
            droplets = self._manager.get_all_droplets(tag_name=tag)
        else:
            droplets = self._manager.get_all_droplets()
        return [_droplet_to_vm(d) for d in droplets]

    def get_ssh_key(self, name) -> SSHKey:
        do_key = self._get_do_ssh_key(name)
        if not do_key:
            return None
        return SSHKey(
            id=str(do_key.id),
            name=do_key.name,
            fingerprint=do_key.fingerprint,
            public_key=do_key.public_key,
        )

    def list_ssh_keys(self) -> list:
        keys = self._manager.get_all_sshkeys()
        return [
            SSHKey(id=str(k.id), name=k.name, fingerprint=k.fingerprint, public_key=k.public_key)
            for k in keys
        ]

    def create_dns_record(self, zone, record_type, name, data, ttl, tag=None) -> str:
        domain = digitalocean.Domain(token=self.token, name=zone)
        try:
            record = domain.create_new_domain_record(type=record_type, ttl=ttl, name=name, data=data, tag=tag)
        except digitalocean.NotFoundError:
            info(f"Warning: DNS zone '{zone}' not found in DigitalOcean, DNS record not set")
            return None
        return record

    def delete_dns_record(self, zone, record_name) -> bool:
        domain = digitalocean.Domain(token=self.token, name=zone)
        records = domain.get_records()
        for record in records:
            if record.name == record_name:
                domain.delete_domain_record(id=record.id)
                return True
        return False

    def get_dns_records(self, zone) -> list:
        domain = digitalocean.Domain(token=self.token, name=zone)
        records = domain.get_records()
        return [
            DNSRecord(id=str(r.id), name=r.name, type=r.type, data=r.data, ttl=r.ttl)
            for r in records
        ]

    def list_domains(self) -> list:
        domains = self._manager.get_all_domains()
        return [d.name for d in domains]

    def list_projects(self) -> list:
        projects = self._manager.get_all_projects()
        return [p.name for p in projects]

    def assign_to_project(self, project_name, vm_id):
        projects = self._manager.get_all_projects()
        project = None
        for p in projects:
            if p.name == project_name:
                project = p
                break
        if not project:
            fatal_error(f"Error: Project {project_name} does not exist, machine created but not assigned to project")
        project.assign_resource([f"do:droplet:{vm_id}"])

    def validate_region(self, region):
        if region is not None and region.upper() not in VALID_REGIONS:
            fatal_error(f"Error: region {region} is not one of {VALID_REGIONS}")

    def validate_image(self, image):
        if image is not None and image not in VALID_IMAGES:
            info(f"Warning: image {image} is not one of these known valid images: {VALID_IMAGES}")

    def _get_do_ssh_key(self, name):
        keys = self._manager.get_all_sshkeys()
        for key in keys:
            if key.name == name:
                return key
        return None

    @property
    def provider_name(self) -> str:
        return "DigitalOcean"
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py
from dataclasses import dataclass

TAG_MACHINE_CREATED = "machine:created"
TAG_MACHINE_TYPE_PREFIX = "machine:type:"
TAG_MACHINE_SESSION_PREFIX = "machine:session:"


@dataclass
class CliOptions:
    debug: bool
    quiet: bool
    verbose: bool
    dry_run: bool


@dataclass
class Config:
    provider_name: str
    provider_config: dict
    ssh_key: str
    dns_zone: str
    machine_size: str
    image: str
    region: str
    project: str


@dataclass
class MainCmdCtx:
    config: Config
    session_id: str
    provider: object  # CloudProvider instance


@dataclass
class MachineConfig:
    new_user_name: str
    script_url: str
    script_dir: str
    script_path: str
    script_args: str
The file /home/david/projects/stirlingbridge/machine/machine/types.py has been updated successfully.
import os
import re
from pathlib import Path

from machine.di import d
from machine.factory import yaml
from machine.log import fatal_error, debug
from machine.types import Config, MachineConfig
from machine import constants
from machine.providers import KNOWN_PROVIDERS

_env_var_pattern = re.compile(r"\$\{([^}]+)\}")


def _expand_env_vars(value):
    if isinstance(value, str):
        def _replace(match):
            expr = match.group(1)
            if ":-" in expr:
                var_name, default = expr.split(":-", 1)
                return os.environ.get(var_name, default)
            else:
                if expr not in os.environ:
                    fatal_error(f"Environment variable '{expr}' referenced in config is not set")
                return os.environ[expr]

        return _env_var_pattern.sub(_replace, value)
    elif isinstance(value, dict):
        return {k: _expand_env_vars(v) for k, v in value.items()}
    elif isinstance(value, list):
        return [_expand_env_vars(item) for item in value]
    return value


class _loaded_config:
    c = None


def _load_config_data(config_file_name: str):
    if not config_file_name:
        config_file_name = constants.default_config_file_path
    config_path = Path(os.path.expanduser(config_file_name))
    if not config_path.exists():
        fatal_error(f"Error: Config file: {config_path} not found")
    config = _expand_env_vars(yaml().load(open(config_path, "r")))
    if d.opt.debug:
        debug(f"Loaded config file: {config_path}")
        debug(f"Parsed config: {config}")
    _loaded_config.c = config
    return config


def _require_key(mapping, key, section_name):
    # Named "mapping" to avoid shadowing the DI global "d" imported above.
    if key not in mapping:
        fatal_error(f"Required key '{key}' not found in '{section_name}' section of config file")
    return mapping[key]


def get(config_file_name: str) -> Config:
    config = _load_config_data(config_file_name)
    # Auto-detect provider from config sections
    provider_name = config.get("provider")
    if not provider_name:
        found = [p for p in KNOWN_PROVIDERS if p in config]
        if len(found) == 0:
            fatal_error(
                "No provider section found in config file. Expected one of: " + ", ".join(KNOWN_PROVIDERS)
            )
        if len(found) > 1:
            fatal_error(
                "Multiple provider sections found in config file. Please add a 'provider:' key to select one."
            )
        provider_name = found[0]
    if provider_name not in config:
        fatal_error(f"Provider '{provider_name}' specified but no '{provider_name}' section found in config file")
    provider_config = config[provider_name]
    return Config(
        provider_name=provider_name,
        provider_config=provider_config,
        ssh_key=_require_key(provider_config, "ssh-key", provider_name),
        dns_zone=provider_config.get("dns-zone"),
        machine_size=_require_key(provider_config, "machine-size", provider_name),
        image=_require_key(provider_config, "image", provider_name),
        region=_require_key(provider_config, "region", provider_name),
        project=provider_config.get("project"),
    )


def get_machine(name: str) -> MachineConfig:
    if not _loaded_config.c:
        fatal_error("Attempt to fetch machine data before config loaded")
    config = _loaded_config.c
    if "machines" not in config:
        fatal_error("Required 'machines' section not found in config file")
    config_machines = config["machines"]
    if name not in config_machines:
        fatal_error(f"Machine type '{name}' not found in config file. Available types: {', '.join(config_machines.keys())}")
    target_config = config_machines[name]
    return MachineConfig(
        _require_key(target_config, "new-user-name", f"machines.{name}"),
        target_config.get("script-url"),
        target_config.get("script-dir"),
        target_config.get("script-path"),
        target_config.get("script-args"),
    )


def get_machines():
    if not _loaded_config.c:
        fatal_error("Attempt to fetch machine data before config loaded")
    config = _loaded_config.c
    if "machines" not in config:
        fatal_error("Required 'machines' section not found in config file")
    ret = {}
    for name in config["machines"]:
        ret[name] = get_machine(name)
    return ret
The file /home/david/projects/stirlingbridge/machine/machine/config.py has been updated successfully.
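The `${VAR}` / `${VAR:-default}` syntax handled by `_expand_env_vars` mirrors shell parameter expansion, and the logic can be exercised standalone. A condensed sketch of the same substitution (error handling reduced to a bare `KeyError` where the CLI would call `fatal_error`):

```python
import os
import re

_pat = re.compile(r"\$\{([^}]+)\}")


def expand(value):
    """Expand ${VAR} and ${VAR:-default} references, mirroring config.py."""
    def repl(m):
        expr = m.group(1)
        if ":-" in expr:
            name, default = expr.split(":-", 1)
            return os.environ.get(name, default)
        return os.environ[expr]  # KeyError here maps to fatal_error in the CLI
    return _pat.sub(repl, value)


os.environ.pop("TEST_SIZE", None)
print(expand("${TEST_SIZE:-s-2vcpu-4gb}"))  # s-2vcpu-4gb (falls back to the default)

os.environ["TEST_SIZE"] = "s-4vcpu-8gb"
print(expand("${TEST_SIZE:-s-2vcpu-4gb}"))  # s-4vcpu-8gb (env value wins)
```

This is exactly the behavior the `test_env_var_with_default_*` tests above assert against the real CLI.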
import os

import click

from machine import config
from machine import constants
from machine.di import d
from machine.log import output
from machine.providers import create_provider
from machine.subcommands import create, destroy, list, projects, ssh_keys, domains, list_domain, types, status
from machine.types import CliOptions, MainCmdCtx
from machine.util import load_session_id

CLICK_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
@click.option("--debug", is_flag=True, default=False, help="Enable debug output")
@click.option("--quiet", is_flag=True, default=False, help="Suppress all non-essential output")
@click.option("--verbose", is_flag=True, default=False, help="Enable verbose output")
@click.option("--dry-run", is_flag=True, default=False, help="Run but do not do anything")
@click.option("--config-file", metavar="<PATH>", help=f"Specify the config file (default {constants.default_config_file_path})")
@click.option(
    "--session-id", metavar="<ID>", default=load_session_id(), help=f"Override the default session ID (default {load_session_id()})"
)
@click.pass_context
def main(context, debug, quiet, verbose, dry_run, config_file, session_id):
    options = CliOptions(debug, quiet, verbose, dry_run)
    d.opt = options
    # Skip config loading for version subcommand since it doesn't need it
    # and should work even when no config file exists (#25)
    if context.invoked_subcommand == "version":
        return
    cfg = config.get(config_file)
    provider = create_provider(cfg.provider_name, cfg.provider_config)
    main_context = MainCmdCtx(cfg, session_id, provider)
    context.obj = main_context


@main.command()
@click.pass_context
def version(context):
    try:
        version_file = os.path.join(os.path.dirname(__file__), "version.txt")
        with open(version_file) as f:
            version_string = f.read().strip()
    except FileNotFoundError:
        version_string = "dev"
    output(version_string)


main.add_command(create.command, "create")
main.add_command(destroy.command, "destroy")
main.add_command(domains.command, "domains")
main.add_command(list.command, "list")
main.add_command(list_domain.command, "list-domain")
main.add_command(projects.command, "projects")
main.add_command(ssh_keys.command, "ssh-keys")
main.add_command(types.command, "types")
main.add_command(status.command, "status")
The file /home/david/projects/stirlingbridge/machine/machine/main.py has been updated successfully.
import os
import uuid

from machine.factory import yaml
from machine.constants import default_config_dir_path, default_session_id_file_path
from machine.types import TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_CREATED
from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX


def get_machine_type(vm):
    type = next((t for t in vm.tags if TAG_MACHINE_TYPE_PREFIX in t), "").replace(TAG_MACHINE_TYPE_PREFIX, "")
    if not type:
        return None
    return type


def is_machine_created(vm):
    return TAG_MACHINE_CREATED in vm.tags


def is_same_session(command_context: MainCmdCtx, vm):
    return TAG_MACHINE_SESSION_PREFIX + command_context.session_id in vm.tags


def load_session_id():
    if not os.path.exists(default_config_dir_path):
        os.mkdir(default_config_dir_path)
    if not os.path.exists(default_session_id_file_path):
        with open(default_session_id_file_path, "w") as f:
            f.write("id: " + str(uuid.uuid4()).replace("-", "")[0:8])
    sessionid_config = yaml().load(open(default_session_id_file_path, "r"))
    return sessionid_config["id"]


def vm_to_json_obj(vm):
    return {
        "id": vm.id,
        "name": vm.name,
        "tags": vm.tags,
        "region": vm.region,
        "ip": vm.ip_address,
        "type": get_machine_type(vm),
    }


def dns_record_to_json_obj(dns_record, zone, vm):
    if vm:
        vm = vm_to_json_obj(vm)
    return {
        "id": dns_record.id,
        "droplet": vm,
        "name": dns_record.name,
        "fqdn": f"{dns_record.name}.{zone}",
        "zone": zone,
        "data": dns_record.data,
        "ttl": dns_record.ttl,
        "type": dns_record.type,
    }
The file /home/david/projects/stirlingbridge/machine/machine/util.py has been updated successfully.
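All provider-neutral metadata (created-by marker, machine type, session) is encoded as plain string tags on the VM, so it survives any backend that supports tagging. A standalone sketch of the `machine:type:<name>` round-trip — note it uses `startswith`/`removeprefix` where `util.get_machine_type` uses a looser `in`/`replace` check:

```python
TAG_TYPE_PREFIX = "machine:type:"


def machine_type_from_tags(tags):
    # Decode the "machine:type:<name>" tag convention used by machine/util.py.
    match = next((t for t in tags if t.startswith(TAG_TYPE_PREFIX)), "")
    return match.removeprefix(TAG_TYPE_PREFIX) or None


print(machine_type_from_tags(["machine:created", "machine:type:e2e-basic"]))  # e2e-basic
print(machine_type_from_tags(["machine:created"]))  # None
```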
from expandvars import expand

from machine.log import fatal_error
from machine.provider import CloudProvider
from machine.types import MachineConfig


def get_user_data(provider: CloudProvider, ssh_key_name: str, fqdn: str, machine_config: MachineConfig):
    if not fqdn:
        fqdn = ""
    script_args = machine_config.script_args
    if not script_args:
        script_args = ""
    ssh_key = provider.get_ssh_key(ssh_key_name)
    if not ssh_key:
        fatal_error(f"Error: SSH key '{ssh_key_name}' not found in {provider.provider_name}")
    ssh_public_key = ssh_key.public_key
    escaped_args = script_args.replace('"', '\\"')
    cloud_env = {
        "MACHINE_SCRIPT_URL": machine_config.script_url,
        "MACHINE_SCRIPT_DIR": machine_config.script_dir,
        "MACHINE_FQDN": fqdn,
    }
    # Expand here because otherwise escaping the vars properly for nested scripts is a guessing game
    escaped_args = expand(escaped_args, environ=cloud_env)
    cloud_config = f"""#cloud-config
users:
  - name: {machine_config.new_user_name}
    groups: sudo
    shell: /bin/bash
    sudo: ['ALL=(ALL) NOPASSWD:ALL']
    ssh-authorized-keys:
      - {ssh_public_key}
"""
    if machine_config.script_url and machine_config.script_dir and machine_config.script_path:
        cloud_config += f"""
runcmd:
  - mkdir -p {machine_config.script_dir}
  - curl -L {machine_config.script_url} -o {machine_config.script_path}
  - chmod +x {machine_config.script_path}
  - [su, -c, "env {" ".join([f"{k}='{v}'" for k, v in cloud_env.items()])} {machine_config.script_path} {escaped_args}", -, {machine_config.new_user_name}]
"""
    return cloud_config
The file /home/david/projects/stirlingbridge/machine/machine/cloud_config.py has been updated successfully.
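Because the generated user-data is plain YAML text, its shape can be sanity-checked without creating a droplet. A tiny stand-in for the base-user block emitted by `get_user_data` (the function name and sample values here are illustrative, not from the repo):

```python
def render_base_user_block(user, ssh_public_key):
    """Illustrative stand-in for the first section of cloud_config.get_user_data."""
    return f"""#cloud-config
users:
  - name: {user}
    groups: sudo
    shell: /bin/bash
    sudo: ['ALL=(ALL) NOPASSWD:ALL']
    ssh-authorized-keys:
      - {ssh_public_key}
"""


doc = render_base_user_block("admin", "ssh-ed25519 AAAAC3Nza... e2e")
print(doc.startswith("#cloud-config"))  # True
```

Asserting on the rendered string like this keeps cloud-config regressions out of the e2e tier, which only sees them after a real boot.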
import time

import click

from machine.config import get_machine
from machine.di import d
from machine.log import fatal_error, info, debug, output
from machine.types import MainCmdCtx, TAG_MACHINE_CREATED, TAG_MACHINE_TYPE_PREFIX
from machine.cloud_config import get_user_data
from machine.types import TAG_MACHINE_SESSION_PREFIX


@click.command(help="Create a machine")
@click.option("--name", "-n", required=True, metavar="<MACHINE-NAME>", help="Name for new machine")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="tag to be applied to new machine")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="create a machine of this type")
@click.option("--region", "-r", metavar="<REGION-CODE>", help="create a machine in this region (overrides default from config)")
@click.option(
    "--machine-size", "-s", metavar="<MACHINE-SLUG>", help="create a machine of this size (overrides default from config)"
)
@click.option("--image", "-i", metavar="<IMAGE-NAME>", help="create a machine from this image (overrides default from config)")
@click.option("--wait-for-ip/--no-wait-for-ip", default=False)
@click.option("--update-dns/--no-update-dns", default=True)
@click.option("--initialize/--no-initialize", default=True)
@click.pass_context
def command(context, name, tag, type, region, machine_size, image, wait_for_ip, update_dns, initialize):
    command_context: MainCmdCtx = context.obj
    config = command_context.config
    provider = command_context.provider
    if update_dns and not config.dns_zone:
        fatal_error("Error: DNS update requested but no zone configured")
    user_data = None
    if initialize:
        if not type:
            fatal_error("Error: a machine type must be supplied")
        machine_config = get_machine(type)
        if not machine_config:
            fatal_error(f"Error: machine type {type} is not defined")
        fqdn = f"{name}.{config.dns_zone}" if config.dns_zone else None
        user_data = get_user_data(provider, config.ssh_key, fqdn, machine_config)
        if d.opt.debug:
            info("user-data is:")
            info(user_data)
    # Verify SSH key exists
    ssh_key = provider.get_ssh_key(config.ssh_key)
    if not ssh_key:
        fatal_error(f"Error: SSH key '{config.ssh_key}' not found in {provider.provider_name}")
    provider.validate_region(region)
    provider.validate_image(image)
    tags = [
        TAG_MACHINE_SESSION_PREFIX + command_context.session_id,
        TAG_MACHINE_CREATED,
    ]
    if type:
        tags.append(TAG_MACHINE_TYPE_PREFIX + type.lower())
    if tag:
        tags.append(tag)
    vm = provider.create_vm(
        name=name,
        region=region if region is not None else config.region,
        image=image if image is not None else config.image,
        size=machine_size if machine_size is not None else config.machine_size,
        ssh_key_name=config.ssh_key,
        tags=tags,
        user_data=user_data,
    )
    if vm.id:
        if d.opt.quiet:
            output(f"{vm.id}")
        else:
            output(f"New droplet created with id: {vm.id}")
    # If requested, assign to a specified project
    if config.project:
        provider.assign_to_project(config.project, vm.id)
        if d.opt.verbose:
            info(f"Assigned droplet to project: {config.project}")
    # If requested, or if we are going to set a DNS record, get the VM's IPv4 address
    ip_address = vm.ip_address
    if (wait_for_ip or update_dns) and not ip_address:
        while not ip_address:
            time.sleep(1)
            vm = provider.get_vm(vm.id)
            ip_address = vm.ip_address
            if d.opt.verbose:
                output("Waiting for droplet IP address")
        if d.opt.quiet:
            info(f"{ip_address}")
        else:
            info(f"IP Address: {ip_address}")
    # If requested, and we have the IP address, create a DNS host record
    if update_dns and ip_address and config.dns_zone:
        zone = config.dns_zone
        host = name
        if d.opt.debug:
            debug(f"Setting host record {host}.{zone} to {ip_address}")
        record = provider.create_dns_record(
            zone=zone,
            record_type="A",
            name=host,
            data=ip_address,
            ttl=60 * 5,
            tag=TAG_MACHINE_CREATED,
        )
        if record:
            if d.opt.verbose:
                info(f"Created DNS record:{record}")
            if not d.opt.quiet:
                info(f"DNS: {host}.{zone}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/create.py has been updated successfully.
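The create command's IP loop polls the provider once per second with no upper bound, so a provider that never reports an address would hang the CLI. A hedged sketch of a bounded alternative — `wait_for` is a hypothetical helper, not part of the repo, with the sleep function injectable for testing:

```python
import itertools
import time


def wait_for(fetch, predicate, interval=1.0, timeout=300, sleep=time.sleep):
    """Poll fetch() until predicate(value) is true or timeout seconds elapse.

    Returns the value on success, or None on timeout. `sleep` is injectable
    so tests can run without real delays.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if predicate(value):
            return value
        if time.monotonic() >= deadline:
            return None
        sleep(interval)


# Simulated provider that reports an IP address on the third poll.
responses = itertools.chain([None, None, "203.0.113.7"], itertools.repeat("203.0.113.7"))
ip = wait_for(lambda: next(responses), lambda v: v is not None, sleep=lambda _: None)
print(ip)  # 203.0.113.7
```

On timeout the CLI could fall back to printing the droplet id without a DNS record, rather than looping forever.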
import click

from machine.di import d
from machine.log import debug, fatal_error, output
from machine.util import is_machine_created
from machine.types import MainCmdCtx
from machine.util import is_same_session


@click.command(help="Destroy one or more machines")
@click.option("--confirm/--no-confirm", default=True)
@click.option("--delete-dns/--no-delete-dns", default=True)
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="Include machines not created by this tool",
)
@click.argument("droplet-ids", nargs=-1)
@click.pass_context
def command(context, confirm, delete_dns, all, droplet_ids):
    command_context: MainCmdCtx = context.obj
    config = command_context.config
    provider = command_context.provider
    for droplet_id in droplet_ids:
        vm = provider.get_vm(droplet_id)
        name = vm.name
        if not is_machine_created(vm) and not all:
            fatal_error(f'ERROR: Cannot destroy droplet "{name}" (id: {vm.id}), it was not created by machine.')
        if not is_same_session(command_context, vm) and not all:
            fatal_error(
                f'ERROR: Cannot destroy droplet "{name}" (id: {vm.id}), it was created by a different session of machine.'
            )
        if confirm:
            output(
                "Type YES (not y or yes or Yes) to confirm that you want to permanently"
                f' DELETE/DESTROY droplet "{name}" (id: {vm.id})'
            )
            confirmation = input()
            if confirmation != "YES":
                fatal_error("Destroy operation aborted, not confirmed by user")
        result = provider.destroy_vm(droplet_id)
        if result and delete_dns and config.dns_zone:
            zone = config.dns_zone
            if d.opt.debug:
                debug(f"Deleting host record {name}.{zone}")
            deleted = provider.delete_dns_record(zone, name)
            if deleted:
                if d.opt.debug:
                    debug(f"Deleted dns record for {name}.{zone}")
            else:
                if d.opt.debug:
                    debug(f"No dns record found for {name}.{zone}")
        if not result:
            fatal_error("Error destroying machine")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/destroy.py has been updated successfully.
import click
import json

from machine.log import fatal_error, output
from machine.types import MainCmdCtx, TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_SESSION_PREFIX
from machine.util import get_machine_type, is_machine_created, is_same_session, vm_to_json_obj


def print_normal(vms):
    for vm in vms:
        region = vm.region if vm.region else "unknown"
        output(f"{vm.name} ({vm.id}, {region}, {get_machine_type(vm)}): {vm.ip_address}")


def print_quiet(vms):
    for vm in vms:
        output(vm.id)


def print_json(vms):
    output(json.dumps([vm_to_json_obj(v) for v in vms]))


def get_vms(command_context, id=None, name=None, tag=None, type=None, region=None, all=False):
    provider = command_context.provider
    vms = []
    if id:
        vm = provider.get_vm(id)
        if vm:
            vms.append(vm)
    elif all:
        vms = provider.list_vms()
    else:
        vms = provider.list_vms(tag=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
    # we can't combine most filters over the API, so we also filter ourselves
    if name:
        vms = filter(lambda v: v.name == name, vms)
    if tag:
        vms = filter(lambda v: tag in v.tags, vms)
    if type:
        vms = filter(lambda v: TAG_MACHINE_TYPE_PREFIX + type.lower() in v.tags, vms)
    if region:
        vms = filter(lambda v: v.region and region == v.region, vms)
    if not all:
        vms = filter(lambda v: is_machine_created(v) and is_same_session(command_context, v), vms)
    return list(vms)


@click.command(help="List machines")
@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="All machines, including those not created by this tool or by other sessions",
)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.option(
    "--unique",
    is_flag=True,
    default=False,
    help="Return an error if there is more than one match",
)
@click.pass_context
def command(context, id, name, tag, type, region, all, output, quiet, unique):
    command_context: MainCmdCtx = context.obj
    vms = get_vms(command_context, id, name, tag, type, region, all)
    if unique and len(vms) > 1:
        fatal_error(f"ERROR: --unique match required but {len(vms)} matches found.")
    if output == "json":
        print_json(vms)
    elif quiet:
        print_quiet(vms)
    else:
        print_normal(vms)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/list.py has been updated successfully.
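One detail worth noting in `get_vms()` above: each `filter()` call wraps the previous iterator lazily, so nothing is evaluated until the final `list(vms)`. A self-contained illustration of that chaining pattern, using a stand-in `FakeVM` namedtuple rather than the project's types:

```python
# Illustration of the lazy filter-chaining pattern used by get_vms(): each
# filter() wraps the previous iterator; evaluation happens only at list().
# FakeVM is a stand-in for the project's VM type.
from collections import namedtuple

FakeVM = namedtuple("FakeVM", ["name", "region", "tags"])

vms = [
    FakeVM("web-1", "nyc1", ["machine-created"]),
    FakeVM("db-1", "sfo3", ["machine-created"]),
    FakeVM("web-2", "nyc1", []),
]

# Mirror the command's optional-filter structure: only set filters are applied.
name, region = None, "nyc1"
result = vms
if name:
    result = filter(lambda v: v.name == name, result)
if region:
    result = filter(lambda v: v.region and v.region == region, result)

print([v.name for v in list(result)])  # ['web-1', 'web-2']
```

Because the lambdas close over `name` and `region` but those variables are never reassigned before `list()` runs, the late-binding behavior of closures is harmless here.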
import click
import json
import requests

from machine.log import output
from machine.subcommands.list import get_vms
from machine.types import MainCmdCtx


def print_normal(statuses):
    for status in statuses:
        output(
            f"{status['name']} ({status['id']}):\t" + "\t".join([f"{k}={v}" for k, v in status.items() if k not in ["name", "id"]])
        )


def print_json(statuses):
    output(json.dumps(statuses))


@click.command(help="Machine status")
@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option("--status-check", metavar="<CHECK>", default="cloud-init-status", help="Status check to perform")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="All machines, including those not created by this tool or by other sessions",
)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.pass_context
def command(context, id, name, tag, type, region, all, output, quiet, status_check):
    command_context: MainCmdCtx = context.obj
    vms = get_vms(command_context, id, name, tag, type, region, all)
    statuses = []
    for vm in vms:
        status = {"name": vm.name, "id": vm.id, "droplet-status": vm.status, status_check: "UNKNOWN"}
        try:
            r = requests.get(f"http://{vm.ip_address}:4242/cgi-bin/{status_check}")
            if 200 == r.status_code:
                status[status_check] = r.json()["status"]
        except:  # noqa: E722
            pass
        statuses.append(status)
    if output == "json":
        print_json(statuses)
    else:
        print_normal(statuses)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/status.py has been updated successfully.
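The status command above follows an optimistic pattern: start from the provider-reported fields, default the HTTP check to `"UNKNOWN"`, and overlay the check result only if the request succeeds. A network-free sketch of that pattern, with a stand-in `fetch` callable replacing the `requests.get` call:

```python
# Network-free illustration of the status-merge pattern in status.py: the dict
# starts with provider-reported fields plus a default of "UNKNOWN", and the
# HTTP check result is overlaid only on success. fetch stands in for the
# requests.get(...).json() call.
def build_status(vm_name, vm_id, vm_status, check_name, fetch):
    status = {"name": vm_name, "id": vm_id, "droplet-status": vm_status, check_name: "UNKNOWN"}
    try:
        status[check_name] = fetch()["status"]
    except Exception:
        # Any failure (connection refused, bad JSON, missing key) leaves "UNKNOWN"
        pass
    return status


def failing_fetch():
    raise ConnectionError("host unreachable")


ok = build_status("web-1", "42", "active", "cloud-init-status", lambda: {"status": "done"})
down = build_status("web-2", "43", "new", "cloud-init-status", failing_fetch)
print(ok["cloud-init-status"], down["cloud-init-status"])  # done UNKNOWN
```

This shape keeps one slow or unreachable machine from aborting the whole status listing.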
import click

from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List ssh keys")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    keys = provider.list_ssh_keys()
    for key in keys:
        output(f"{key.id}: {key.name} ({key.fingerprint})")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/ssh_keys.py has been updated successfully.
import click

from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List dns domains")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    domains = provider.list_domains()
    for domain in domains:
        output(f"{domain}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/domains.py has been updated successfully.
import click
import json

from machine.log import fatal_error, output
from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
from machine.util import dns_record_to_json_obj


def print_normal(records, zone):
    for record in records:
        output(f"{record.name}\t{record.type}\t{record.data}")


def print_quiet(records):
    for record in records:
        output(record.name)


def print_json(records, vms, zone):
    simplified = []
    for r in records:
        vm = next((v for v in vms if r.data == v.ip_address), None)
        simplified.append(dns_record_to_json_obj(r, zone, vm))
    output(json.dumps(simplified))


@click.command(help="List domain records")
@click.option("--name", "-n", metavar="<RECORD-NAME>", help="Filter by name")
@click.option("--type", "-m", metavar="<RECORD-TYPE>", help="Filter by type (default A and AAAA)")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="Include all records, even those not created by this tool or created by other sessions",
)
@click.argument("zone", required=False)
@click.pass_context
def command(context, name, type, output, quiet, all, zone):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    if not zone:
        zone = command_context.config.dns_zone
    if not zone:
        fatal_error("Error: no DNS zone specified.")
    records = provider.get_dns_records(zone)
    if type:
        if type != "*":
            records = filter(lambda r: r.type == type, records)
    else:
        records = filter(lambda r: r.type in ["A", "AAAA"], records)
    if all:
        vms = provider.list_vms()
    else:
        vms = provider.list_vms(tag=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
        vm_ips = [v.ip_address for v in vms]
        records = filter(lambda r: r.data in vm_ips, records)
    records = list(records)
    if output == "json":
        print_json(records, vms, zone)
    elif quiet:
        print_quiet(records)
    else:
        print_normal(records, zone)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/list_domain.py has been updated successfully.
import click

from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List projects")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    projects = provider.list_projects()
    for project in projects:
        output(f"{project}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/projects.py has been updated successfully.
Building machine @ file:///home/david/projects/stirlingbridge/machine
Built machine @ file:///home/david/projects/stirlingbridge/machine
Uninstalled 1 package in 2ms
Installed 1 package in 1ms
All checks passed!

Exit code 4
ERROR: usage: pytest [options] [file_or_dir] [file_or_dir] [...]
pytest: error: unrecognized arguments: --timeout=30
  inifile: /home/david/projects/stirlingbridge/machine/pyproject.toml
  rootdir: /home/david/projects/stirlingbridge/machine
Exit code 1
============================= test session starts ==============================
platform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/david/projects/stirlingbridge/machine/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/david/projects/stirlingbridge/machine
configfile: pyproject.toml
collecting ... collected 35 items / 6 deselected / 29 selected
tests/test_cli_integration.py::TestVersionCommand::test_version_runs PASSED [ 3%]
tests/test_cli_integration.py::TestVersionCommand::test_help_runs PASSED [ 6%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_expanded_in_config PASSED [ 10%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_multiple_env_vars_expanded PASSED [ 13%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_default_when_unset PASSED [ 17%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_value_when_set PASSED [ 20%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_missing_env_var_without_default_exits_with_error PASSED [ 24%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_in_machine_config_section PASSED [ 27%]
tests/test_config.py::TestExpandEnvVars::test_plain_string_unchanged PASSED [ 31%]
tests/test_config.py::TestExpandEnvVars::test_non_string_types_unchanged PASSED [ 34%]
tests/test_config.py::TestExpandEnvVars::test_simple_variable_substitution PASSED [ 37%]
tests/test_config.py::TestExpandEnvVars::test_variable_embedded_in_string PASSED [ 41%]
tests/test_config.py::TestExpandEnvVars::test_multiple_variables_in_string PASSED [ 44%]
tests/test_config.py::TestExpandEnvVars::test_default_value_when_var_unset PASSED [ 48%]
tests/test_config.py::TestExpandEnvVars::test_default_value_ignored_when_var_set PASSED [ 51%]
tests/test_config.py::TestExpandEnvVars::test_default_value_empty_string PASSED [ 55%]
tests/test_config.py::TestExpandEnvVars::test_unset_variable_without_default_exits PASSED [ 58%]
tests/test_config.py::TestExpandEnvVars::test_dict_values_expanded PASSED [ 62%]
tests/test_config.py::TestExpandEnvVars::test_nested_dict_expanded PASSED [ 65%]
tests/test_config.py::TestExpandEnvVars::test_list_values_expanded PASSED [ 68%]
tests/test_config.py::TestExpandEnvVars::test_mixed_nested_structure PASSED [ 72%]
tests/test_input_validation.py::TestInvalidMachineType::test_invalid_machine_type_error_message PASSED [ 75%]
tests/test_input_validation.py::TestInvalidMachineType::test_valid_machine_type_accepted PASSED [ 79%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_digital_ocean_section PASSED [ 82%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token FAILED [ 86%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_ssh_key PASSED [ 89%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_machines_section PASSED [ 93%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_new_user_name_in_machine PASSED [ 96%]
tests/test_input_validation.py::TestCreateNoInitialize::test_no_initialize_without_type_no_crash PASSED [100%]
=================================== FAILURES ===================================
_____________ TestMissingConfigSections.test_missing_access_token ______________
self = <tests.test_input_validation.TestMissingConfigSections object at 0x7e94bc9dd430>
config_dir = PosixPath('/tmp/pytest-of-david/pytest-11/test_missing_access_token0')
    def test_missing_access_token(self, config_dir):
        config_file = config_dir / "config.yml"
        write_config(
            config_file,
            """\
digital-ocean:
  ssh-key: test-key
  machine-size: s-1vcpu-1gb
  image: ubuntu-22-04-x64
  region: nyc1
  project: test-project
machines:
  web-server:
    new-user-name: admin
""",
        )
        result = run_machine("--config-file", str(config_file), "types")
        assert result.returncode != 0
        assert "access-token" in result.stderr
>       assert "Traceback" not in result.stderr
E assert 'Traceback' not in "Traceback (...ess-token'\n"
E
E 'Traceback' is contained here:
E Traceback (most recent call last):
E ? +++++++++
E File "/home/david/projects/stirlingbridge/machine/.venv/bin/machine", line 10, in <module>
E sys.exit(main())
E ^^^^^^...
E
E ...Full output truncated (27 lines hidden), use '-vv' to show
tests/test_input_validation.py:135: AssertionError
=========================== short test summary info ============================
FAILED tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token
================== 1 failed, 28 passed, 6 deselected in 4.45s ==================

The failure is because access-token validation now happens in the provider constructor (which crashes with a KeyError). I need to add graceful validation there.
class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)

class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        if "access-token" not in provider_config:
            fatal_error("Required key 'access-token' not found in 'digital-ocean' section of config file")
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)

The file /home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py has been updated successfully.
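The same look-before-you-leap check could be factored into a small helper that any provider's constructor calls, so missing keys always produce a readable message instead of a `KeyError` traceback. The helper name below is illustrative, not the project's actual API:

```python
# Hypothetical generalization of the fix above: collect every missing required
# key from a config section so a provider can report them all at once instead
# of crashing with a KeyError. check_required_keys is an illustrative name,
# not part of the project's actual API.
def check_required_keys(section_name: str, section: dict, required: list[str]) -> list[str]:
    """Return the required keys absent from the named config section."""
    return [key for key in required if key not in section]


missing = check_required_keys("digital-ocean", {"ssh-key": "test-key"}, ["access-token", "ssh-key"])
print(missing)  # ['access-token']
```

Reporting all missing keys in one pass saves the user from fixing the config one error at a time.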
============================= test session starts ==============================
platform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/david/projects/stirlingbridge/machine/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/david/projects/stirlingbridge/machine
configfile: pyproject.toml
collecting ... collected 35 items / 6 deselected / 29 selected

tests/test_cli_integration.py::TestVersionCommand::test_version_runs PASSED [  3%]
tests/test_cli_integration.py::TestVersionCommand::test_help_runs PASSED [  6%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_expanded_in_config PASSED [ 10%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_multiple_env_vars_expanded PASSED [ 13%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_default_when_unset PASSED [ 17%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_value_when_set PASSED [ 20%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_missing_env_var_without_default_exits_with_error PASSED [ 24%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_in_machine_config_section PASSED [ 27%]
tests/test_config.py::TestExpandEnvVars::test_plain_string_unchanged PASSED [ 31%]
tests/test_config.py::TestExpandEnvVars::test_non_string_types_unchanged PASSED [ 34%]
tests/test_config.py::TestExpandEnvVars::test_simple_variable_substitution PASSED [ 37%]
tests/test_config.py::TestExpandEnvVars::test_variable_embedded_in_string PASSED [ 41%]
tests/test_config.py::TestExpandEnvVars::test_multiple_variables_in_string PASSED [ 44%]
tests/test_config.py::TestExpandEnvVars::test_default_value_when_var_unset PASSED [ 48%]
tests/test_config.py::TestExpandEnvVars::test_default_value_ignored_when_var_set PASSED [ 51%]
tests/test_config.py::TestExpandEnvVars::test_default_value_empty_string PASSED [ 55%]
tests/test_config.py::TestExpandEnvVars::test_unset_variable_without_default_exits PASSED [ 58%]
tests/test_config.py::TestExpandEnvVars::test_dict_values_expanded PASSED [ 62%]
tests/test_config.py::TestExpandEnvVars::test_nested_dict_expanded PASSED [ 65%]
tests/test_config.py::TestExpandEnvVars::test_list_values_expanded PASSED [ 68%]
tests/test_config.py::TestExpandEnvVars::test_mixed_nested_structure PASSED [ 72%]
tests/test_input_validation.py::TestInvalidMachineType::test_invalid_machine_type_error_message PASSED [ 75%]
tests/test_input_validation.py::TestInvalidMachineType::test_valid_machine_type_accepted PASSED [ 79%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_digital_ocean_section PASSED [ 82%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token PASSED [ 86%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_ssh_key PASSED [ 89%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_machines_section PASSED [ 93%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_new_user_name_in_machine PASSED [ 96%]
tests/test_input_validation.py::TestCreateNoInitialize::test_no_initialize_without_type_no_crash PASSED [100%]

======================= 29 passed, 6 deselected in 3.85s =======================
All 29 tests pass, lint is clean. Here's a summary of what was done:
- machine/provider.py — Abstract base class CloudProvider with provider-neutral data types (VM, SSHKey, DNSRecord). Defines the interface all providers must implement.
- machine/providers/__init__.py — Provider factory (create_provider()) that instantiates the right provider based on config. Maintains the KNOWN_PROVIDERS list.
- machine/providers/digitalocean.py — DigitalOceanProvider implementation wrapping all python-digitalocean API calls. All DO-specific logic (region/image validation, project assignment, resource URNs) lives here.
- machine/types.py — Config now has provider_name and provider_config instead of access_token. MainCmdCtx now carries a provider instance.
- machine/config.py — Auto-detects provider from config file sections (looks for known provider names like digital-ocean). No longer hardcoded to DO.
- machine/main.py — Creates the provider via the factory and passes it through the Click context.
- machine/util.py — Removed all digitalocean imports. Helper functions now work with provider-neutral VM and DNSRecord types.
- machine/cloud_config.py — Takes a CloudProvider instead of a DO Manager.
- All subcommands now use the provider from the context instead of directly calling digitalocean.*.
- Existing digital-ocean: configs work unchanged with auto-detection.
- Each provider validates its own required config keys (e.g., DO validates access-token).
- list_projects() and assign_to_project() have default no-op implementations in the ABC, since not all providers support projects.
- validate_region()/validate_image() — default no-ops in the ABC; the DO provider overrides them with its known-valid lists.

To add Vultr (or any other provider), you'd now just need to:
1. Create machine/providers/vultr.py implementing CloudProvider
2. Add "vultr" to KNOWN_PROVIDERS in machine/providers/__init__.py
3. Add the elif branch in create_provider()
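As a rough sketch of step 1 (under the assumption that the ABC looks like the summary describes; a minimal stand-in `CloudProvider` is defined inline so the example is self-contained, and the Vultr methods are stubs, not a working implementation):

```python
# Sketch of a VultrProvider skeleton. The CloudProvider ABC here is an
# abbreviated stand-in for machine/provider.py (the real interface also covers
# DNS records, ssh keys, and projects). The 'api-key' config key name is a
# guess for illustration, and every method body is a stub.
from abc import ABC, abstractmethod


class CloudProvider(ABC):
    @abstractmethod
    def get_vm(self, vm_id): ...

    @abstractmethod
    def list_vms(self, tag=None): ...

    @abstractmethod
    def destroy_vm(self, vm_id): ...


class VultrProvider(CloudProvider):
    def __init__(self, provider_config):
        if "api-key" not in provider_config:
            raise SystemExit("Required key 'api-key' not found in 'vultr' section of config file")
        self.api_key = provider_config["api-key"]

    def get_vm(self, vm_id):
        raise NotImplementedError("wrap Vultr's instance-fetch API call here")

    def list_vms(self, tag=None):
        raise NotImplementedError("wrap Vultr's instance-list API call here")

    def destroy_vm(self, vm_id):
        raise NotImplementedError("wrap Vultr's instance-delete API call here")


provider = VultrProvider({"api-key": "example-key"})
print(isinstance(provider, CloudProvider))  # True
```

Each stub would translate Vultr API responses into the same provider-neutral VM/DNSRecord types the subcommands already consume, so no command code changes.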
Excellent thanks. Could you please create a PR for this work. I've already made the branch and pushed it to remote. Add the notes you made above to the PR description please.
git branch --show-current && git log --oneline -5 && git remote -v
dboreham/provider-abstraction
b53dbb4 Abstract provider interface to allow future multi-provider support
ba07efa Merge pull request #44 from stirlingbridge/dboreham/migrate-lint-ruff
6bd58c9 Migrate to modern linting tools
2672c00 Merge pull request #42 from stirlingbridge/dboreham/speed-up-e2e-tests
ff437f5 Refector e2e tests to only create one vm
origin	git@github.com:stirlingbridge/machine.git (fetch)
origin	git@github.com:stirlingbridge/machine.git (push)
gh pr create --title "Add provider abstraction layer for multi-cloud support" --body "$(cat <<'EOF'
## Summary

Introduces a `CloudProvider` abstraction layer, decoupling all subcommands from DigitalOcean's API. This is the foundation for adding Vultr (and other providers) per #43.

- New `CloudProvider` ABC with provider-neutral data types (`VM`, `SSHKey`, `DNSRecord`)
- `DigitalOceanProvider` implementation wrapping all `python-digitalocean` API calls
- Provider auto-detection from config file sections (existing `digital-ocean:` configs work unchanged)
- All 8 subcommands refactored to use the provider interface instead of calling `digitalocean.*` directly

### New files

- **`machine/provider.py`** — Abstract base class defining the interface all providers must implement
- **`machine/providers/__init__.py`** — Provider factory and `KNOWN_PROVIDERS` registry
- **`machine/providers/digitalocean.py`** — `DigitalOceanProvider` with all DO-specific logic (region/image validation, project assignment, resource URNs)

### Modified files

- **`machine/types.py`** — `Config` now has `provider_name` and `provider_config` instead of `access_token`; `MainCmdCtx` carries a `provider` instance
- **`machine/config.py`** — Auto-detects provider from config sections; no longer hardcoded to DO
- **`machine/main.py`** — Creates provider via factory and passes it through Click context
- **`machine/util.py`** — Removed all `digitalocean` imports; helpers work with neutral types
- **`machine/cloud_config.py`** — Takes `CloudProvider` instead of DO `Manager`
- **All 8 subcommands** — Use `provider` from context instead of `digitalocean.*`

### Design decisions

- **Backward compatible** — Existing `digital-ocean:` configs work unchanged with auto-detection
- **Provider validates itself** — Each provider checks its own required config keys (e.g., DO validates `access-token`)
- **Optional capabilities** — `list_projects()` and `assign_to_project()` have default no-op implementations in the ABC, since not all providers support projects
- **`validate_region()`/`validate_image()`** — Default no-ops in ABC; DO provider overrides with its known-valid lists

### Adding a new provider

To add Vultr (or any other provider), you'd now just need to:

1. Create `machine/providers/vultr.py` implementing `CloudProvider`
2. Add `"vultr"` to `KNOWN_PROVIDERS` in `machine/providers/__init__.py`
3. Add the `elif` branch in `create_provider()`

Closes #43

## Test plan

- [x] All 29 existing unit and integration tests pass
- [x] Linter clean (`ruff check`)
- [ ] E2E tests against real DigitalOcean (requires credentials)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"
Thanks. There was a CI failure on the e2e tests. I will re-start this session with the credentials required to run that test suite.
Sounds good. I'll be ready to debug the e2e test failure when you're back with the credentials.