Claude Code transcript - page 1/3

Please take a look at issue #43.

$ Bash
View GitHub issue #43
gh issue view 43
title:	Investigate adding support for a second cloud platform
state:	OPEN
author:	dboreham
labels:	
comments:	0
assignees:	
projects:	
milestone:	
number:	43
--
Currently `machine` only supports DigitalOcean as a hosting provider, although the tool was written with multi-provider use cases in mind.

Investigate what it would take to add support for a second provider. The most promising second provider is: Vultr since they appear
to have "feature parity" with DigitalOcean, a similar pricing structure, and an API.
Agent
Explore codebase for provider abstraction
{
  "subagent_type": "Explore",
  "prompt": "Explore this codebase thoroughly to understand how the DigitalOcean provider is integrated and what abstractions exist (or don't) for supporting multiple cloud providers. Look at:\n\n1. The main entry point and how DigitalOcean is used\n2. factory.py - how VMs are created\n3. di.py - dependency injection / globals\n4. config.py - configuration loading\n5. cloud_config.py - cloud-init config\n6. All subcommands in machine/subcommands/\n7. Any existing provider abstraction or interface\n\nI want to understand:\n- How tightly coupled is the code to DigitalOcean's API?\n- What abstractions already exist that could support multiple providers?\n- What would need to change to add Vultr support?\n- How is the config structured?\n\nBe very thorough - read all relevant files."
}
Excellent! Now let me create a comprehensive summary document. Based on my thorough exploration, I have a complete understanding of the codebase. Let me organize all findings:

## Comprehensive Analysis: DigitalOcean Integration and Provider Abstraction in `machine` CLI

### 1. TIGHT COUPLING ASSESSMENT

The code is **deeply and directly coupled to DigitalOcean's API** through the `python-digitalocean` library (version 1.17.0). There is **no existing abstraction layer** for multiple cloud providers. Here's the extent of coupling:

#### Import Statements (Direct Dependencies)
- `digitalocean.Manager` - used in 9 subcommands (create, destroy, list, list-domain, ssh_keys, domains, projects, status)
- `digitalocean.Droplet` - VM object model in create.py
- `digitalocean.Domain` - DNS management in create.py and destroy.py
- `digitalocean.Project` - Project assignment in create.py
- `digitalocean.SSHKey` - SSH key retrieval in create.py
- `digitalocean.NotFoundError` - Exception handling in create.py and destroy.py

#### Hardcoded DigitalOcean Specifics
**In `/home/david/projects/stirlingbridge/machine/machine/subcommands/create.py` (lines 14-36):**
- Hardcoded valid regions list: `["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]`
- Hardcoded valid images list: `["almalinux-8-x64", "almalinux-9-x64", "centos-stream-9-x64", ...]`
- These are DigitalOcean-specific image slugs and regions

**In `/home/david/projects/stirlingbridge/machine/machine/config.py` (lines 60-71):**
```python
if "digital-ocean" not in config:
    fatal_error("Required 'digital-ocean' section not found in config file")
config_do = config["digital-ocean"]
return Config(
    _require_key(config_do, "access-token", "digital-ocean"),
    _require_key(config_do, "ssh-key", "digital-ocean"),
    ...
)
```
Configuration is hardcoded to expect `digital-ocean` section only. No provider selection mechanism.

**Error Messages:**
- cloud_config.py line 18: "SSH key '...' not found in **DigitalOcean**" (explicitly names provider)
- create.py line 76: "SSH key '...' not found in **DigitalOcean**"

#### DigitalOcean-Specific Logic
**In `/home/david/projects/stirlingbridge/machine/machine/util.py`:**
- `projectFromName()` - queries DigitalOcean projects via `manager.get_all_projects()`
- `sshKeyFromName()` - queries DigitalOcean SSH keys via `manager.get_all_sshkeys()`
- `dnsRecordIdFromName()` - queries DigitalOcean domain records
- Helpers work directly with DO object attributes (e.g., `droplet.region["slug"]`, `droplet.ip_address`, `domain.get_records()`)

**DNS Implementation:**
- Creates A records specifically (line 139 in create.py: `type="A"`)
- Uses DigitalOcean Domain API exclusively
- 5-minute TTL hardcoded (line 139: `ttl=60 * 5`)

**Droplet/VM Model:**
- `digitalocean.Droplet` construction has DO-specific parameters:
  - `region` - expects DO region slug
  - `image` - expects DO image slug
  - `size_slug` - expects DO size slug
  - `ssh_keys` - expects DO SSHKey objects
  - `tags` - DigitalOcean feature for resource organization
  - `backups=False` - DigitalOcean-specific feature

---

### 2. EXISTING ABSTRACTIONS (OR LACK THEREOF)

#### What EXISTS (Limited):
1. **Cloud-init Config Generation** (`cloud_config.py`):
   - `get_user_data()` generates cloud-init scripts
   - **Provider-agnostic** - cloud-init is a standard and works across all cloud providers
   - Generates YAML that can be used by any provider supporting cloud-init
   - **This is the one truly portable component**

2. **Session/Tagging System** (`types.py`, `util.py`):
   - `TAG_MACHINE_CREATED`, `TAG_MACHINE_SESSION_PREFIX`, `TAG_MACHINE_TYPE_PREFIX`
   - Uses tags to identify machines
   - **Problem**: Tags are stored as a list, relying on DigitalOcean's tagging feature
   - Would need to be rethought for providers without native tagging

3. **Data Structures** (`types.py`):
   ```python
   @dataclass
   class Config:
       access_token: str      # DO-specific
       ssh_key: str           # DO-specific
       dns_zone: str
       machine_size: str      # DO size_slug format
       image: str             # DO image slug
       region: str            # DO region slug
       project: str           # DO-specific (project assignment)
   
   @dataclass
   class MachineConfig:
       new_user_name: str
       script_url: str
       script_dir: str
       script_path: str
       script_args: str
   ```
   The `Config` class bundles DO-specific fields with generic ones.

#### What DOES NOT EXIST:
1. **No Provider Interface/Abstract Base Class** - No abstraction like:
   ```python
   class CloudProvider(ABC):
       @abstractmethod
       def create_vm(self, params): pass
       @abstractmethod
       def destroy_vm(self, vm_id): pass
       @abstractmethod
       def list_vms(self): pass
   ```

2. **No Provider Factory** - Hard decision between provider happens at config load time, not through a factory pattern

3. **No Capability Matrix** - No mapping of:
   - Which regions each provider supports
   - Which images each provider offers
   - Which machine sizes each provider has
   - Whether provider supports native tagging, DNS, projects, etc.

4. **No Adapter Pattern** - Commands directly use `digitalocean.*` classes without wrapping them in provider-agnostic adapters

---

### 3. CONFIGURATION STRUCTURE

**Location:** `~/.machine/config.yml`

**Current Structure:**
```yaml
digital-ocean:
    access-token: dop_v1_...                    # Secrets
    ssh-key: my-ssh-key-name                    # Name in DO account
    dns-zone: example.com                       # Optional
    machine-size: s-4vcpu-8gb                   # DO-specific size slug
    image: ubuntu-22-04-x64                     # DO-specific image slug
    region: nyc3                                # DO-specific region
    project: Infrastructure                     # DO project name (optional)

machines:
    example:                                    # Machine type
        new-user-name: alice                    # User for cloud-init
        script-dir: /opt/setup-scripts
        script-url: https://...
        script-path: /opt/setup-scripts/setup.sh
        script-args: "-y"
```

**Key Characteristics:**
- Provider name hardcoded in config section (`digital-ocean`)
- All top-level config values are provider-specific
- Config loading in `config.py` has no abstraction - directly maps to `Config` dataclass
- Environment variable substitution uses `${VAR}` syntax and is implemented at parse time

**Processing:**
```python
# config.py line 38-49
config = _load_config_data(config_file_name)    # Loads YAML
config = _expand_env_vars(config)               # Expands ${VAR} in place
# Then directly accesses config["digital-ocean"]
```

---

### 4. COMMAND ARCHITECTURE

Each subcommand follows a similar pattern (exemplified by `create.py`):

```python
@click.command()
@click.option(...)
@click.pass_context
def command(context, ...):
    command_context: MainCmdCtx = context.obj  # From main.py
    config = command_context.config            # Type: Config
    
    # Direct use of python-digitalocean:
    manager = digitalocean.Manager(token=config.access_token)
    
    # Droplet manipulation:
    droplet = digitalocean.Droplet(
        token=config.access_token,
        name=name,
        region=config.region,
        image=config.image,
        size_slug=config.machine_size,
        ...
    )
    droplet.create()
```

**Subcommands and their DigitalOcean usage:**

| Subcommand | DO Classes Used | Key Operations |
|---|---|---|
| `create` | Manager, Droplet, Domain, Project, SSHKey | Create VM, DNS record, project assignment |
| `destroy` | Manager, Droplet, Domain | Destroy VM, delete DNS record |
| `list` | Manager (get_all_droplets) | Query VMs, tag-based filtering |
| `status` | Droplet (via list), HTTP (custom) | Query VM status + custom endpoint |
| `ssh-keys` | Manager | List SSH keys |
| `domains` | Manager | List DNS domains |
| `list-domain` | Domain, Manager | List DNS records, correlate with VMs |
| `projects` | Manager | List projects |
| `types` | None (only config) | List machine types from config |

---

### 5. DEPENDENCY INJECTION

**Location:** `/home/david/projects/stirlingbridge/machine/machine/di.py`

Very minimal - just a holder for CLI options:
```python
class d:
    opt: CliOptions = None
```

This is set in `main.py` line 28:
```python
d.opt = options
```

Used by subcommands to check debug, quiet, verbose, dry_run flags. No abstraction layer injected here.

---

### 6. KEY FILES AND THEIR ROLES

| File | Purpose | DigitalOcean Coupling |
|---|---|---|
| `main.py` | Click group, config loading, subcommand registration | Minimal - just orchestration |
| `config.py` | YAML parsing, env var expansion, validation | **HIGH** - hardcoded "digital-ocean" section |
| `di.py` | Minimal DI for CLI options | None |
| `types.py` | Data structures (Config, MachineConfig, Tags) | **MEDIUM** - Config contains DO fields, tags rely on DO |
| `cloud_config.py` | Cloud-init generation | **LOW** - Actually provider-agnostic |
| `util.py` | Helper functions for DO API | **CRITICAL** - All helpers work with DO objects |
| `factory.py` | YAML factory function | None - just creates YAML reader |
| All subcommands | Entry points for CLI actions | **CRITICAL** - Direct DO API usage throughout |

---

### 7. WHAT WOULD NEED TO CHANGE FOR VULTR SUPPORT

To add Vultr support alongside DigitalOcean would require:

#### 1. **Configuration Layer**
- Extend config.py to support `vultr:` section alongside `digital-ocean:`
- Create provider detection/selection logic
- Map provider-specific parameters (Vultr uses different region/size/image formats)
- Example: Vultr uses "VULTR_REGION_EMDOSTROM" while DO uses "nyc3"

#### 2. **Provider Abstraction (NEW)**
Create a provider interface:
```python
# machine/providers/base.py
from abc import ABC, abstractmethod

class CloudProvider(ABC):
    @abstractmethod
    def create_vm(self, name, size, image, region, user_data, tags) -> str: pass
    @abstractmethod
    def destroy_vm(self, vm_id) -> bool: pass
    @abstractmethod
    def list_vms(self, tag=None) -> List[VM]: pass
    @abstractmethod
    def get_ssh_key(self, name) -> SSHKey: pass
    @abstractmethod
    def create_dns_record(self, zone, name, ip) -> DNSRecord: pass
    @abstractmethod
    def delete_dns_record(self, zone, record_id) -> bool: pass
    # ... etc

# machine/providers/digitalocean.py
class DigitalOceanProvider(CloudProvider):
    def __init__(self, token: str):
        self.manager = digitalocean.Manager(token=token)
    
    def create_vm(self, ...): pass
    # ... implementations

# machine/providers/vultr.py
class VultrProvider(CloudProvider):
    def __init__(self, api_key: str):
        self.client = vultr.VultrClient(api_key)
    
    def create_vm(self, ...): pass
    # ... implementations
```

#### 3. **Provider Factory**
```python
# machine/provider_factory.py
class ProviderFactory:
    @staticmethod
    def create(config: Config) -> CloudProvider:
        if "digital-ocean" in config:
            return DigitalOceanProvider(config.access_token)
        elif "vultr" in config:
            return VultrProvider(config.api_key)
        else:
            raise ValueError("No provider configured")
```

#### 4. **Config Structure Changes**
```yaml
# Support both (or select one):
digital-ocean:
    access-token: ...
    # DO-specific settings

vultr:
    api-key: ...
    # Vultr-specific settings

# Provider-agnostic settings:
defaults:
    machine-size: c1
    image: ubuntu-22-04-x64
    region: ewr
    ssh-key: my-key

machines:
    # ... unchanged
```

#### 5. **Commands Refactoring**
Every subcommand would need refactoring from:
```python
# Current (tightly coupled):
manager = digitalocean.Manager(token=config.access_token)
droplets = manager.get_all_droplets()

# To (provider-agnostic):
provider = get_provider(config)
vms = provider.list_vms()
```

#### 6. **Type System Updates**
- Create vendor-neutral types: `VM`, `DNSRecord`, `SSHKey` instead of DO's `Droplet`, `Domain`, `Record`
- These would wrap provider-specific objects or be translated from them
- Add region/image/size mapping layer since each provider uses different names

#### 7. **Capability Detection**
Some operations wouldn't work uniformly:
- **Tagging**: DO has native tags, Vultr may not - would need workaround (metadata API, naming convention)
- **Projects**: DO has projects, Vultr doesn't - need feature detection
- **DNS Management**: Both support it but API differs
- **Status Checks**: Custom endpoint at `:4242/cgi-bin/cloud-init-status` would still work (cloud-init is universal)

---

### 8. MIGRATION COMPLEXITY SUMMARY

| Area | Complexity | Notes |
|---|---|---|
| Configuration | **HIGH** | Provider detection, parameter mapping |
| Subcommands | **VERY HIGH** | All 9 subcommands need refactoring |
| Provider Abstraction | **HIGH** | Create base class, 2 implementations (DO, Vultr) |
| Type System | **MEDIUM** | Map provider models to vendor-neutral types |
| Tagging/Session Management | **HIGH** | DO tags are native; Vultr alternative needed |
| DNS Management | **MEDIUM** | Different API but both support it |
| Testing | **HIGH** | Would need mocks/adapters for both providers |
| **Overall Effort** | **VERY HIGH** | Estimated: 2-3 weeks for experienced developer |

---

### 9. CURRENT ARCHITECTURE DIAGRAM

```
main.py (Click group)
    ↓
config.py (loads config.yml)
    ↓ hardcoded to expect "digital-ocean" section
types.py (Config, MachineConfig - with DO fields)
    ↓
subcommands/* (create, destroy, list, etc.)
    ↓ each imports and uses directly
python-digitalocean library
    ↓
DigitalOcean API

util.py (helpers)
    ↓ works with DO objects
cloud_config.py (generates cloud-init - provider-agnostic!)
    ↓
cloud-init (on spawned VMs)
```

**Missing:** Any abstraction between subcommands and `python-digitalocean`. Direct dependency.

---

### 10. WHAT'S PROVIDER-AGNOSTIC

- **cloud_config.py** - Cloud-init script generation works with any provider
- **cloud-init execution** - Standard across all major cloud providers
- **Session management logic** - Tag-based filtering approach (though needs adaptation for Vultr)
- **Machine type definitions** - Config-driven machine profiles
- **CLI interface** - Click framework is provider-agnostic
- **Custom status checks** - The HTTP endpoint polling would work for any provider

---

### Files Summary

**All files read during analysis:**

1. `/home/david/projects/stirlingbridge/machine/machine/main.py` - Entry point (58 lines)
2. `/home/david/projects/stirlingbridge/machine/machine/di.py` - Minimal DI (8 lines)
3. `/home/david/projects/stirlingbridge/machine/machine/config.py` - **HIGH coupling** (104 lines)
4. `/home/david/projects/stirlingbridge/machine/machine/factory.py` - YAML factory (9 lines)
5. `/home/david/projects/stirlingbridge/machine/machine/types.py` - **MEDIUM coupling** (40 lines)
6. `/home/david/projects/stirlingbridge/machine/machine/cloud_config.py` - **LOW coupling** (48 lines)
7. `/home/david/projects/stirlingbridge/machine/machine/util.py` - **CRITICAL coupling** (89 lines)
8. `/home/david/projects/stirlingbridge/machine/machine/constants.py` - Constants (6 lines)
9. `/home/david/projects/stirlingbridge/machine/machine/defaults.py` - Empty (3 lines)
10. `/home/david/projects/stirlingbridge/machine/machine/log.py` - Logging (19 lines)
11. Subcommands (create, destroy, list, status, ssh_keys, domains, projects, list_domain, types) - **ALL HIGH coupling**
12. Test files (config, input_validation) - Show validation and env expansion
13. `pyproject.toml` - Dependencies (56 lines)
14. `README.md` - Documentation with config reference (328 lines)

**Total codebase size:** ~900 lines of actual code + ~330 lines of config/structure
agentId: ae1b0ce74900bf624 (for resuming to continue this agent's work if needed)
<usage>total_tokens: 44145
tool_uses: 27
duration_ms: 103158</usage>
Thinking

Here's a summary of what it would take to add Vultr support (issue #43):

Current State

The codebase is tightly coupled to DigitalOcean — there's no provider abstraction layer. Every subcommand directly imports and uses python-digitalocean classes (Manager, Droplet, Domain, etc.). The config loader is hardcoded to expect a digital-ocean section.

One bright spot: cloud_config.py is already provider-agnostic since cloud-init is a cross-provider standard.

What Would Need to Change

1. Provider Abstraction Layer (new)

Create a CloudProvider ABC with methods like create_vm(), destroy_vm(), list_vms(), get_ssh_key(), create_dns_record(), etc. Then implement DigitalOceanProvider and VultrProvider.

2. Config (config.py, types.py)

  • Support a vultr: section alongside digital-ocean:
  • Add provider selection logic
  • Map provider-specific parameters (regions, images, sizes all differ between providers)

3. All 9 Subcommands

Every subcommand (create, destroy, list, status, ssh-keys, domains, list-domain, projects, types) currently calls DO APIs directly and would need refactoring to use the provider abstraction.

4. Utilities (util.py)

All helpers (projectFromName, sshKeyFromName, dnsRecordIdFromName) work directly with DO objects and would need provider-neutral equivalents.

5. Tagging/Session System

DO has native tags used for session tracking. Vultr may need an alternative mechanism (metadata, naming conventions).

6. Region/Image/Size Mapping

Hardcoded DO-specific lists in create.py (e.g., ["NYC1", "NYC3", ...], ["ubuntu-22-04-x64", ...]) would need per-provider equivalents.

Vultr Specifics

Vultr is a reasonable choice — it supports cloud-init, has a REST API, offers DNS management, and has similar pricing. The vultr Python library exists. Key differences to handle: - Different auth (API key vs DO access token) - Different resource naming (instances vs droplets, plans vs size slugs) - No native "projects" concept - Different region/image identifiers

Recommended Approach

  1. Introduce the provider abstraction first (without Vultr) — refactor all subcommands to go through a CloudProvider interface, with DigitalOceanProvider as the sole implementation
  2. Then add VultrProvider as a second implementation
  3. Update config to support provider selection

This is a significant refactor (~900 lines of code, all subcommands affected) but the codebase is small enough that it's very manageable.

Ok sounds good. Can you begin working on this?

Thinking

Let me start by reading the key files I'll need to modify.

Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/types.py"
}
     1→from dataclasses import dataclass
     2→
     3→TAG_MACHINE_CREATED = "machine:created"
     4→TAG_MACHINE_TYPE_PREFIX = "machine:type:"
     5→TAG_MACHINE_SESSION_PREFIX = "machine:session:"
     6→
     7→
     8→@dataclass
     9→class CliOptions:
    10→    debug: bool
    11→    quiet: bool
    12→    verbose: bool
    13→    dry_run: bool
    14→
    15→
    16→@dataclass
    17→class Config:
    18→    access_token: str
    19→    ssh_key: str
    20→    dns_zone: str
    21→    machine_size: str
    22→    image: str
    23→    region: str
    24→    project: str
    25→
    26→
    27→@dataclass
    28→class MainCmdCtx:
    29→    config: Config
    30→    session_id: str
    31→
    32→
    33→@dataclass
    34→class MachineConfig:
    35→    new_user_name: str
    36→    script_url: str
    37→    script_dir: str
    38→    script_path: str
    39→    script_args: str
    40→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/config.py"
}
     1→import os
     2→import re
     3→from pathlib import Path
     4→from machine.di import d
     5→from machine.factory import yaml
     6→from machine.log import fatal_error, debug
     7→from machine.types import Config, MachineConfig
     8→from machine import constants
     9→
    10→_env_var_pattern = re.compile(r"\$\{([^}]+)\}")
    11→
    12→
    13→def _expand_env_vars(value):
    14→    if isinstance(value, str):
    15→
    16→        def _replace(match):
    17→            expr = match.group(1)
    18→            if ":-" in expr:
    19→                var_name, default = expr.split(":-", 1)
    20→                return os.environ.get(var_name, default)
    21→            else:
    22→                if expr not in os.environ:
    23→                    fatal_error(f"Environment variable '{expr}' referenced in config is not set")
    24→                return os.environ[expr]
    25→
    26→        return _env_var_pattern.sub(_replace, value)
    27→    elif isinstance(value, dict):
    28→        return {k: _expand_env_vars(v) for k, v in value.items()}
    29→    elif isinstance(value, list):
    30→        return [_expand_env_vars(item) for item in value]
    31→    return value
    32→
    33→
    34→class _loaded_config:
    35→    c: any = None
    36→
    37→
    38→def _load_config_data(config_file_name: str):
    39→    if not config_file_name:
    40→        config_file_name = constants.default_config_file_path
    41→    config_path = Path(os.path.expanduser(config_file_name))
    42→    if not config_path.exists():
    43→        fatal_error(f"Error: Config file: {config_path} not found")
    44→    config = _expand_env_vars(yaml().load(open(config_path, "r")))
    45→    if d.opt.debug:
    46→        debug(f"Loaded config file: {config_path}")
    47→        debug(f"Parsed config: {config}")
    48→    _loaded_config.c = config
    49→    return config
    50→
    51→
    52→def _require_key(d, key, section_name):
    53→    if key not in d:
    54→        fatal_error(f"Required key '{key}' not found in '{section_name}' section of config file")
    55→    return d[key]
    56→
    57→
    58→def get(config_file_name: str) -> Config:
    59→    config = _load_config_data(config_file_name)
    60→    if "digital-ocean" not in config:
    61→        fatal_error("Required 'digital-ocean' section not found in config file")
    62→    config_do = config["digital-ocean"]
    63→    return Config(
    64→        _require_key(config_do, "access-token", "digital-ocean"),
    65→        _require_key(config_do, "ssh-key", "digital-ocean"),
    66→        config_do.get("dns-zone"),
    67→        _require_key(config_do, "machine-size", "digital-ocean"),
    68→        _require_key(config_do, "image", "digital-ocean"),
    69→        _require_key(config_do, "region", "digital-ocean"),
    70→        _require_key(config_do, "project", "digital-ocean"),
    71→    )
    72→
    73→
    74→def get_machine(name: str) -> MachineConfig:
    75→    if not _loaded_config.c:
    76→        fatal_error("Attempt to fetch machine data before config loaded")
    77→    config = _loaded_config.c
    78→    if "machines" not in config:
    79→        fatal_error("Required 'machines' section not found in config file")
    80→    config_machines = config["machines"]
    81→    if name not in config_machines:
    82→        fatal_error(f"Machine type '{name}' not found in config file. Available types: {', '.join(config_machines.keys())}")
    83→    target_config = config_machines[name]
    84→    return MachineConfig(
    85→        _require_key(target_config, "new-user-name", f"machines.{name}"),
    86→        target_config.get("script-url"),
    87→        target_config.get("script-dir"),
    88→        target_config.get("script-path"),
    89→        target_config.get("script-args"),
    90→    )
    91→
    92→
    93→def get_machines():
    94→    if not _loaded_config.c:
    95→        fatal_error("Attempt to fetch machine data before config loaded")
    96→    config = _loaded_config.c
    97→
    98→    if "machines" not in config:
    99→        fatal_error("Required 'machines' section not found in config file")
   100→    ret = {}
   101→    for name in config["machines"]:
   102→        ret[name] = get_machine(name)
   103→    return ret
   104→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/util.py"
}
     1→import os
     2→import uuid
     3→
     4→from digitalocean import Domain, Manager, Project, SSHKey
     5→
     6→from machine.factory import yaml
     7→from machine.constants import default_config_dir_path, default_session_id_file_path
     8→from machine.types import TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_CREATED
     9→
    10→from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
    11→
    12→
    13→def projectFromName(manager: Manager, name: str) -> Project:
    14→    projects = manager.get_all_projects()
    15→    for project in projects:
    16→        if project.name == name:
    17→            return project
    18→    return None
    19→
    20→
    21→def sshKeyFromName(manager: Manager, name: str) -> SSHKey:
    22→    keys = manager.get_all_sshkeys()
    23→    for key in keys:
    24→        if key.name == name:
    25→            return key
    26→    return None
    27→
    28→
    29→def dnsRecordIdFromName(domain: Domain, name: str) -> str:
    30→    records = domain.get_records()
    31→    for record in records:
    32→        if record.name == name:
    33→            return record.id
    34→    return None
    35→
    36→
    37→def get_machine_type(droplet):
    38→    type = next((t for t in droplet.tags if TAG_MACHINE_TYPE_PREFIX in t), "").replace(TAG_MACHINE_TYPE_PREFIX, "")
    39→    if not type:
    40→        return None
    41→    return type
    42→
    43→
    44→def is_machine_created(droplet):
    45→    return TAG_MACHINE_CREATED in droplet.tags
    46→
    47→
    48→def is_same_session(command_context: MainCmdCtx, droplet):
    49→    return TAG_MACHINE_SESSION_PREFIX + command_context.session_id in droplet.tags
    50→
    51→
    52→def load_session_id():
    53→    if not os.path.exists(default_config_dir_path):
    54→        os.mkdir(default_config_dir_path)
    55→
    56→    if not os.path.exists(default_session_id_file_path):
    57→        with open(default_session_id_file_path, "w") as f:
    58→            f.write("id: " + str(uuid.uuid4()).replace("-", "")[0:8])
    59→
    60→    sessionid_config = yaml().load(open(default_session_id_file_path, "r"))
    61→    return sessionid_config["id"]
    62→
    63→
    64→def droplet_to_json_obj(droplet):
    65→    return {
    66→        "id": droplet.id,
    67→        "name": droplet.name,
    68→        "tags": droplet.tags,
    69→        "region": droplet.region["slug"] if droplet.region else None,
    70→        "ip": droplet.ip_address,
    71→        "type": get_machine_type(droplet),
    72→    }
    73→
    74→
    75→def dns_record_to_json_obj(dns_record, zone, droplet):
    76→    if droplet:
    77→        droplet = droplet_to_json_obj(droplet)
    78→
    79→    return {
    80→        "id": dns_record.id,
    81→        "droplet": droplet,
    82→        "name": dns_record.name,
    83→        "fqdn": f"{dns_record.name}.{zone}",
    84→        "zone": zone,
    85→        "data": dns_record.data,
    86→        "ttl": dns_record.ttl,
    87→        "type": dns_record.type,
    88→    }
    89→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/main.py"
}
     1→import os
     2→
     3→import click
     4→
     5→from machine import config
     6→from machine import constants
     7→from machine.di import d
     8→from machine.log import output
     9→from machine.subcommands import create, destroy, list, projects, ssh_keys, domains, list_domain, types, status
    10→from machine.types import CliOptions, MainCmdCtx
    11→from machine.util import load_session_id
    12→
    13→CLICK_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
    14→
    15→
    16→@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
    17→@click.option("--debug", is_flag=True, default=False, help="Enable debug output")
    18→@click.option("--quiet", is_flag=True, default=False, help="Suppress all non-essential output")
    19→@click.option("--verbose", is_flag=True, default=False, help="Enable verbose output")
    20→@click.option("--dry-run", is_flag=True, default=False, help="Run but do not do anything")
    21→@click.option("--config-file", metavar="<PATH>", help=f"Specify the config file (default {constants.default_config_file_path})")
    22→@click.option(
    23→    "--session-id", metavar="<ID>", default=load_session_id(), help=f"Override the default session ID (default {load_session_id()})"
    24→)
    25→@click.pass_context
    26→def main(context, debug, quiet, verbose, dry_run, config_file, session_id):
    27→    options = CliOptions(debug, quiet, verbose, dry_run)
    28→    d.opt = options
    29→    # Skip config loading for version subcommand since it doesn't need it
    30→    # and should work even when no config file exists (#25)
    31→    if context.invoked_subcommand == "version":
    32→        return
    33→    main_context = MainCmdCtx(config.get(config_file), session_id)
    34→    context.obj = main_context
    35→
    36→
    37→@main.command()
    38→@click.pass_context
    39→def version(context):
    40→    try:
    41→        version_file = os.path.join(os.path.dirname(__file__), "version.txt")
    42→        with open(version_file) as f:
    43→            version_string = f.read().strip()
    44→    except FileNotFoundError:
    45→        version_string = "dev"
    46→    output(version_string)
    47→
    48→
    49→main.add_command(create.command, "create")
    50→main.add_command(destroy.command, "destroy")
    51→main.add_command(domains.command, "domains")
    52→main.add_command(list.command, "list")
    53→main.add_command(list_domain.command, "list-domain")
    54→main.add_command(projects.command, "projects")
    55→main.add_command(ssh_keys.command, "ssh-keys")
    56→main.add_command(types.command, "types")
    57→main.add_command(status.command, "status")
    58→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/cloud_config.py"
}
     1→from expandvars import expand
     2→
     3→from machine.log import fatal_error
     4→from machine.types import MachineConfig
     5→from machine.util import Manager, sshKeyFromName
     6→
     7→
     8→def get_user_data(manager: Manager, ssh_key_name: str, fqdn: str, machine_config: MachineConfig):
     9→    if not fqdn:
    10→        fqdn = ""
    11→
    12→    script_args = machine_config.script_args
    13→    if not script_args:
    14→        script_args = ""
    15→
    16→    ssh_key = sshKeyFromName(manager, ssh_key_name)
    17→    if not ssh_key:
    18→        fatal_error(f"Error: SSH key '{ssh_key_name}' not found in DigitalOcean")
    19→    ssh_public_key = ssh_key.public_key
    20→    escaped_args = script_args.replace('"', '\\"')
    21→
    22→    cloud_env = {
    23→        "MACHINE_SCRIPT_URL": machine_config.script_url,
    24→        "MACHINE_SCRIPT_DIR": machine_config.script_dir,
    25→        "MACHINE_FQDN": fqdn,
    26→    }
    27→
    28→    # Exand here because otherwise escaping the vars properly for nested scripts is a guessing game
    29→    escaped_args = expand(escaped_args, environ=cloud_env)
    30→    cloud_config = f"""#cloud-config
    31→users:
    32→  - name: {machine_config.new_user_name}
    33→    groups: sudo
    34→    shell: /bin/bash
    35→    sudo: ['ALL=(ALL) NOPASSWD:ALL']
    36→    ssh-authorized-keys:
    37→      - {ssh_public_key}
    38→"""
    39→    if machine_config.script_url and machine_config.script_dir and machine_config.script_path:
    40→        cloud_config += f"""
    41→runcmd:
    42→  - mkdir -p {machine_config.script_dir}
    43→  - curl -L {machine_config.script_url} -o {machine_config.script_path}
    44→  - chmod +x {machine_config.script_path}
    45→  - [su, -c, "env {" ".join([f"{k}='{v}'" for k, v in cloud_env.items()])} {machine_config.script_path} {escaped_args}", -, {machine_config.new_user_name}]
    46→"""
    47→    return cloud_config
    48→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/di.py"
}
     1→# Exposing the truth that dependency injection is just a fancy name for global variables
     2→
     3→from machine.types import CliOptions
     4→
     5→
     6→class d:
     7→    opt: CliOptions = None
     8→

Now let me read all the subcommands.

Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/create.py"
}
     1→import click
     2→import digitalocean
     3→import time
     4→from machine.config import get_machine
     5→from machine.di import d
     6→from machine.log import fatal_error, info, debug, output
     7→from machine.types import MainCmdCtx, TAG_MACHINE_CREATED, TAG_MACHINE_TYPE_PREFIX
     8→from machine.util import projectFromName, sshKeyFromName
     9→from machine.cloud_config import get_user_data
    10→
    11→from machine.types import TAG_MACHINE_SESSION_PREFIX
    12→
    13→
    14→def _validate_region(region: str):
    15→    valid_regions = ["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]
    16→    if region is not None and region.upper() not in valid_regions:
    17→        fatal_error(f"Error: region {region} is not one of {valid_regions}")
    18→
    19→
    20→def _validate_image(image: str):
    21→    valid_images = [
    22→        "almalinux-8-x64",
    23→        "almalinux-9-x64",
    24→        "centos-stream-9-x64",
    25→        "debian-11-x64",
    26→        "debian-12-x64",
    27→        "fedora-39-x64",
    28→        "fedora-40-x64",
    29→        "rockylinux-9-x64",
    30→        "rockylinux-8-x64",
    31→        "ubuntu-20-04-x64",
    32→        "ubuntu-22-04-x64",
    33→        "ubuntu-24-04-x64",
    34→    ]
    35→    if image is not None and image not in valid_images:
    36→        info(f"Warning: image {image} is not one of these known valid images: {valid_images}")
    37→
    38→
    39→@click.command(help="Create a machine")
    40→@click.option("--name", "-n", required=True, metavar="<MACHINE-NAME>", help="Name for new machine")
    41→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="tag to be applied to new machine")
    42→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="create a machine of this type")
    43→@click.option("--region", "-r", metavar="<REGION-CODE>", help="create a machine in this region (overrides default from config)")
    44→@click.option(
    45→    "--machine-size", "-s", metavar="<MACHINE-SLUG>", help="create a machine of this size (overrides default from config)"
    46→)
    47→@click.option("--image", "-s", metavar="<IMAGE-NAME>", help="create a machine from this image (overrides default from config)")
    48→@click.option("--wait-for-ip/--no-wait-for-up", default=False)
    49→@click.option("--update-dns/--no-update-dns", default=True)
    50→@click.option("--initialize/--no-initialize", default=True)
    51→@click.pass_context
    52→def command(context, name, tag, type, region, machine_size, image, wait_for_ip, update_dns, initialize):
    53→    command_context: MainCmdCtx = context.obj
    54→    config = command_context.config
    55→
    56→    if update_dns and not config.dns_zone:
    57→        fatal_error("Error: DNS update requested but no zone configured")
    58→
    59→    manager = digitalocean.Manager(token=command_context.config.access_token)
    60→
    61→    user_data = None
    62→    if initialize:
    63→        if not type:
    64→            fatal_error("Error: a machine type must be supplied")
    65→        machine_config = get_machine(type)
    66→        if not machine_config:
    67→            fatal_error(f"Error: machine type {type} is not defined")
    68→        fqdn = f"{name}.{config.dns_zone}" if config.dns_zone else None
    69→        user_data = get_user_data(manager, config.ssh_key, fqdn, machine_config)
    70→        if d.opt.debug:
    71→            info("user-data is:")
    72→            info(user_data)
    73→
    74→    ssh_key = sshKeyFromName(manager, config.ssh_key)
    75→    if not ssh_key:
    76→        fatal_error(f"Error: SSH key '{config.ssh_key}' not found in DigitalOcean")
    77→
    78→    _validate_region(region)
    79→    _validate_image(image)
    80→
    81→    tags = [
    82→        TAG_MACHINE_SESSION_PREFIX + command_context.session_id,
    83→        TAG_MACHINE_CREATED,
    84→    ]
    85→    if type:
    86→        tags.append(TAG_MACHINE_TYPE_PREFIX + type.lower())
    87→    if tag:
    88→        tags.append(tag)
    89→
    90→    droplet = digitalocean.Droplet(
    91→        token=config.access_token,
    92→        name=name,
    93→        region=region if region is not None else config.region,
    94→        image=image if image is not None else config.image,
    95→        size_slug=machine_size if machine_size is not None else config.machine_size,
    96→        ssh_keys=[ssh_key],
    97→        tags=tags,
    98→        user_data=user_data,
    99→        backups=False,
   100→    )
   101→    # Create the droplet
   102→    # This call returns nothing, it modifies the droplet object
   103→    droplet.create()
   104→    if droplet.id:
   105→        if d.opt.quiet:
   106→            output(f"{droplet.id}")
   107→        else:
   108→            output(f"New droplet created with id: {droplet.id}")
   109→    # If requested, assign to a specified project
   110→    if config.project:
   111→        project_name = config.project
   112→        project = projectFromName(manager, project_name)
   113→        if not project:
   114→            fatal_error(f"Error: Project {project_name} does not exist, machine created but not assigned to project")
   115→        project.assign_resource([f"do:droplet:{droplet.id}"])
   116→        if d.opt.verbose:
   117→            info(f"Assigned droplet to project: {project}")
   118→    # If requested, or if we are going to set a DNS record get the droplet's IPv4 address
   119→    if wait_for_ip or update_dns:
   120→        ip_address = None
   121→        while not ip_address:
   122→            time.sleep(1)
   123→            droplet.load()
   124→            ip_address = droplet.ip_address
   125→            if d.opt.verbose:
   126→                output("Waiting for droplet IP address")
   127→        if d.opt.quiet:
   128→            info(f"{ip_address}")
   129→        else:
   130→            info(f"IP Address: {ip_address}")
   131→    # If requested, and we have the IP address, create a DNS host record for the droplet
   132→    if update_dns and ip_address and config.dns_zone:
   133→        zone = config.dns_zone
   134→        host = name
   135→        if d.opt.debug:
   136→            debug(f"Setting host record {host}.{zone} to {ip_address}")
   137→        domain = digitalocean.Domain(token=config.access_token, name=zone)
   138→        try:
   139→            record = domain.create_new_domain_record(type="A", ttl=60 * 5, name=host, data=ip_address, tag=TAG_MACHINE_CREATED)
   140→        except digitalocean.NotFoundError:
   141→            info(f"Warning: DNS zone '{zone}' not found in DigitalOcean, DNS record not set")
   142→            record = None
   143→        if record:
   144→            if d.opt.verbose:
   145→                info(f"Created DNS record:{record}")
   146→            if not d.opt.quiet:
   147→                info(f"DNS: {host}.{zone}")
   148→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/destroy.py"
}
     1→import click
     2→import digitalocean
     3→
     4→from machine.di import d
     5→from machine.log import debug, fatal_error, output
     6→from machine.util import dnsRecordIdFromName, is_machine_created
     7→from machine.types import MainCmdCtx
     8→
     9→from machine.util import is_same_session
    10→
    11→
    12→@click.command(help="Destroy one or more machines")
    13→@click.option("--confirm/--no-confirm", default=True)
    14→@click.option("--delete-dns/--no-delete-dns", default=True)
    15→@click.option(
    16→    "--all",
    17→    is_flag=True,
    18→    default=False,
    19→    help="Include machines not created by this tool",
    20→)
    21→@click.argument("droplet-ids", nargs=-1)
    22→@click.pass_context
    23→def command(context, confirm, delete_dns, all, droplet_ids):
    24→    command_context: MainCmdCtx = context.obj
    25→    config = command_context.config
    26→    manager = digitalocean.Manager(token=config.access_token)
    27→    for droplet_id in droplet_ids:
    28→        try:
    29→            droplet = manager.get_droplet(droplet_id)
    30→        except digitalocean.NotFoundError:
    31→            fatal_error(f"Error: machine with id {droplet_id} not found")
    32→        name = droplet.name
    33→
    34→        if not is_machine_created(droplet) and not all:
    35→            fatal_error(f'ERROR: Cannot destroy droplet "{name}" (id: {droplet.id}), it was not created by machine.')
    36→
    37→        if not is_same_session(command_context, droplet) and not all:
    38→            fatal_error(
    39→                f'ERROR: Cannot destroy droplet "{name}" (id: {droplet.id}), it was created by a different session of machine.'
    40→            )
    41→
    42→        if confirm:
    43→            output(
    44→                "Type YES (not y or yes or Yes) to confirm that you want to permanently"
    45→                f' DELETE/DESTROY droplet "{name}" (id: {droplet.id})'
    46→            )
    47→            confirmation = input()
    48→            if confirmation != "YES":
    49→                fatal_error("Destroy operation aborted, not confirmed by user")
    50→        result = droplet.destroy()
    51→
    52→        if result and delete_dns and config.dns_zone:
    53→            zone = config.dns_zone
    54→            if d.opt.debug:
    55→                debug(f"Deleting host record {name}.{zone}")
    56→            domain = digitalocean.Domain(token=config.access_token, name=zone)
    57→            if not domain:
    58→                fatal_error(f"Error: Domain {domain} does not exist, machine destroyed but DNS record not removed")
    59→            record_id = dnsRecordIdFromName(domain, name)
    60→            if record_id:
    61→                if d.opt.debug:
    62→                    debug(f"Deleting dns record id={record_id}")
    63→                domain.delete_domain_record(id=record_id)
    64→            else:
    65→                if d.opt.debug:
    66→                    debug(f"No dns record found for {name}.{zone}")
    67→
    68→        if not result:
    69→            fatal_error("Error destroying machine")
    70→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/list.py"
}
     1→import click
     2→import json
     3→import digitalocean
     4→
     5→from machine.log import fatal_error, output
     6→from machine.types import MainCmdCtx, TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_SESSION_PREFIX
     7→from machine.util import get_machine_type, is_machine_created, is_same_session, droplet_to_json_obj
     8→
     9→
    10→def print_normal(droplets):
    11→    for droplet in droplets:
    12→        region = droplet.region["slug"] if droplet.region else "unknown"
    13→        output(f"{droplet.name} ({droplet.id}, {region}, {get_machine_type(droplet)}): {droplet.ip_address}")
    14→
    15→
    16→def print_quiet(droplets):
    17→    for droplet in droplets:
    18→        output(droplet.id)
    19→
    20→
    21→def print_json(droplets):
    22→    output(json.dumps([droplet_to_json_obj(d) for d in droplets]))
    23→
    24→
    25→def get_droplets(command_context, id=None, name=None, tag=None, type=None, region=None, all=False):
    26→    manager = digitalocean.Manager(token=command_context.config.access_token)
    27→    droplets = []
    28→    if id:
    29→        droplet = manager.get_droplet(id)
    30→        if droplet:
    31→            droplets.append(droplet)
    32→
    33→    if all:
    34→        droplets = manager.get_all_droplets()
    35→    else:
    36→        droplets = manager.get_all_droplets(tag_name=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
    37→
    38→    # we can't combine most filters over the API, so we also filter ourselves
    39→    if name:
    40→        droplets = filter(lambda d: d.name == name, droplets)
    41→
    42→    if tag:
    43→        droplets = filter(lambda d: tag in d.tags, droplets)
    44→
    45→    if type:
    46→        droplets = filter(lambda d: TAG_MACHINE_TYPE_PREFIX + type.lower() in d.tags, droplets)
    47→
    48→    if region:
    49→        droplets = filter(lambda d: d.region and region == d.region["slug"], droplets)
    50→
    51→    if not all:
    52→        droplets = filter(lambda d: is_machine_created(d) and is_same_session(command_context, d), droplets)
    53→
    54→    return list(droplets)
    55→
    56→
    57→@click.command(help="List machines")
    58→@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
    59→@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
    60→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
    61→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
    62→@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
    63→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
    64→@click.option(
    65→    "--all",
    66→    is_flag=True,
    67→    default=False,
    68→    help="All machines, including those not created by this tool or by other sessions",
    69→)
    70→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
    71→@click.option(
    72→    "--unique",
    73→    is_flag=True,
    74→    default=False,
    75→    help="Return an error if there is more than one match",
    76→)
    77→@click.pass_context
    78→def command(context, id, name, tag, type, region, all, output, quiet, unique):
    79→    command_context: MainCmdCtx = context.obj
    80→
    81→    droplets = get_droplets(command_context, id, name, tag, type, region, all)
    82→    if unique and len(droplets) > 1:
    83→        fatal_error(f"ERROR: --unique match required but {len(droplets)} matches found.")
    84→
    85→    if output == "json":
    86→        print_json(droplets)
    87→    elif quiet:
    88→        print_quiet(droplets)
    89→    else:
    90→        print_normal(droplets)
    91→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/status.py"
}
     1→import click
     2→import json
     3→import requests
     4→
     5→from machine.log import output
     6→from machine.subcommands.list import get_droplets
     7→from machine.types import MainCmdCtx
     8→
     9→
    10→def print_normal(statuses):
    11→    for status in statuses:
    12→        output(
    13→            f"{status['name']} ({status['id']}):\t" + "\t".join([f"{k}={v}" for k, v in status.items() if k not in ["name", "id"]])
    14→        )
    15→
    16→
    17→def print_json(statuses):
    18→    output(json.dumps(statuses))
    19→
    20→
    21→@click.command(help="Machine status")
    22→@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
    23→@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
    24→@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
    25→@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
    26→@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
    27→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
    28→@click.option("--status-check", metavar="<CHECK>", default="cloud-init-status", help="Status check to perform")
    29→@click.option(
    30→    "--all",
    31→    is_flag=True,
    32→    default=False,
    33→    help="All machines, including those not created by this tool or by other sessions",
    34→)
    35→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
    36→@click.pass_context
    37→def command(context, id, name, tag, type, region, all, output, quiet, status_check):
    38→    command_context: MainCmdCtx = context.obj
    39→
    40→    droplets = get_droplets(command_context, id, name, tag, type, region, all)
    41→
    42→    statuses = []
    43→    for d in droplets:
    44→        status = {"name": d.name, "id": d.id, "droplet-status": d.status, status_check: "UNKNOWN"}
    45→        try:
    46→            r = requests.get(f"http://{d.ip_address}:4242/cgi-bin/{status_check}")
    47→            if 200 == r.status_code:
    48→                status[status_check] = r.json()["status"]
    49→        except:  # noqa: E722
    50→            pass
    51→        statuses.append(status)
    52→
    53→    if output == "json":
    54→        print_json(statuses)
    55→    else:
    56→        print_normal(statuses)
    57→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/ssh_keys.py"
}
     1→import click
     2→import digitalocean
     3→from machine.log import output
     4→from machine.types import MainCmdCtx
     5→
     6→
     7→@click.command(help="List ssh keys")
     8→@click.pass_context
     9→def command(context):
    10→    command_context: MainCmdCtx = context.obj
    11→    manager = digitalocean.Manager(token=command_context.config.access_token)
    12→    my_keys = manager.get_all_sshkeys()
    13→    for key in my_keys:
    14→        output(f"{key.id}: {key.name} ({key.fingerprint})")
    15→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/domains.py"
}
     1→import click
     2→import digitalocean
     3→from machine.log import output
     4→from machine.types import MainCmdCtx
     5→
     6→
     7→@click.command(help="List dns domains")
     8→@click.pass_context
     9→def command(context):
    10→    command_context: MainCmdCtx = context.obj
    11→    manager = digitalocean.Manager(token=command_context.config.access_token)
    12→    my_domains = manager.get_all_domains()
    13→    for domain in my_domains:
    14→        output(f"{domain.name}")
    15→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/list_domain.py"
}
     1→import click
     2→import digitalocean
     3→import json
     4→
     5→from machine.log import fatal_error, output
     6→from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
     7→from machine.util import dns_record_to_json_obj
     8→
     9→
    10→def print_normal(records, zone):
    11→    for record in records:
    12→        output(f"{record.name}\t{record.type}\t{record.data}")
    13→
    14→
    15→def print_quiet(records):
    16→    for record in records:
    17→        output(record.name)
    18→
    19→
    20→def print_json(records, droplets, zone):
    21→    simplified = []
    22→    for r in records:
    23→        droplet = next((d for d in droplets if r.data == d.ip_address), None)
    24→        simplified.append(dns_record_to_json_obj(r, zone, droplet))
    25→    output(json.dumps(simplified))
    26→
    27→
    28→@click.command(help="List domain records")
    29→@click.option("--name", "-n", metavar="<RECORD-NAME>", help="Filter by name")
    30→@click.option("--type", "-m", metavar="<RECORD-TYPE>", help="Filter by type (default A and AAAA)")
    31→@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
    32→@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
    33→@click.option(
    34→    "--all",
    35→    is_flag=True,
    36→    default=False,
    37→    help="Include all records, even those not created by this tool or created by other sessions",
    38→)
    39→@click.argument("zone", required=False)
    40→@click.pass_context
    41→def command(context, name, type, output, quiet, all, zone):
    42→    command_context: MainCmdCtx = context.obj
    43→    if not zone:
    44→        zone = command_context.config.dns_zone
    45→    if not zone:
    46→        fatal_error("Error: no DNS zone specified.")
    47→    domain = digitalocean.Domain(token=command_context.config.access_token, name=zone)
    48→    records = domain.get_records()
    49→
    50→    if type:
    51→        if type != "*":
    52→            records = filter(lambda r: r.type == type, records)
    53→    else:
    54→        records = filter(lambda r: r.type in ["A", "AAAA"], records)
    55→
    56→    manager = digitalocean.Manager(token=command_context.config.access_token)
    57→    if all:
    58→        droplets = manager.get_all_droplets()
    59→    else:
    60→        droplets = manager.get_all_droplets(tag_name=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
    61→        droplet_ips = [d.ip_address for d in droplets]
    62→        records = filter(lambda r: r.data in droplet_ips, records)
    63→
    64→    records = list(records)
    65→    if output == "json":
    66→        print_json(records, droplets, zone)
    67→    elif quiet:
    68→        print_quiet(records)
    69→    else:
    70→        print_normal(records, zone)
    71→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/projects.py"
}
     1→import click
     2→import digitalocean
     3→from machine.log import output
     4→from machine.types import MainCmdCtx
     5→
     6→
     7→@click.command(help="List projects")
     8→@click.pass_context
     9→def command(context):
    10→    command_context: MainCmdCtx = context.obj
    11→    manager = digitalocean.Manager(token=command_context.config.access_token)
    12→    my_projects = manager.get_all_projects()
    13→    for project in my_projects:
    14→        output(f"{project.name}")
    15→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/subcommands/types.py"
}
     1→import click
     2→
     3→from machine.log import output
     4→from machine.config import get_machines
     5→
     6→
     7→@click.command(help="List projects")
     8→@click.pass_context
     9→def command(context):
    10→    config_machines = get_machines()
    11→    names = list(config_machines.keys())
    12→    names.sort()
    13→    for n in names:
    14→        output(n)
    15→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/pyproject.toml"
}
     1→[project]
     2→name = "machine"
     3→version = "1.0.0"
     4→description = "Utility for creating and managing VMs"
     5→readme = "README.md"
     6→license = "AGPL-3.0-only"
     7→requires-python = ">=3.8"
     8→authors = [
     9→    { name = "Stirlingbridge", email = "info@stirlingbridge.website" },
    10→]
    11→classifiers = [
    12→    "Programming Language :: Python :: 3.8",
    13→    "Operating System :: OS Independent",
    14→]
    15→dependencies = [
    16→    "click==8.1.7",
    17→    "python-digitalocean==1.17.0",
    18→    "ruamel.yaml>=0.17.32",
    19→    "requests==2.32.3",
    20→    "expandvars==1.0.0",
    21→]
    22→
    23→[project.scripts]
    24→machine = "machine.main:main"
    25→
    26→[project.urls]
    27→Homepage = "https://github.com/stirlingbridge/machine"
    28→
    29→[build-system]
    30→requires = ["hatchling"]
    31→build-backend = "hatchling.build"
    32→
    33→[tool.pytest.ini_options]
    34→markers = [
    35→    "e2e: end-to-end tests that create real resources on DigitalOcean (require E2E_* env vars)",
    36→]
    37→addopts = "-m 'not e2e'"
    38→
    39→[dependency-groups]
    40→dev = [
    41→    "ruff",
    42→    "pytest",
    43→]
    44→
    45→[tool.ruff]
    46→line-length = 132
    47→
    48→[tool.ruff.lint]
    49→select = ["E", "F", "W", "C90"]
    50→ignore = ["E203"]
    51→
    52→[tool.ruff.lint.mccabe]
    53→max-complexity = 25
    54→
    55→[tool.ruff.lint.per-file-ignores]
    56→"machine/cloud_config.py" = ["E501"]
    57→
Thinking

Let me check the constants and log modules, then read existing tests.

Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/constants.py"
}
     1→from pathlib import Path
     2→
     3→default_config_dir_path = str(Path.home()) + "/.machine"
     4→default_config_file_path = default_config_dir_path + "/config.yml"
     5→default_session_id_file_path = default_config_dir_path + "/session-id.yml"
     6→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/machine/log.py"
}
Glob
{
  "pattern": "tests/**/*.py"
}
     1→import sys
     2→
     3→
     4→def fatal_error(s: str):
     5→    print(s, file=sys.stderr)
     6→    sys.exit(1)
     7→
     8→
     9→def debug(s: str):
    10→    print(s, file=sys.stderr)
    11→
    12→
    13→def info(s: str):
    14→    print(s, file=sys.stderr)
    15→
    16→
    17→def output(s: str):
    18→    print(s)
    19→
/home/david/projects/stirlingbridge/machine/tests/test_input_validation.py
/home/david/projects/stirlingbridge/machine/tests/test_cli_integration.py
/home/david/projects/stirlingbridge/machine/tests/__init__.py
/home/david/projects/stirlingbridge/machine/tests/test_config.py
/home/david/projects/stirlingbridge/machine/tests/test_e2e.py
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/tests/test_config.py"
}
     1→import os
     2→import pytest
     3→from unittest.mock import patch
     4→from machine.config import _expand_env_vars
     5→
     6→
     7→class TestExpandEnvVars:
     8→    def test_plain_string_unchanged(self):
     9→        assert _expand_env_vars("hello world") == "hello world"
    10→
    11→    def test_non_string_types_unchanged(self):
    12→        assert _expand_env_vars(42) == 42
    13→        assert _expand_env_vars(3.14) == 3.14
    14→        assert _expand_env_vars(True) is True
    15→        assert _expand_env_vars(None) is None
    16→
    17→    def test_simple_variable_substitution(self):
    18→        with patch.dict(os.environ, {"MY_VAR": "my_value"}):
    19→            assert _expand_env_vars("${MY_VAR}") == "my_value"
    20→
    21→    def test_variable_embedded_in_string(self):
    22→        with patch.dict(os.environ, {"HOST": "example.com"}):
    23→            assert _expand_env_vars("https://${HOST}/api") == "https://example.com/api"
    24→
    25→    def test_multiple_variables_in_string(self):
    26→        with patch.dict(os.environ, {"HOST": "example.com", "PORT": "8080"}):
    27→            assert _expand_env_vars("${HOST}:${PORT}") == "example.com:8080"
    28→
    29→    def test_default_value_when_var_unset(self):
    30→        env = os.environ.copy()
    31→        env.pop("UNSET_VAR", None)
    32→        with patch.dict(os.environ, env, clear=True):
    33→            assert _expand_env_vars("${UNSET_VAR:-fallback}") == "fallback"
    34→
    35→    def test_default_value_ignored_when_var_set(self):
    36→        with patch.dict(os.environ, {"MY_VAR": "actual"}):
    37→            assert _expand_env_vars("${MY_VAR:-fallback}") == "actual"
    38→
    39→    def test_default_value_empty_string(self):
    40→        env = os.environ.copy()
    41→        env.pop("UNSET_VAR", None)
    42→        with patch.dict(os.environ, env, clear=True):
    43→            assert _expand_env_vars("${UNSET_VAR:-}") == ""
    44→
    45→    def test_unset_variable_without_default_exits(self):
    46→        env = os.environ.copy()
    47→        env.pop("MISSING_VAR", None)
    48→        with patch.dict(os.environ, env, clear=True):
    49→            with pytest.raises(SystemExit):
    50→                _expand_env_vars("${MISSING_VAR}")
    51→
    52→    def test_dict_values_expanded(self):
    53→        with patch.dict(os.environ, {"TOKEN": "secret123"}):
    54→            data = {"key": "${TOKEN}", "plain": "no-change"}
    55→            result = _expand_env_vars(data)
    56→            assert result == {"key": "secret123", "plain": "no-change"}
    57→
    58→    def test_nested_dict_expanded(self):
    59→        with patch.dict(os.environ, {"VAL": "deep"}):
    60→            data = {"outer": {"inner": "${VAL}"}}
    61→            result = _expand_env_vars(data)
    62→            assert result == {"outer": {"inner": "deep"}}
    63→
    64→    def test_list_values_expanded(self):
    65→        with patch.dict(os.environ, {"A": "x", "B": "y"}):
    66→            data = ["${A}", "literal", "${B}"]
    67→            result = _expand_env_vars(data)
    68→            assert result == ["x", "literal", "y"]
    69→
    70→    def test_mixed_nested_structure(self):
    71→        with patch.dict(os.environ, {"V": "replaced"}):
    72→            data = {"list": ["${V}", "fixed"], "nested": {"k": "${V}"}}
    73→            result = _expand_env_vars(data)
    74→            assert result == {"list": ["replaced", "fixed"], "nested": {"k": "replaced"}}
    75→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/tests/test_input_validation.py"
}
     1→"""Tests for graceful handling of invalid user input."""
     2→
     3→import os
     4→import subprocess
     5→import textwrap
     6→import pytest
     7→
     8→
     9→def run_machine(*args, env_override=None):
    10→    """Run the machine CLI as a subprocess and return the result."""
    11→    env = os.environ.copy()
    12→    if env_override:
    13→        env.update(env_override)
    14→    result = subprocess.run(
    15→        ["uv", "run", "machine"] + list(args),
    16→        capture_output=True,
    17→        text=True,
    18→        env=env,
    19→    )
    20→    return result
    21→
    22→
    23→def write_config(path, content):
    24→    with open(path, "w") as f:
    25→        f.write(textwrap.dedent(content))
    26→
    27→
    28→def valid_config():
    29→    return """\
    30→        digital-ocean:
    31→          access-token: fake-token
    32→          ssh-key: test-key
    33→          machine-size: s-1vcpu-1gb
    34→          image: ubuntu-22-04-x64
    35→          region: nyc1
    36→          project: test-project
    37→        machines:
    38→          web-server:
    39→            new-user-name: admin
    40→          db-server:
    41→            new-user-name: dbadmin
    42→        """
    43→
    44→
    45→class TestInvalidMachineType:
    46→    """Issue #29: invalid machine type should produce a graceful error, not a traceback."""
    47→
    48→    @pytest.fixture()
    49→    def config_dir(self, tmp_path):
    50→        return tmp_path
    51→
    52→    def test_invalid_machine_type_error_message(self, config_dir):
    53→        config_file = config_dir / "config.yml"
    54→        write_config(config_file, valid_config())
    55→        result = run_machine(
    56→            "--config-file",
    57→            str(config_file),
    58→            "create",
    59→            "-n",
    60→            "test-1",
    61→            "-m",
    62→            "nonexistent-type",
    63→            "--no-update-dns",
    64→        )
    65→        assert result.returncode != 0
    66→        assert "nonexistent-type" in result.stderr
    67→        assert "not found" in result.stderr.lower()
    68→        # Should list available types
    69→        assert "web-server" in result.stderr
    70→        assert "db-server" in result.stderr
    71→        # Should NOT be a traceback
    72→        assert "Traceback" not in result.stderr
    73→        assert "KeyError" not in result.stderr
    74→
    75→    def test_valid_machine_type_accepted(self, config_dir):
    76→        """Verify that a valid machine type doesn't produce an error about the type.
    77→        (It will fail later trying to reach DigitalOcean, but not with a type error.)"""
    78→        config_file = config_dir / "config.yml"
    79→        write_config(config_file, valid_config())
    80→        result = run_machine(
    81→            "--config-file",
    82→            str(config_file),
    83→            "create",
    84→            "-n",
    85→            "test-1",
    86→            "-m",
    87→            "web-server",
    88→            "--no-update-dns",
    89→        )
    90→        # It will fail (no real DO token), but NOT because of machine type
    91→        assert "not found in config" not in result.stderr
    92→
    93→
    94→class TestMissingConfigSections:
    95→    """Missing required config sections/keys should produce graceful errors."""
    96→
    97→    @pytest.fixture()
    98→    def config_dir(self, tmp_path):
    99→        return tmp_path
   100→
   101→    def test_missing_digital_ocean_section(self, config_dir):
   102→        config_file = config_dir / "config.yml"
   103→        write_config(
   104→            config_file,
   105→            """\
   106→            machines:
   107→              web-server:
   108→                new-user-name: admin
   109→            """,
   110→        )
   111→        result = run_machine("--config-file", str(config_file), "types")
   112→        assert result.returncode != 0
   113→        assert "digital-ocean" in result.stderr
   114→        assert "Traceback" not in result.stderr
   115→
   116→    def test_missing_access_token(self, config_dir):
   117→        config_file = config_dir / "config.yml"
   118→        write_config(
   119→            config_file,
   120→            """\
   121→            digital-ocean:
   122→              ssh-key: test-key
   123→              machine-size: s-1vcpu-1gb
   124→              image: ubuntu-22-04-x64
   125→              region: nyc1
   126→              project: test-project
   127→            machines:
   128→              web-server:
   129→                new-user-name: admin
   130→            """,
   131→        )
   132→        result = run_machine("--config-file", str(config_file), "types")
   133→        assert result.returncode != 0
   134→        assert "access-token" in result.stderr
   135→        assert "Traceback" not in result.stderr
   136→
   137→    def test_missing_ssh_key(self, config_dir):
   138→        config_file = config_dir / "config.yml"
   139→        write_config(
   140→            config_file,
   141→            """\
   142→            digital-ocean:
   143→              access-token: fake-token
   144→              machine-size: s-1vcpu-1gb
   145→              image: ubuntu-22-04-x64
   146→              region: nyc1
   147→              project: test-project
   148→            machines:
   149→              web-server:
   150→                new-user-name: admin
   151→            """,
   152→        )
   153→        result = run_machine("--config-file", str(config_file), "types")
   154→        assert result.returncode != 0
   155→        assert "ssh-key" in result.stderr
   156→        assert "Traceback" not in result.stderr
   157→
   158→    def test_missing_machines_section(self, config_dir):
   159→        config_file = config_dir / "config.yml"
   160→        write_config(
   161→            config_file,
   162→            """\
   163→            digital-ocean:
   164→              access-token: fake-token
   165→              ssh-key: test-key
   166→              machine-size: s-1vcpu-1gb
   167→              image: ubuntu-22-04-x64
   168→              region: nyc1
   169→              project: test-project
   170→            """,
   171→        )
   172→        result = run_machine("--config-file", str(config_file), "types")
   173→        assert result.returncode != 0
   174→        assert "machines" in result.stderr.lower()
   175→        assert "Traceback" not in result.stderr
   176→
   177→    def test_missing_new_user_name_in_machine(self, config_dir):
   178→        config_file = config_dir / "config.yml"
   179→        write_config(
   180→            config_file,
   181→            """\
   182→            digital-ocean:
   183→              access-token: fake-token
   184→              ssh-key: test-key
   185→              machine-size: s-1vcpu-1gb
   186→              image: ubuntu-22-04-x64
   187→              region: nyc1
   188→              project: test-project
   189→            machines:
   190→              broken-machine:
   191→                script-url: http://example.com/setup.sh
   192→            """,
   193→        )
   194→        result = run_machine(
   195→            "--config-file",
   196→            str(config_file),
   197→            "create",
   198→            "-n",
   199→            "test-1",
   200→            "-m",
   201→            "broken-machine",
   202→            "--no-update-dns",
   203→        )
   204→        assert result.returncode != 0
   205→        assert "new-user-name" in result.stderr
   206→        assert "Traceback" not in result.stderr
   207→
   208→
   209→class TestCreateNoInitialize:
   210→    """--no-initialize should work without a machine type and without crashing."""
   211→
   212→    @pytest.fixture()
   213→    def config_dir(self, tmp_path):
   214→        return tmp_path
   215→
   216→    def test_no_initialize_without_type_no_crash(self, config_dir):
   217→        """Using --no-initialize without --type should not crash with AttributeError or NameError."""
   218→        config_file = config_dir / "config.yml"
   219→        write_config(config_file, valid_config())
   220→        result = run_machine(
   221→            "--config-file",
   222→            str(config_file),
   223→            "create",
   224→            "-n",
   225→            "test-1",
   226→            "--no-initialize",
   227→            "--no-update-dns",
   228→        )
   229→        # It will fail (no real DO token), but should NOT crash with AttributeError/NameError
   230→        # from the bugs where user_data was undefined and type.lower() was called on None
   231→        assert "AttributeError" not in result.stderr
   232→        assert "NameError" not in result.stderr
   233→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/tests/test_cli_integration.py"
}
     1→"""Integration tests that run the machine CLI as a subprocess, the same way a user would."""
     2→
     3→import os
     4→import subprocess
     5→import textwrap
     6→import pytest
     7→
     8→
     9→def run_machine(*args, env_override=None):
    10→    """Run the machine CLI as a subprocess and return the result."""
    11→    env = os.environ.copy()
    12→    if env_override:
    13→        env.update(env_override)
    14→    result = subprocess.run(
    15→        ["uv", "run", "machine"] + list(args),
    16→        capture_output=True,
    17→        text=True,
    18→        env=env,
    19→    )
    20→    return result
    21→
    22→
    23→def write_config(path, content):
    24→    with open(path, "w") as f:
    25→        f.write(textwrap.dedent(content))
    26→
    27→
    28→class TestVersionCommand:
    29→    """Smoke test: the CLI runs and the version command works without any config."""
    30→
    31→    def test_version_runs(self):
    32→        result = run_machine("version")
    33→        assert result.returncode == 0
    34→        assert result.stdout.strip()  # prints something
    35→
    36→    def test_help_runs(self):
    37→        result = run_machine("--help")
    38→        assert result.returncode == 0
    39→        assert "Usage" in result.stdout
    40→
    41→
    42→class TestEnvVarExpansionIntegration:
    43→    """End-to-end tests that verify environment variable expansion works
    44→    when the actual machine tool is invoked with a config file."""
    45→
    46→    @pytest.fixture()
    47→    def config_dir(self, tmp_path):
    48→        return tmp_path
    49→
    50→    def test_env_var_expanded_in_config(self, config_dir):
    51→        config_file = config_dir / "config.yml"
    52→        write_config(
    53→            config_file,
    54→            """\
    55→            digital-ocean:
    56→              access-token: "${TEST_DO_TOKEN}"
    57→              ssh-key: test-key
    58→              machine-size: s-1vcpu-1gb
    59→              image: ubuntu-22-04-x64
    60→              region: nyc1
    61→              project: test-project
    62→            machines:
    63→              test-machine:
    64→                new-user-name: testuser
    65→            """,
    66→        )
    67→        result = run_machine(
    68→            "--debug",
    69→            "--config-file",
    70→            str(config_file),
    71→            "types",
    72→            env_override={"TEST_DO_TOKEN": "tok_secret_12345"},
    73→        )
    74→        assert result.returncode == 0
    75→        # The debug output on stderr should contain the expanded token value
    76→        assert "tok_secret_12345" in result.stderr
    77→        # And should NOT contain the unexpanded variable reference
    78→        assert "${TEST_DO_TOKEN}" not in result.stderr
    79→        # The types command should list the machine name on stdout
    80→        assert "test-machine" in result.stdout
    81→
    82→    def test_multiple_env_vars_expanded(self, config_dir):
    83→        config_file = config_dir / "config.yml"
    84→        write_config(
    85→            config_file,
    86→            """\
    87→            digital-ocean:
    88→              access-token: "${TEST_TOKEN}"
    89→              ssh-key: "${TEST_SSH_KEY}"
    90→              machine-size: s-1vcpu-1gb
    91→              image: ubuntu-22-04-x64
    92→              region: nyc1
    93→              project: test-project
    94→            machines:
    95→              dev:
    96→                new-user-name: devuser
    97→            """,
    98→        )
    99→        result = run_machine(
   100→            "--debug",
   101→            "--config-file",
   102→            str(config_file),
   103→            "types",
   104→            env_override={"TEST_TOKEN": "expanded_token", "TEST_SSH_KEY": "expanded_key"},
   105→        )
   106→        assert result.returncode == 0
   107→        assert "expanded_token" in result.stderr
   108→        assert "expanded_key" in result.stderr
   109→
   110→    def test_env_var_with_default_uses_default_when_unset(self, config_dir):
   111→        config_file = config_dir / "config.yml"
   112→        write_config(
   113→            config_file,
   114→            """\
   115→            digital-ocean:
   116→              access-token: fake-token
   117→              ssh-key: test-key
   118→              machine-size: "${TEST_SIZE:-s-2vcpu-4gb}"
   119→              image: ubuntu-22-04-x64
   120→              region: nyc1
   121→              project: test-project
   122→            machines:
   123→              myvm:
   124→                new-user-name: admin
   125→            """,
   126→        )
   127→        # Make sure TEST_SIZE is not in the environment
   128→        clean_env = os.environ.copy()
   129→        clean_env.pop("TEST_SIZE", None)
   130→        result = run_machine(
   131→            "--debug",
   132→            "--config-file",
   133→            str(config_file),
   134→            "types",
   135→            env_override={},
   136→        )
   137→        # Run with TEST_SIZE explicitly removed
   138→        result = subprocess.run(
   139→            ["uv", "run", "machine", "--debug", "--config-file", str(config_file), "types"],
   140→            capture_output=True,
   141→            text=True,
   142→            env=clean_env,
   143→        )
   144→        assert result.returncode == 0
   145→        assert "s-2vcpu-4gb" in result.stderr
   146→
   147→    def test_env_var_with_default_uses_value_when_set(self, config_dir):
   148→        config_file = config_dir / "config.yml"
   149→        write_config(
   150→            config_file,
   151→            """\
   152→            digital-ocean:
   153→              access-token: fake-token
   154→              ssh-key: test-key
   155→              machine-size: "${TEST_SIZE:-s-2vcpu-4gb}"
   156→              image: ubuntu-22-04-x64
   157→              region: nyc1
   158→              project: test-project
   159→            machines:
   160→              myvm:
   161→                new-user-name: admin
   162→            """,
   163→        )
   164→        result = run_machine(
   165→            "--debug",
   166→            "--config-file",
   167→            str(config_file),
   168→            "types",
   169→            env_override={"TEST_SIZE": "s-4vcpu-8gb"},
   170→        )
   171→        assert result.returncode == 0
   172→        assert "s-4vcpu-8gb" in result.stderr
   173→        assert "s-2vcpu-4gb" not in result.stderr
   174→
   175→    def test_missing_env_var_without_default_exits_with_error(self, config_dir):
   176→        config_file = config_dir / "config.yml"
   177→        write_config(
   178→            config_file,
   179→            """\
   180→            digital-ocean:
   181→              access-token: "${DEFINITELY_NOT_SET_VAR}"
   182→              ssh-key: test-key
   183→              machine-size: s-1vcpu-1gb
   184→              image: ubuntu-22-04-x64
   185→              region: nyc1
   186→              project: test-project
   187→            machines:
   188→              myvm:
   189→                new-user-name: admin
   190→            """,
   191→        )
   192→        clean_env = os.environ.copy()
   193→        clean_env.pop("DEFINITELY_NOT_SET_VAR", None)
   194→        result = subprocess.run(
   195→            ["uv", "run", "machine", "--config-file", str(config_file), "types"],
   196→            capture_output=True,
   197→            text=True,
   198→            env=clean_env,
   199→        )
   200→        assert result.returncode != 0
   201→        assert "DEFINITELY_NOT_SET_VAR" in result.stderr
   202→
   203→    def test_env_var_in_machine_config_section(self, config_dir):
   204→        config_file = config_dir / "config.yml"
   205→        write_config(
   206→            config_file,
   207→            """\
   208→            digital-ocean:
   209→              access-token: fake-token
   210→              ssh-key: test-key
   211→              machine-size: s-1vcpu-1gb
   212→              image: ubuntu-22-04-x64
   213→              region: nyc1
   214→              project: test-project
   215→            machines:
   216→              web-server:
   217→                new-user-name: "${TEST_USERNAME}"
   218→              db-server:
   219→                new-user-name: admin
   220→            """,
   221→        )
   222→        result = run_machine(
   223→            "--debug",
   224→            "--config-file",
   225→            str(config_file),
   226→            "types",
   227→            env_override={"TEST_USERNAME": "deploy_user"},
   228→        )
   229→        assert result.returncode == 0
   230→        assert "deploy_user" in result.stderr
   231→        # Both machine types should be listed
   232→        assert "db-server" in result.stdout
   233→        assert "web-server" in result.stdout
   234→
Read
{
  "file_path": "/home/david/projects/stirlingbridge/machine/tests/test_e2e.py"
}
     1→"""End-to-end tests that create real VMs on DigitalOcean.
     2→
     3→These tests require a real DigitalOcean environment and are NOT run as part of
     4→the normal test suite. They must be invoked explicitly:
     5→
     6→    uv run pytest tests/test_e2e.py -v
     7→
     8→Required environment variables:
     9→    E2E_DO_TOKEN    - DigitalOcean API token
    10→    E2E_SSH_KEY     - Name of an SSH key already registered in DO
    11→    E2E_DNS_ZONE    - DNS zone managed by DO (e.g. "test.example.com")
    12→    E2E_PROJECT     - DO project name to assign droplets to
    13→
    14→Optional environment variables:
    15→    E2E_REGION      - Region slug (default: nyc1)
    16→    E2E_IMAGE       - Image slug (default: ubuntu-24-04-x64)
    17→    E2E_SIZE        - Machine size slug (default: s-1vcpu-512mb-10gb)
    18→"""
    19→
    20→import json
    21→import os
    22→import subprocess
    23→import uuid
    24→
    25→import pytest
    26→
    27→
    28→# ---------------------------------------------------------------------------
    29→# Skip the entire module if credentials are not provided
    30→# ---------------------------------------------------------------------------
    31→
    32→E2E_DO_TOKEN = os.environ.get("E2E_DO_TOKEN")
    33→E2E_SSH_KEY = os.environ.get("E2E_SSH_KEY")
    34→E2E_DNS_ZONE = os.environ.get("E2E_DNS_ZONE")
    35→E2E_REGION = os.environ.get("E2E_REGION", "nyc1")
    36→E2E_IMAGE = os.environ.get("E2E_IMAGE", "ubuntu-24-04-x64")
    37→E2E_SIZE = os.environ.get("E2E_SIZE", "s-1vcpu-512mb-10gb")
    38→E2E_PROJECT = os.environ.get("E2E_PROJECT")
    39→
    40→pytestmark = pytest.mark.e2e
    41→
    42→_MISSING = []
    43→if not E2E_DO_TOKEN:
    44→    _MISSING.append("E2E_DO_TOKEN")
    45→if not E2E_SSH_KEY:
    46→    _MISSING.append("E2E_SSH_KEY")
    47→if not E2E_DNS_ZONE:
    48→    _MISSING.append("E2E_DNS_ZONE")
    49→if not E2E_PROJECT:
    50→    _MISSING.append("E2E_PROJECT")
    51→
    52→if _MISSING:
    53→    pytestmark = [
    54→        pytest.mark.e2e,
    55→        pytest.mark.skip(reason=f"E2E env vars not set: {', '.join(_MISSING)}"),
    56→    ]
    57→
    58→
    59→# ---------------------------------------------------------------------------
    60→# Helpers
    61→# ---------------------------------------------------------------------------
    62→
    63→
    64→def _unique_name(prefix="e2etest"):
    65→    """Generate a short unique droplet name safe for DNS."""
    66→    return f"{prefix}-{uuid.uuid4().hex[:8]}"
    67→
    68→
    69→def _write_config(path, **overrides):
    70→    """Write a minimal config file for the e2e test run."""
    71→    cfg = {
    72→        "access-token": E2E_DO_TOKEN,
    73→        "ssh-key": E2E_SSH_KEY,
    74→        "dns-zone": E2E_DNS_ZONE,
    75→        "machine-size": E2E_SIZE,
    76→        "image": E2E_IMAGE,
    77→        "region": E2E_REGION,
    78→    }
    79→    cfg["project"] = E2E_PROJECT
    80→    cfg.update(overrides)
    81→
    82→    do_lines = "\n".join(f"  {k}: {v}" for k, v in cfg.items())
    83→    content = f"digital-ocean:\n{do_lines}\nmachines:\n  e2e-basic:\n    new-user-name: e2euser\n"
    84→    with open(path, "w") as f:
    85→        f.write(content)
    86→
    87→
    88→def run_machine(*args, config_file=None, session_id=None):
    89→    """Run the machine CLI as a subprocess with the given arguments."""
    90→    cmd = ["uv", "run", "machine"]
    91→    if config_file:
    92→        cmd += ["--config-file", str(config_file)]
    93→    if session_id:
    94→        cmd += ["--session-id", session_id]
    95→    cmd += list(args)
    96→    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    97→    return result
    98→
    99→
   100→def _extract_droplet_id(output_text):
   101→    """Extract the droplet ID from CLI output like 'New droplet created with id: 12345'."""
   102→    for line in output_text.splitlines():
   103→        if "id:" in line.lower():
   104→            parts = line.split("id:")
   105→            if len(parts) >= 2:
   106→                candidate = parts[-1].strip()
   107→                if candidate.isdigit():
   108→                    return candidate
   109→    return None
   110→
   111→
   112→# ---------------------------------------------------------------------------
   113→# Fixtures
   114→# ---------------------------------------------------------------------------
   115→
   116→
   117→@pytest.fixture(scope="module")
   118→def config_file(tmp_path_factory):
   119→    """Write a config file that lives for the whole test module."""
   120→    path = tmp_path_factory.mktemp("e2e") / "config.yml"
   121→    _write_config(path)
   122→    return path
   123→
   124→
   125→@pytest.fixture(scope="module")
   126→def session_id():
   127→    """A unique session id shared across all tests in this module."""
   128→    return uuid.uuid4().hex[:8]
   129→
   130→
   131→@pytest.fixture(scope="class")
   132→def droplet(config_file, session_id):
   133→    """Create a single droplet with all features and destroy it after all tests.
   134→
   135→    The droplet is created with DNS, a machine type (cloud-init), a custom tag,
   136→    and --wait-for-ip so that all aspects can be verified by individual tests.
   137→    """
   138→    name = _unique_name()
   139→    custom_tag = f"e2e-tag-{uuid.uuid4().hex[:6]}"
   140→
   141→    # ---- CREATE with all features ------------------------------------------
   142→    result = run_machine(
   143→        "create",
   144→        "--name",
   145→        name,
   146→        "--type",
   147→        "e2e-basic",
   148→        "--update-dns",
   149→        "--tag",
   150→        custom_tag,
   151→        "--wait-for-ip",
   152→        config_file=config_file,
   153→        session_id=session_id,
   154→    )
   155→    assert result.returncode == 0, f"create failed: {result.stderr}"
   156→    create_out = result.stdout + result.stderr
   157→    droplet_id = _extract_droplet_id(create_out)
   158→    assert droplet_id, f"Could not find droplet id in output:\n{create_out}"
   159→
   160→    info = {
   161→        "name": name,
   162→        "id": droplet_id,
   163→        "custom_tag": custom_tag,
   164→        "create_out": create_out,
   165→    }
   166→
   167→    yield info
   168→
   169→    # ---- TEARDOWN: destroy with DNS cleanup --------------------------------
   170→    run_machine(
   171→        "--verbose",
   172→        "destroy",
   173→        "--no-confirm",
   174→        "--delete-dns",
   175→        droplet_id,
   176→        config_file=config_file,
   177→        session_id=session_id,
   178→    )
   179→
   180→
   181→# ---------------------------------------------------------------------------
   182→# Tests — one droplet, many assertions
   183→# ---------------------------------------------------------------------------
   184→
   185→
   186→class TestDropletLifecycle:
   187→    """Create one droplet with all features and verify each aspect independently.
   188→
   189→    A single droplet is created (via the class-scoped ``droplet`` fixture) with
   190→    DNS, a machine type, and a custom tag.  Each test method verifies a different
   191→    aspect so that failures are reported individually.  The droplet is destroyed
   192→    automatically after all tests complete.
   193→    """
   194→
   195→    def test_droplet_appears_in_list(self, droplet, config_file, session_id):
   196→        """Verify the droplet shows up in ``list`` with the correct name."""
   197→        result = run_machine(
   198→            "list",
   199→            "--output",
   200→            "json",
   201→            config_file=config_file,
   202→            session_id=session_id,
   203→        )
   204→        assert result.returncode == 0, f"list failed: {result.stderr}"
   205→        droplets = json.loads(result.stdout)
   206→        matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
   207→        assert len(matched) == 1, f"Expected 1 droplet with id {droplet['id']}, got {len(matched)}"
   208→        assert matched[0]["name"] == droplet["name"]
   209→
   210→    def test_droplet_has_ip(self, droplet, config_file, session_id):
   211→        """Verify the droplet was assigned an IP address."""
   212→        result = run_machine(
   213→            "list",
   214→            "--output",
   215→            "json",
   216→            config_file=config_file,
   217→            session_id=session_id,
   218→        )
   219→        assert result.returncode == 0
   220→        droplets = json.loads(result.stdout)
   221→        matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
   222→        assert len(matched) == 1
   223→        assert matched[0]["ip"] is not None, "Droplet has no IP address"
   224→
   225→    def test_dns_record_created(self, droplet, config_file, session_id):
   226→        """Verify that a DNS A record was created for the droplet."""
   227→        result = run_machine(
   228→            "list-domain",
   229→            "--name",
   230→            droplet["name"],
   231→            "--output",
   232→            "json",
   233→            E2E_DNS_ZONE,
   234→            config_file=config_file,
   235→            session_id=session_id,
   236→        )
   237→        assert result.returncode == 0, f"list-domain failed: {result.stderr}"
   238→        records = json.loads(result.stdout)
   239→        a_records = [r for r in records if r.get("name") == droplet["name"] and r.get("type") == "A"]
   240→        assert len(a_records) >= 1, f"No A record found for {droplet['name']}.{E2E_DNS_ZONE}"
   241→
   242→    def test_dns_zone_in_create_output(self, droplet):
   243→        """Verify that DNS zone was mentioned in the create output."""
   244→        assert E2E_DNS_ZONE in droplet["create_out"], f"DNS zone not mentioned in output:\n{droplet['create_out']}"
   245→
   246→    def test_type_tag_applied(self, droplet, config_file, session_id):
   247→        """Verify that the machine type tag was applied and is filterable."""
   248→        result = run_machine(
   249→            "list",
   250→            "--type",
   251→            "e2e-basic",
   252→            "--output",
   253→            "json",
   254→            config_file=config_file,
   255→            session_id=session_id,
   256→        )
   257→        assert result.returncode == 0
   258→        droplets = json.loads(result.stdout)
   259→        matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
   260→        assert len(matched) == 1, "Droplet not found when filtering by type e2e-basic"
   261→        assert matched[0]["type"] == "e2e-basic", "Type tag mismatch"
   262→
   263→    def test_custom_tag_applied(self, droplet, config_file, session_id):
   264→        """Verify that the custom tag was applied and is filterable."""
   265→        result = run_machine(
   266→            "list",
   267→            "--tag",
   268→            droplet["custom_tag"],
   269→            "--output",
   270→            "json",
   271→            config_file=config_file,
   272→            session_id=session_id,
   273→        )
   274→        assert result.returncode == 0
   275→        droplets = json.loads(result.stdout)
   276→        matched = [d for d in droplets if str(d["id"]) == droplet["id"]]
   277→        assert len(matched) == 1, f"Droplet not found with tag {droplet['custom_tag']}"
   278→
Thinking

Now I have a complete picture of the codebase. I'll implement the provider abstraction layer, refactor the existing DigitalOcean code into a provider implementation, and update all subcommands. Let me start.

📝 Write provider.py
/home/david/projects/stirlingbridge/machine/machine/provider.py
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class VM:
    id: str
    name: str
    tags: list
    region: str
    ip_address: str
    status: str


@dataclass
class SSHKey:
    id: str
    name: str
    fingerprint: str
    public_key: str


@dataclass
class DNSRecord:
    id: str
    name: str
    type: str
    data: str
    ttl: int


class CloudProvider(ABC):
    @abstractmethod
    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM:
        pass

    @abstractmethod
    def get_vm(self, vm_id) -> VM:
        pass

    @abstractmethod
    def destroy_vm(self, vm_id) -> bool:
        pass

    @abstractmethod
    def list_vms(self, tag=None) -> list:
        pass

    @abstractmethod
    def get_ssh_key(self, name) -> SSHKey:
        pass

    @abstractmethod
    def list_ssh_keys(self) -> list:
        pass

    @abstractmethod
    def create_dns_record(self, zone, record_type, name, data, ttl, tag=None) -> str:
        pass

    @abstractmethod
    def delete_dns_record(self, zone, record_name) -> bool:
        pass

    @abstractmethod
    def get_dns_records(self, zone) -> list:
        pass

    @abstractmethod
    def list_domains(self) -> list:
        pass

    def list_projects(self) -> list:
        return []

    def assign_to_project(self, project_name, vm_id):
        pass

    def validate_region(self, region):
        pass

    def validate_image(self, image):
        pass

    @property
    def provider_name(self) -> str:
        return self.__class__.__name__
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/provider.py
$ Bash
Create providers directory
mkdir -p /home/david/projects/stirlingbridge/machine/machine/providers
(Bash completed with no output)
📝 Write __init__.py
/home/david/projects/stirlingbridge/machine/machine/providers/__init__.py
from machine.log import fatal_error
from machine.provider import CloudProvider

KNOWN_PROVIDERS = ["digital-ocean"]


def create_provider(provider_name, provider_config) -> CloudProvider:
    if provider_name == "digital-ocean":
        from machine.providers.digitalocean import DigitalOceanProvider

        return DigitalOceanProvider(provider_config)
    else:
        fatal_error(f"Unknown provider: '{provider_name}'. Known providers: {', '.join(KNOWN_PROVIDERS)}")
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/providers/__init__.py
📝 Write digitalocean.py
/home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py
import digitalocean

from machine.log import fatal_error, info
from machine.provider import CloudProvider, VM, SSHKey, DNSRecord


VALID_REGIONS = ["NYC1", "NYC3", "AMS3", "SFO2", "SFO3", "SGP1", "LON1", "FRA1", "TOR1", "BLR1", "SYD1"]

VALID_IMAGES = [
    "almalinux-8-x64",
    "almalinux-9-x64",
    "centos-stream-9-x64",
    "debian-11-x64",
    "debian-12-x64",
    "fedora-39-x64",
    "fedora-40-x64",
    "rockylinux-9-x64",
    "rockylinux-8-x64",
    "ubuntu-20-04-x64",
    "ubuntu-22-04-x64",
    "ubuntu-24-04-x64",
]


def _droplet_to_vm(droplet) -> VM:
    return VM(
        id=str(droplet.id),
        name=droplet.name,
        tags=droplet.tags,
        region=droplet.region["slug"] if droplet.region else None,
        ip_address=droplet.ip_address,
        status=droplet.status,
    )


class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)

    def create_vm(self, name, region, image, size, ssh_key_name, tags, user_data) -> VM:
        ssh_key = self._get_do_ssh_key(ssh_key_name)
        if not ssh_key:
            fatal_error(f"Error: SSH key '{ssh_key_name}' not found in DigitalOcean")

        droplet = digitalocean.Droplet(
            token=self.token,
            name=name,
            region=region,
            image=image,
            size_slug=size,
            ssh_keys=[ssh_key],
            tags=tags,
            user_data=user_data,
            backups=False,
        )
        droplet.create()
        return _droplet_to_vm(droplet)

    def get_vm(self, vm_id) -> VM:
        droplet = self._manager.get_droplet(vm_id)
        return _droplet_to_vm(droplet)

    def destroy_vm(self, vm_id) -> bool:
        try:
            droplet = self._manager.get_droplet(vm_id)
        except digitalocean.NotFoundError:
            fatal_error(f"Error: machine with id {vm_id} not found")
        result = droplet.destroy()
        return result

    def list_vms(self, tag=None) -> list:
        if tag:
            droplets = self._manager.get_all_droplets(tag_name=tag)
        else:
            droplets = self._manager.get_all_droplets()
        return [_droplet_to_vm(d) for d in droplets]

    def get_ssh_key(self, name) -> SSHKey:
        do_key = self._get_do_ssh_key(name)
        if not do_key:
            return None
        return SSHKey(
            id=str(do_key.id),
            name=do_key.name,
            fingerprint=do_key.fingerprint,
            public_key=do_key.public_key,
        )

    def list_ssh_keys(self) -> list:
        keys = self._manager.get_all_sshkeys()
        return [
            SSHKey(id=str(k.id), name=k.name, fingerprint=k.fingerprint, public_key=k.public_key)
            for k in keys
        ]

    def create_dns_record(self, zone, record_type, name, data, ttl, tag=None) -> str:
        domain = digitalocean.Domain(token=self.token, name=zone)
        try:
            record = domain.create_new_domain_record(type=record_type, ttl=ttl, name=name, data=data, tag=tag)
        except digitalocean.NotFoundError:
            info(f"Warning: DNS zone '{zone}' not found in DigitalOcean, DNS record not set")
            return None
        return record

    def delete_dns_record(self, zone, record_name) -> bool:
        domain = digitalocean.Domain(token=self.token, name=zone)
        records = domain.get_records()
        for record in records:
            if record.name == record_name:
                domain.delete_domain_record(id=record.id)
                return True
        return False

    def get_dns_records(self, zone) -> list:
        domain = digitalocean.Domain(token=self.token, name=zone)
        records = domain.get_records()
        return [
            DNSRecord(id=str(r.id), name=r.name, type=r.type, data=r.data, ttl=r.ttl)
            for r in records
        ]

    def list_domains(self) -> list:
        domains = self._manager.get_all_domains()
        return [d.name for d in domains]

    def list_projects(self) -> list:
        projects = self._manager.get_all_projects()
        return [p.name for p in projects]

    def assign_to_project(self, project_name, vm_id):
        projects = self._manager.get_all_projects()
        project = None
        for p in projects:
            if p.name == project_name:
                project = p
                break
        if not project:
            fatal_error(f"Error: Project {project_name} does not exist, machine created but not assigned to project")
        project.assign_resource([f"do:droplet:{vm_id}"])

    def validate_region(self, region):
        if region is not None and region.upper() not in VALID_REGIONS:
            fatal_error(f"Error: region {region} is not one of {VALID_REGIONS}")

    def validate_image(self, image):
        if image is not None and image not in VALID_IMAGES:
            info(f"Warning: image {image} is not one of these known valid images: {VALID_IMAGES}")

    def _get_do_ssh_key(self, name):
        keys = self._manager.get_all_sshkeys()
        for key in keys:
            if key.name == name:
                return key
        return None

    @property
    def provider_name(self) -> str:
        return "DigitalOcean"
File created successfully at: /home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py

Now update the core types, config, main, util, and cloud_config:

📝 Write types.py
/home/david/projects/stirlingbridge/machine/machine/types.py
from dataclasses import dataclass

TAG_MACHINE_CREATED = "machine:created"
TAG_MACHINE_TYPE_PREFIX = "machine:type:"
TAG_MACHINE_SESSION_PREFIX = "machine:session:"


@dataclass
class CliOptions:
    debug: bool
    quiet: bool
    verbose: bool
    dry_run: bool


@dataclass
class Config:
    provider_name: str
    provider_config: dict
    ssh_key: str
    dns_zone: str
    machine_size: str
    image: str
    region: str
    project: str


@dataclass
class MainCmdCtx:
    config: Config
    session_id: str
    provider: object  # CloudProvider instance


@dataclass
class MachineConfig:
    new_user_name: str
    script_url: str
    script_dir: str
    script_path: str
    script_args: str
The file /home/david/projects/stirlingbridge/machine/machine/types.py has been updated successfully.
📝 Write config.py
/home/david/projects/stirlingbridge/machine/machine/config.py
import os
import re
from pathlib import Path
from machine.di import d
from machine.factory import yaml
from machine.log import fatal_error, debug
from machine.types import Config, MachineConfig
from machine import constants
from machine.providers import KNOWN_PROVIDERS

_env_var_pattern = re.compile(r"\$\{([^}]+)\}")


def _expand_env_vars(value):
    if isinstance(value, str):

        def _replace(match):
            expr = match.group(1)
            if ":-" in expr:
                var_name, default = expr.split(":-", 1)
                return os.environ.get(var_name, default)
            else:
                if expr not in os.environ:
                    fatal_error(f"Environment variable '{expr}' referenced in config is not set")
                return os.environ[expr]

        return _env_var_pattern.sub(_replace, value)
    elif isinstance(value, dict):
        return {k: _expand_env_vars(v) for k, v in value.items()}
    elif isinstance(value, list):
        return [_expand_env_vars(item) for item in value]
    return value


class _loaded_config:
    c: any = None


def _load_config_data(config_file_name: str):
    if not config_file_name:
        config_file_name = constants.default_config_file_path
    config_path = Path(os.path.expanduser(config_file_name))
    if not config_path.exists():
        fatal_error(f"Error: Config file: {config_path} not found")
    config = _expand_env_vars(yaml().load(open(config_path, "r")))
    if d.opt.debug:
        debug(f"Loaded config file: {config_path}")
        debug(f"Parsed config: {config}")
    _loaded_config.c = config
    return config


def _require_key(d, key, section_name):
    if key not in d:
        fatal_error(f"Required key '{key}' not found in '{section_name}' section of config file")
    return d[key]


def get(config_file_name: str) -> Config:
    config = _load_config_data(config_file_name)

    # Auto-detect provider from config sections
    provider_name = config.get("provider")
    if not provider_name:
        found = [p for p in KNOWN_PROVIDERS if p in config]
        if len(found) == 0:
            fatal_error(
                "No provider section found in config file. Expected one of: " + ", ".join(KNOWN_PROVIDERS)
            )
        if len(found) > 1:
            fatal_error(
                "Multiple provider sections found in config file. Please add a 'provider:' key to select one."
            )
        provider_name = found[0]

    if provider_name not in config:
        fatal_error(f"Provider '{provider_name}' specified but no '{provider_name}' section found in config file")

    provider_config = config[provider_name]
    return Config(
        provider_name=provider_name,
        provider_config=provider_config,
        ssh_key=_require_key(provider_config, "ssh-key", provider_name),
        dns_zone=provider_config.get("dns-zone"),
        machine_size=_require_key(provider_config, "machine-size", provider_name),
        image=_require_key(provider_config, "image", provider_name),
        region=_require_key(provider_config, "region", provider_name),
        project=provider_config.get("project"),
    )


def get_machine(name: str) -> MachineConfig:
    if not _loaded_config.c:
        fatal_error("Attempt to fetch machine data before config loaded")
    config = _loaded_config.c
    if "machines" not in config:
        fatal_error("Required 'machines' section not found in config file")
    config_machines = config["machines"]
    if name not in config_machines:
        fatal_error(f"Machine type '{name}' not found in config file. Available types: {', '.join(config_machines.keys())}")
    target_config = config_machines[name]
    return MachineConfig(
        _require_key(target_config, "new-user-name", f"machines.{name}"),
        target_config.get("script-url"),
        target_config.get("script-dir"),
        target_config.get("script-path"),
        target_config.get("script-args"),
    )


def get_machines():
    if not _loaded_config.c:
        fatal_error("Attempt to fetch machine data before config loaded")
    config = _loaded_config.c

    if "machines" not in config:
        fatal_error("Required 'machines' section not found in config file")
    ret = {}
    for name in config["machines"]:
        ret[name] = get_machine(name)
    return ret
The file /home/david/projects/stirlingbridge/machine/machine/config.py has been updated successfully.
📝 Write main.py
/home/david/projects/stirlingbridge/machine/machine/main.py
import os

import click

from machine import config
from machine import constants
from machine.di import d
from machine.log import output
from machine.providers import create_provider
from machine.subcommands import create, destroy, list, projects, ssh_keys, domains, list_domain, types, status
from machine.types import CliOptions, MainCmdCtx
from machine.util import load_session_id

CLICK_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
@click.option("--debug", is_flag=True, default=False, help="Enable debug output")
@click.option("--quiet", is_flag=True, default=False, help="Suppress all non-essential output")
@click.option("--verbose", is_flag=True, default=False, help="Enable verbose output")
@click.option("--dry-run", is_flag=True, default=False, help="Run but do not do anything")
@click.option("--config-file", metavar="<PATH>", help=f"Specify the config file (default {constants.default_config_file_path})")
@click.option(
    "--session-id", metavar="<ID>", default=load_session_id(), help=f"Override the default session ID (default {load_session_id()})"
)
@click.pass_context
def main(context, debug, quiet, verbose, dry_run, config_file, session_id):
    options = CliOptions(debug, quiet, verbose, dry_run)
    d.opt = options
    # Skip config loading for version subcommand since it doesn't need it
    # and should work even when no config file exists (#25)
    if context.invoked_subcommand == "version":
        return
    cfg = config.get(config_file)
    provider = create_provider(cfg.provider_name, cfg.provider_config)
    main_context = MainCmdCtx(cfg, session_id, provider)
    context.obj = main_context


@main.command()
@click.pass_context
def version(context):
    try:
        version_file = os.path.join(os.path.dirname(__file__), "version.txt")
        with open(version_file) as f:
            version_string = f.read().strip()
    except FileNotFoundError:
        version_string = "dev"
    output(version_string)


main.add_command(create.command, "create")
main.add_command(destroy.command, "destroy")
main.add_command(domains.command, "domains")
main.add_command(list.command, "list")
main.add_command(list_domain.command, "list-domain")
main.add_command(projects.command, "projects")
main.add_command(ssh_keys.command, "ssh-keys")
main.add_command(types.command, "types")
main.add_command(status.command, "status")
The file /home/david/projects/stirlingbridge/machine/machine/main.py has been updated successfully.
📝 Write util.py
/home/david/projects/stirlingbridge/machine/machine/util.py
import os
import uuid

from machine.factory import yaml
from machine.constants import default_config_dir_path, default_session_id_file_path
from machine.types import TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_CREATED

from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX


def get_machine_type(vm):
    type = next((t for t in vm.tags if TAG_MACHINE_TYPE_PREFIX in t), "").replace(TAG_MACHINE_TYPE_PREFIX, "")
    if not type:
        return None
    return type


def is_machine_created(vm):
    return TAG_MACHINE_CREATED in vm.tags


def is_same_session(command_context: MainCmdCtx, vm):
    return TAG_MACHINE_SESSION_PREFIX + command_context.session_id in vm.tags


def load_session_id():
    if not os.path.exists(default_config_dir_path):
        os.mkdir(default_config_dir_path)

    if not os.path.exists(default_session_id_file_path):
        with open(default_session_id_file_path, "w") as f:
            f.write("id: " + str(uuid.uuid4()).replace("-", "")[0:8])

    sessionid_config = yaml().load(open(default_session_id_file_path, "r"))
    return sessionid_config["id"]


def vm_to_json_obj(vm):
    return {
        "id": vm.id,
        "name": vm.name,
        "tags": vm.tags,
        "region": vm.region,
        "ip": vm.ip_address,
        "type": get_machine_type(vm),
    }


def dns_record_to_json_obj(dns_record, zone, vm):
    if vm:
        vm = vm_to_json_obj(vm)

    return {
        "id": dns_record.id,
        "droplet": vm,
        "name": dns_record.name,
        "fqdn": f"{dns_record.name}.{zone}",
        "zone": zone,
        "data": dns_record.data,
        "ttl": dns_record.ttl,
        "type": dns_record.type,
    }
The file /home/david/projects/stirlingbridge/machine/machine/util.py has been updated successfully.
📝 Write cloud_config.py
/home/david/projects/stirlingbridge/machine/machine/cloud_config.py
from expandvars import expand

from machine.log import fatal_error
from machine.provider import CloudProvider
from machine.types import MachineConfig


def get_user_data(provider: CloudProvider, ssh_key_name: str, fqdn: str, machine_config: MachineConfig):
    if not fqdn:
        fqdn = ""

    script_args = machine_config.script_args
    if not script_args:
        script_args = ""

    ssh_key = provider.get_ssh_key(ssh_key_name)
    if not ssh_key:
        fatal_error(f"Error: SSH key '{ssh_key_name}' not found in {provider.provider_name}")
    ssh_public_key = ssh_key.public_key
    escaped_args = script_args.replace('"', '\\"')

    cloud_env = {
        "MACHINE_SCRIPT_URL": machine_config.script_url,
        "MACHINE_SCRIPT_DIR": machine_config.script_dir,
        "MACHINE_FQDN": fqdn,
    }

    # Exand here because otherwise escaping the vars properly for nested scripts is a guessing game
    escaped_args = expand(escaped_args, environ=cloud_env)
    cloud_config = f"""#cloud-config
users:
  - name: {machine_config.new_user_name}
    groups: sudo
    shell: /bin/bash
    sudo: ['ALL=(ALL) NOPASSWD:ALL']
    ssh-authorized-keys:
      - {ssh_public_key}
"""
    if machine_config.script_url and machine_config.script_dir and machine_config.script_path:
        cloud_config += f"""
runcmd:
  - mkdir -p {machine_config.script_dir}
  - curl -L {machine_config.script_url} -o {machine_config.script_path}
  - chmod +x {machine_config.script_path}
  - [su, -c, "env {" ".join([f"{k}='{v}'" for k, v in cloud_env.items()])} {machine_config.script_path} {escaped_args}", -, {machine_config.new_user_name}]
"""
    return cloud_config
The file /home/david/projects/stirlingbridge/machine/machine/cloud_config.py has been updated successfully.

Now update all the subcommands:

📝 Write create.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/create.py
import click
import time
from machine.config import get_machine
from machine.di import d
from machine.log import fatal_error, info, debug, output
from machine.types import MainCmdCtx, TAG_MACHINE_CREATED, TAG_MACHINE_TYPE_PREFIX
from machine.cloud_config import get_user_data

from machine.types import TAG_MACHINE_SESSION_PREFIX


@click.command(help="Create a machine")
@click.option("--name", "-n", required=True, metavar="<MACHINE-NAME>", help="Name for new machine")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="tag to be applied to new machine")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="create a machine of this type")
@click.option("--region", "-r", metavar="<REGION-CODE>", help="create a machine in this region (overrides default from config)")
@click.option(
    "--machine-size", "-s", metavar="<MACHINE-SLUG>", help="create a machine of this size (overrides default from config)"
)
@click.option("--image", "-s", metavar="<IMAGE-NAME>", help="create a machine from this image (overrides default from config)")
@click.option("--wait-for-ip/--no-wait-for-up", default=False)
@click.option("--update-dns/--no-update-dns", default=True)
@click.option("--initialize/--no-initialize", default=True)
@click.pass_context
def command(context, name, tag, type, region, machine_size, image, wait_for_ip, update_dns, initialize):
    command_context: MainCmdCtx = context.obj
    config = command_context.config
    provider = command_context.provider

    if update_dns and not config.dns_zone:
        fatal_error("Error: DNS update requested but no zone configured")

    user_data = None
    if initialize:
        if not type:
            fatal_error("Error: a machine type must be supplied")
        machine_config = get_machine(type)
        if not machine_config:
            fatal_error(f"Error: machine type {type} is not defined")
        fqdn = f"{name}.{config.dns_zone}" if config.dns_zone else None
        user_data = get_user_data(provider, config.ssh_key, fqdn, machine_config)
        if d.opt.debug:
            info("user-data is:")
            info(user_data)

    # Verify SSH key exists
    ssh_key = provider.get_ssh_key(config.ssh_key)
    if not ssh_key:
        fatal_error(f"Error: SSH key '{config.ssh_key}' not found in {provider.provider_name}")

    provider.validate_region(region)
    provider.validate_image(image)

    tags = [
        TAG_MACHINE_SESSION_PREFIX + command_context.session_id,
        TAG_MACHINE_CREATED,
    ]
    if type:
        tags.append(TAG_MACHINE_TYPE_PREFIX + type.lower())
    if tag:
        tags.append(tag)

    vm = provider.create_vm(
        name=name,
        region=region if region is not None else config.region,
        image=image if image is not None else config.image,
        size=machine_size if machine_size is not None else config.machine_size,
        ssh_key_name=config.ssh_key,
        tags=tags,
        user_data=user_data,
    )

    if vm.id:
        if d.opt.quiet:
            output(f"{vm.id}")
        else:
            output(f"New droplet created with id: {vm.id}")

    # If requested, assign to a specified project
    if config.project:
        provider.assign_to_project(config.project, vm.id)
        if d.opt.verbose:
            info(f"Assigned droplet to project: {config.project}")

    # If requested, or if we are going to set a DNS record get the VM's IPv4 address
    ip_address = vm.ip_address
    if (wait_for_ip or update_dns) and not ip_address:
        while not ip_address:
            time.sleep(1)
            vm = provider.get_vm(vm.id)
            ip_address = vm.ip_address
            if d.opt.verbose:
                output("Waiting for droplet IP address")
        if d.opt.quiet:
            info(f"{ip_address}")
        else:
            info(f"IP Address: {ip_address}")

    # If requested, and we have the IP address, create a DNS host record
    if update_dns and ip_address and config.dns_zone:
        zone = config.dns_zone
        host = name
        if d.opt.debug:
            debug(f"Setting host record {host}.{zone} to {ip_address}")
        record = provider.create_dns_record(
            zone=zone,
            record_type="A",
            name=host,
            data=ip_address,
            ttl=60 * 5,
            tag=TAG_MACHINE_CREATED,
        )
        if record:
            if d.opt.verbose:
                info(f"Created DNS record:{record}")
            if not d.opt.quiet:
                info(f"DNS: {host}.{zone}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/create.py has been updated successfully.
📝 Write destroy.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/destroy.py
import click

from machine.di import d
from machine.log import debug, fatal_error, output
from machine.util import is_machine_created
from machine.types import MainCmdCtx

from machine.util import is_same_session


@click.command(help="Destroy one or more machines")
@click.option("--confirm/--no-confirm", default=True)
@click.option("--delete-dns/--no-delete-dns", default=True)
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="Include machines not created by this tool",
)
@click.argument("droplet-ids", nargs=-1)
@click.pass_context
def command(context, confirm, delete_dns, all, droplet_ids):
    command_context: MainCmdCtx = context.obj
    config = command_context.config
    provider = command_context.provider

    for droplet_id in droplet_ids:
        vm = provider.get_vm(droplet_id)
        name = vm.name

        if not is_machine_created(vm) and not all:
            fatal_error(f'ERROR: Cannot destroy droplet "{name}" (id: {vm.id}), it was not created by machine.')

        if not is_same_session(command_context, vm) and not all:
            fatal_error(
                f'ERROR: Cannot destroy droplet "{name}" (id: {vm.id}), it was created by a different session of machine.'
            )

        if confirm:
            output(
                "Type YES (not y or yes or Yes) to confirm that you want to permanently"
                f' DELETE/DESTROY droplet "{name}" (id: {vm.id})'
            )
            confirmation = input()
            if confirmation != "YES":
                fatal_error("Destroy operation aborted, not confirmed by user")

        result = provider.destroy_vm(droplet_id)

        if result and delete_dns and config.dns_zone:
            zone = config.dns_zone
            if d.opt.debug:
                debug(f"Deleting host record {name}.{zone}")
            deleted = provider.delete_dns_record(zone, name)
            if deleted:
                if d.opt.debug:
                    debug(f"Deleted dns record for {name}.{zone}")
            else:
                if d.opt.debug:
                    debug(f"No dns record found for {name}.{zone}")

        if not result:
            fatal_error("Error destroying machine")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/destroy.py has been updated successfully.
📝 Write list.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/list.py
import click
import json

from machine.log import fatal_error, output
from machine.types import MainCmdCtx, TAG_MACHINE_TYPE_PREFIX, TAG_MACHINE_SESSION_PREFIX
from machine.util import get_machine_type, is_machine_created, is_same_session, vm_to_json_obj


def print_normal(vms):
    for vm in vms:
        region = vm.region if vm.region else "unknown"
        output(f"{vm.name} ({vm.id}, {region}, {get_machine_type(vm)}): {vm.ip_address}")


def print_quiet(vms):
    for vm in vms:
        output(vm.id)


def print_json(vms):
    output(json.dumps([vm_to_json_obj(v) for v in vms]))


def get_vms(command_context, id=None, name=None, tag=None, type=None, region=None, all=False):
    provider = command_context.provider

    vms = []
    if id:
        vm = provider.get_vm(id)
        if vm:
            vms.append(vm)

    if all:
        vms = provider.list_vms()
    else:
        vms = provider.list_vms(tag=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)

    # we can't combine most filters over the API, so we also filter ourselves
    if name:
        vms = filter(lambda v: v.name == name, vms)

    if tag:
        vms = filter(lambda v: tag in v.tags, vms)

    if type:
        vms = filter(lambda v: TAG_MACHINE_TYPE_PREFIX + type.lower() in v.tags, vms)

    if region:
        vms = filter(lambda v: v.region and region == v.region, vms)

    if not all:
        vms = filter(lambda v: is_machine_created(v) and is_same_session(command_context, v), vms)

    return list(vms)


@click.command(help="List machines")
@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="All machines, including those not created by this tool or by other sessions",
)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.option(
    "--unique",
    is_flag=True,
    default=False,
    help="Return an error if there is more than one match",
)
@click.pass_context
def command(context, id, name, tag, type, region, all, output, quiet, unique):
    command_context: MainCmdCtx = context.obj

    vms = get_vms(command_context, id, name, tag, type, region, all)
    if unique and len(vms) > 1:
        fatal_error(f"ERROR: --unique match required but {len(vms)} matches found.")

    if output == "json":
        print_json(vms)
    elif quiet:
        print_quiet(vms)
    else:
        print_normal(vms)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/list.py has been updated successfully.
📝 Write status.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/status.py
import click
import json
import requests

from machine.log import output
from machine.subcommands.list import get_vms
from machine.types import MainCmdCtx


def print_normal(statuses):
    for status in statuses:
        output(
            f"{status['name']} ({status['id']}):\t" + "\t".join([f"{k}={v}" for k, v in status.items() if k not in ["name", "id"]])
        )


def print_json(statuses):
    output(json.dumps(statuses))


@click.command(help="Machine status")
@click.option("--id", metavar="<MACHINE-ID>", help="Filter by id")
@click.option("--name", "-n", metavar="<MACHINE-NAME>", help="Filter by name")
@click.option("--tag", "-t", metavar="<TAG-TEXT>", help="Filter by tag")
@click.option("--type", "-m", metavar="<MACHINE-TYPE>", help="Filter by type")
@click.option("--region", "-r", metavar="<REGION>", help="Filter by region")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option("--status-check", metavar="<CHECK>", default="cloud-init-status", help="Status check to perform")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="All machines, including those not created by this tool or by other sessions",
)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.pass_context
def command(context, id, name, tag, type, region, all, output, quiet, status_check):
    command_context: MainCmdCtx = context.obj

    vms = get_vms(command_context, id, name, tag, type, region, all)

    statuses = []
    for vm in vms:
        status = {"name": vm.name, "id": vm.id, "droplet-status": vm.status, status_check: "UNKNOWN"}
        try:
            r = requests.get(f"http://{vm.ip_address}:4242/cgi-bin/{status_check}")
            if 200 == r.status_code:
                status[status_check] = r.json()["status"]
        except:  # noqa: E722
            pass
        statuses.append(status)

    if output == "json":
        print_json(statuses)
    else:
        print_normal(statuses)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/status.py has been updated successfully.
📝 Write ssh_keys.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/ssh_keys.py
import click
from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List ssh keys")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    keys = provider.list_ssh_keys()
    for key in keys:
        output(f"{key.id}: {key.name} ({key.fingerprint})")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/ssh_keys.py has been updated successfully.
📝 Write domains.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/domains.py
import click
from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List dns domains")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    domains = provider.list_domains()
    for domain in domains:
        output(f"{domain}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/domains.py has been updated successfully.
📝 Write list_domain.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/list_domain.py
import click
import json

from machine.log import fatal_error, output
from machine.types import MainCmdCtx, TAG_MACHINE_SESSION_PREFIX
from machine.util import dns_record_to_json_obj


def print_normal(records, zone):
    for record in records:
        output(f"{record.name}\t{record.type}\t{record.data}")


def print_quiet(records):
    for record in records:
        output(record.name)


def print_json(records, vms, zone):
    simplified = []
    for r in records:
        vm = next((v for v in vms if r.data == v.ip_address), None)
        simplified.append(dns_record_to_json_obj(r, zone, vm))
    output(json.dumps(simplified))


@click.command(help="List domain records")
@click.option("--name", "-n", metavar="<RECORD-NAME>", help="Filter by name")
@click.option("--type", "-m", metavar="<RECORD-TYPE>", help="Filter by type (default A and AAAA)")
@click.option("--output", "-o", metavar="<FORMAT>", help="Output format")
@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display machine IDs")
@click.option(
    "--all",
    is_flag=True,
    default=False,
    help="Include all records, even those not created by this tool or created by other sessions",
)
@click.argument("zone", required=False)
@click.pass_context
def command(context, name, type, output, quiet, all, zone):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider

    if not zone:
        zone = command_context.config.dns_zone
    if not zone:
        fatal_error("Error: no DNS zone specified.")

    records = provider.get_dns_records(zone)

    if type:
        if type != "*":
            records = filter(lambda r: r.type == type, records)
    else:
        records = filter(lambda r: r.type in ["A", "AAAA"], records)

    if all:
        vms = provider.list_vms()
    else:
        vms = provider.list_vms(tag=TAG_MACHINE_SESSION_PREFIX + command_context.session_id)
        vm_ips = [v.ip_address for v in vms]
        records = filter(lambda r: r.data in vm_ips, records)

    records = list(records)
    if output == "json":
        print_json(records, vms, zone)
    elif quiet:
        print_quiet(records)
    else:
        print_normal(records, zone)
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/list_domain.py has been updated successfully.
📝 Write projects.py
/home/david/projects/stirlingbridge/machine/machine/subcommands/projects.py
import click
from machine.log import output
from machine.types import MainCmdCtx


@click.command(help="List projects")
@click.pass_context
def command(context):
    command_context: MainCmdCtx = context.obj
    provider = command_context.provider
    projects = provider.list_projects()
    for project in projects:
        output(f"{project}")
The file /home/david/projects/stirlingbridge/machine/machine/subcommands/projects.py has been updated successfully.

Now let me run the linter and tests:

$ Bash
Run ruff linter
uv run ruff check machine/
   Building machine @ file:///home/david/projects/stirlingbridge/machine
      Built machine @ file:///home/david/projects/stirlingbridge/machine
Uninstalled 1 package in 2ms
Installed 1 package in 1ms
All checks passed!
$ Bash
Run tests
uv run pytest tests/ -v --timeout=30 2>&1
Exit code 4
ERROR: usage: pytest [options] [file_or_dir] [file_or_dir] [...]
pytest: error: unrecognized arguments: --timeout=30
  inifile: /home/david/projects/stirlingbridge/machine/pyproject.toml
  rootdir: /home/david/projects/stirlingbridge/machine
$ Bash
Run tests
uv run pytest tests/ -v 2>&1
Exit code 1
============================= test session starts ==============================
platform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/david/projects/stirlingbridge/machine/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/david/projects/stirlingbridge/machine
configfile: pyproject.toml
collecting ... collected 35 items / 6 deselected / 29 selected

tests/test_cli_integration.py::TestVersionCommand::test_version_runs PASSED [  3%]
tests/test_cli_integration.py::TestVersionCommand::test_help_runs PASSED [  6%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_expanded_in_config PASSED [ 10%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_multiple_env_vars_expanded PASSED [ 13%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_default_when_unset PASSED [ 17%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_value_when_set PASSED [ 20%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_missing_env_var_without_default_exits_with_error PASSED [ 24%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_in_machine_config_section PASSED [ 27%]
tests/test_config.py::TestExpandEnvVars::test_plain_string_unchanged PASSED [ 31%]
tests/test_config.py::TestExpandEnvVars::test_non_string_types_unchanged PASSED [ 34%]
tests/test_config.py::TestExpandEnvVars::test_simple_variable_substitution PASSED [ 37%]
tests/test_config.py::TestExpandEnvVars::test_variable_embedded_in_string PASSED [ 41%]
tests/test_config.py::TestExpandEnvVars::test_multiple_variables_in_string PASSED [ 44%]
tests/test_config.py::TestExpandEnvVars::test_default_value_when_var_unset PASSED [ 48%]
tests/test_config.py::TestExpandEnvVars::test_default_value_ignored_when_var_set PASSED [ 51%]
tests/test_config.py::TestExpandEnvVars::test_default_value_empty_string PASSED [ 55%]
tests/test_config.py::TestExpandEnvVars::test_unset_variable_without_default_exits PASSED [ 58%]
tests/test_config.py::TestExpandEnvVars::test_dict_values_expanded PASSED [ 62%]
tests/test_config.py::TestExpandEnvVars::test_nested_dict_expanded PASSED [ 65%]
tests/test_config.py::TestExpandEnvVars::test_list_values_expanded PASSED [ 68%]
tests/test_config.py::TestExpandEnvVars::test_mixed_nested_structure PASSED [ 72%]
tests/test_input_validation.py::TestInvalidMachineType::test_invalid_machine_type_error_message PASSED [ 75%]
tests/test_input_validation.py::TestInvalidMachineType::test_valid_machine_type_accepted PASSED [ 79%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_digital_ocean_section PASSED [ 82%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token FAILED [ 86%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_ssh_key PASSED [ 89%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_machines_section PASSED [ 93%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_new_user_name_in_machine PASSED [ 96%]
tests/test_input_validation.py::TestCreateNoInitialize::test_no_initialize_without_type_no_crash PASSED [100%]

=================================== FAILURES ===================================
_____________ TestMissingConfigSections.test_missing_access_token ______________

self = <tests.test_input_validation.TestMissingConfigSections object at 0x7e94bc9dd430>
config_dir = PosixPath('/tmp/pytest-of-david/pytest-11/test_missing_access_token0')

    def test_missing_access_token(self, config_dir):
        config_file = config_dir / "config.yml"
        write_config(
            config_file,
            """\
            digital-ocean:
              ssh-key: test-key
              machine-size: s-1vcpu-1gb
              image: ubuntu-22-04-x64
              region: nyc1
              project: test-project
            machines:
              web-server:
                new-user-name: admin
            """,
        )
        result = run_machine("--config-file", str(config_file), "types")
        assert result.returncode != 0
        assert "access-token" in result.stderr
>       assert "Traceback" not in result.stderr
E       assert 'Traceback' not in "Traceback (...ess-token'\n"
E         
E         'Traceback' is contained here:
E           Traceback (most recent call last):
E         ? +++++++++
E             File "/home/david/projects/stirlingbridge/machine/.venv/bin/machine", line 10, in <module>
E               sys.exit(main())
E                        ^^^^^^...
E         
E         ...Full output truncated (27 lines hidden), use '-vv' to show

tests/test_input_validation.py:135: AssertionError
=========================== short test summary info ============================
FAILED tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token
================== 1 failed, 28 passed, 6 deselected in 4.45s ==================
Thinking

The failure is because access-token validation now happens in the provider constructor (which crashes with a KeyError). I need to add graceful validation there.

✏️ Edit digitalocean.py
/home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py
class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)
+
class DigitalOceanProvider(CloudProvider):
    def __init__(self, provider_config):
        if "access-token" not in provider_config:
            fatal_error("Required key 'access-token' not found in 'digital-ocean' section of config file")
        self.token = provider_config["access-token"]
        self._manager = digitalocean.Manager(token=self.token)
The file /home/david/projects/stirlingbridge/machine/machine/providers/digitalocean.py has been updated successfully.
$ Bash
Run tests
uv run pytest tests/ -v 2>&1
============================= test session starts ==============================
platform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/david/projects/stirlingbridge/machine/.venv/bin/python
cachedir: .pytest_cache
rootdir: /home/david/projects/stirlingbridge/machine
configfile: pyproject.toml
collecting ... collected 35 items / 6 deselected / 29 selected

tests/test_cli_integration.py::TestVersionCommand::test_version_runs PASSED [  3%]
tests/test_cli_integration.py::TestVersionCommand::test_help_runs PASSED [  6%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_expanded_in_config PASSED [ 10%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_multiple_env_vars_expanded PASSED [ 13%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_default_when_unset PASSED [ 17%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_with_default_uses_value_when_set PASSED [ 20%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_missing_env_var_without_default_exits_with_error PASSED [ 24%]
tests/test_cli_integration.py::TestEnvVarExpansionIntegration::test_env_var_in_machine_config_section PASSED [ 27%]
tests/test_config.py::TestExpandEnvVars::test_plain_string_unchanged PASSED [ 31%]
tests/test_config.py::TestExpandEnvVars::test_non_string_types_unchanged PASSED [ 34%]
tests/test_config.py::TestExpandEnvVars::test_simple_variable_substitution PASSED [ 37%]
tests/test_config.py::TestExpandEnvVars::test_variable_embedded_in_string PASSED [ 41%]
tests/test_config.py::TestExpandEnvVars::test_multiple_variables_in_string PASSED [ 44%]
tests/test_config.py::TestExpandEnvVars::test_default_value_when_var_unset PASSED [ 48%]
tests/test_config.py::TestExpandEnvVars::test_default_value_ignored_when_var_set PASSED [ 51%]
tests/test_config.py::TestExpandEnvVars::test_default_value_empty_string PASSED [ 55%]
tests/test_config.py::TestExpandEnvVars::test_unset_variable_without_default_exits PASSED [ 58%]
tests/test_config.py::TestExpandEnvVars::test_dict_values_expanded PASSED [ 62%]
tests/test_config.py::TestExpandEnvVars::test_nested_dict_expanded PASSED [ 65%]
tests/test_config.py::TestExpandEnvVars::test_list_values_expanded PASSED [ 68%]
tests/test_config.py::TestExpandEnvVars::test_mixed_nested_structure PASSED [ 72%]
tests/test_input_validation.py::TestInvalidMachineType::test_invalid_machine_type_error_message PASSED [ 75%]
tests/test_input_validation.py::TestInvalidMachineType::test_valid_machine_type_accepted PASSED [ 79%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_digital_ocean_section PASSED [ 82%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_access_token PASSED [ 86%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_ssh_key PASSED [ 89%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_machines_section PASSED [ 93%]
tests/test_input_validation.py::TestMissingConfigSections::test_missing_new_user_name_in_machine PASSED [ 96%]
tests/test_input_validation.py::TestCreateNoInitialize::test_no_initialize_without_type_no_crash PASSED [100%]

======================= 29 passed, 6 deselected in 3.85s =======================

All 29 tests pass, lint is clean. Here's a summary of what was done:

Changes Made

New files

  • machine/provider.py — Abstract base class CloudProvider with provider-neutral data types (VM, SSHKey, DNSRecord). Defines the interface all providers must implement.
  • machine/providers/__init__.py — Provider factory (create_provider()) that instantiates the right provider based on config. Maintains KNOWN_PROVIDERS list.
  • machine/providers/digitalocean.pyDigitalOceanProvider implementation wrapping all python-digitalocean API calls. All DO-specific logic (region/image validation, project assignment, resource URNs) lives here.

Modified files

  • machine/types.pyConfig now has provider_name and provider_config instead of access_token. MainCmdCtx now carries a provider instance.
  • machine/config.py — Auto-detects provider from config file sections (looks for known provider names like digital-ocean). No longer hardcoded to DO.
  • machine/main.py — Creates the provider via factory and passes it through the Click context.
  • machine/util.py — Removed all digitalocean imports. Helper functions now work with provider-neutral VM and DNSRecord types.
  • machine/cloud_config.py — Takes CloudProvider instead of DO Manager.
  • All 8 subcommands — Refactored to use provider from context instead of directly calling digitalocean.*.

Key design decisions

  • Backward compatible — Existing digital-ocean: configs work unchanged with auto-detection.
  • Provider validates itself — Each provider checks its own required config keys (e.g., DO validates access-token).
  • Optional capabilitieslist_projects() and assign_to_project() have default no-op implementations in the ABC, since not all providers support projects.
  • validate_region()/validate_image() — Default no-ops in ABC; DO provider overrides with its known-valid lists.

To add Vultr (or any other provider), you'd now just need to: 1. Create machine/providers/vultr.py implementing CloudProvider 2. Add "vultr" to KNOWN_PROVIDERS in machine/providers/__init__.py 3. Add the elif branch in create_provider()

Excellent thanks. Could you please create a PR for this work. I've already made the branch and pushed it to remote. Add the notes you made above to the PR description please.

$ Bash
Check current branch and recent commits
git branch --show-current && git log --oneline -5 && git remote -v
dboreham/provider-abstraction
b53dbb4 Abstract provider interface to allow future multi-provider support
ba07efa Merge pull request #44 from stirlingbridge/dboreham/migrate-lint-ruff
6bd58c9 Migrate to modern linting tools
2672c00 Merge pull request #42 from stirlingbridge/dboreham/speed-up-e2e-tests
ff437f5 Refector e2e tests to only create one vm
origin	git@github.com:stirlingbridge/machine.git (fetch)
origin	git@github.com:stirlingbridge/machine.git (push)
$ Bash
Create PR
gh pr create --title "Add provider abstraction layer for multi-cloud support" --body "$(cat <<'EOF'
## Summary

Introduces a `CloudProvider` abstraction layer, decoupling all subcommands from DigitalOcean's API. This is the foundation for adding Vultr (and other providers) per #43.

- New `CloudProvider` ABC with provider-neutral data types (`VM`, `SSHKey`, `DNSRecord`)
- `DigitalOceanProvider` implementation wrapping all `python-digitalocean` API calls
- Provider auto-detection from config file sections (existing `digital-ocean:` configs work unchanged)
- All 8 subcommands refactored to use the provider interface instead of calling `digitalocean.*` directly

### New files
- **`machine/provider.py`** — Abstract base class defining the interface all providers must implement
- **`machine/providers/__init__.py`** — Provider factory and `KNOWN_PROVIDERS` registry
- **`machine/providers/digitalocean.py`** — `DigitalOceanProvider` with all DO-specific logic (region/image validation, project assignment, resource URNs)

### Modified files
- **`machine/types.py`** — `Config` now has `provider_name` and `provider_config` instead of `access_token`; `MainCmdCtx` carries a `provider` instance
- **`machine/config.py`** — Auto-detects provider from config sections; no longer hardcoded to DO
- **`machine/main.py`** — Creates provider via factory and passes it through Click context
- **`machine/util.py`** — Removed all `digitalocean` imports; helpers work with neutral types
- **`machine/cloud_config.py`** — Takes `CloudProvider` instead of DO `Manager`
- **All 8 subcommands** — Use `provider` from context instead of `digitalocean.*`

### Design decisions
- **Backward compatible** — Existing `digital-ocean:` configs work unchanged with auto-detection
- **Provider validates itself** — Each provider checks its own required config keys (e.g., DO validates `access-token`)
- **Optional capabilities** — `list_projects()` and `assign_to_project()` have default no-op implementations in the ABC, since not all providers support projects
- **`validate_region()`/`validate_image()`** — Default no-ops in ABC; DO provider overrides with its known-valid lists

### Adding a new provider

To add Vultr (or any other provider), you'd now just need to:
1. Create `machine/providers/vultr.py` implementing `CloudProvider`
2. Add `"vultr"` to `KNOWN_PROVIDERS` in `machine/providers/__init__.py`
3. Add the `elif` branch in `create_provider()`

Closes #43

## Test plan

- [x] All 29 existing unit and integration tests pass
- [x] Linter clean (`ruff check`)
- [ ] E2E tests against real DigitalOcean (requires credentials)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"
https://github.com/stirlingbridge/machine/pull/45

PR created: https://github.com/stirlingbridge/machine/pull/45

Thanks. There was a CI failure on the e2e tests. I will re-start this session with the credentials required to run that test suite.

Sounds good. I'll be ready to debug the e2e test failure when you're back with the credentials.

Caveat: The messages below were generated by the user while running local commands. DO NOT respond to these messages or otherwise consider them in your response unless the user explicitly asks you to.