Source code for atlas_q.vqe_qaoa

"""
Variational Quantum Eigensolver (VQE) and QAOA

Implements variational algorithms for optimization and ground state finding:
- VQE for chemistry and physics
- QAOA for combinatorial optimization
- Hardware-efficient ansätze
- Parameter optimization

Author: ATLAS-Q Contributors
Date: October 2025
License: MIT
"""

from __future__ import annotations

import json
import os
import time
import warnings
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch

try:
    from scipy.optimize import minimize
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False
    warnings.warn("SciPy not available. VQE/QAOA optimization will be limited.")

# Matplotlib headless for saving plots (backend must be set before pyplot import)
import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt

from .adaptive_mps import AdaptiveMPS
from .mpo_ops import MPO, expectation_value


@dataclass
class VQEConfig:
    """Configuration for VQE"""

    ansatz: str = "hardware_efficient"  # 'hardware_efficient', 'uccsd', 'custom'
    n_layers: int = 3
    optimizer: str = "L-BFGS-B"  # 'COBYLA', 'L-BFGS-B', 'BFGS', 'Adam' (SciPy)
    max_iter: int = 200
    tol: float = 1e-9
    chi_max: int = 256
    # 'group', 'per_pauli', or None (defaults to 'group' for best performance)
    gradient_method: Optional[str] = "group"
    device: str = "cuda"
    dtype: torch.dtype = torch.complex128  # complex128 for precision
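
# Construction sketch (illustrative, not part of the module): override only the
# fields you need; "cpu" is used here in case no GPU is available.
#
#     cfg = VQEConfig(n_layers=2, optimizer="COBYLA", chi_max=128, device="cpu")
#
# Note that COBYLA is gradient-free; the parameter-shift gradients below are
# only attached for the BFGS / L-BFGS-B branches of VQE.run().
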
class HardwareEfficientAnsatz:
    """
    Hardware-efficient ansatz for VQE.

    Layer = [Ry(θ) on all qubits] + [CZ on neighboring pairs]
    """

    def __init__(self, n_qubits: int, n_layers: int, device: str = "cuda",
                 dtype: torch.dtype = torch.complex128):
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.device = device
        self.dtype = dtype
        self.n_params = n_qubits * n_layers

    def apply(self, mps: AdaptiveMPS, params: np.ndarray):
        assert len(params) == self.n_params
        param_idx = 0
        for _ in range(self.n_layers):
            # single-qubit RY
            for q in range(self.n_qubits):
                theta = params[param_idx]
                param_idx += 1
                mps.apply_single_qubit_gate(q, self._ry_gate(theta))
            # CZ entanglers (even then odd pairs)
            CZ = torch.diag(torch.tensor([1, 1, 1, -1], dtype=self.dtype, device=self.device))
            for q in range(0, self.n_qubits - 1, 2):
                mps.apply_two_site_gate(q, CZ)
            for q in range(1, self.n_qubits - 1, 2):
                mps.apply_two_site_gate(q, CZ)

    def _ry_gate(self, theta: float) -> torch.Tensor:
        c = np.cos(theta / 2.0)
        s = np.sin(theta / 2.0)
        return torch.tensor([[c, -s], [s, c]], dtype=self.dtype, device=self.device)

    def get_param_shift(self, k: int) -> float:
        # Standard shift for Ry parameters: dE/dθk = [E(θk + π/2) − E(θk − π/2)] / 2
        return np.pi / 2.0

    # For chemistry drivers that expect it (here simply the |0…0⟩ product state)
    def prepare_hf_state(self, chi_max: int = 64) -> AdaptiveMPS:
        return AdaptiveMPS(
            num_qubits=self.n_qubits,
            bond_dim=2,
            chi_max_per_bond=chi_max,
            device=self.device,
            dtype=self.dtype,
        )
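
# Usage sketch (illustrative): an n_qubits=4, n_layers=2 ansatz carries
# 4 * 2 = 8 RY angles; the CZ entangling layers are parameter-free.
#
#     ansatz = HardwareEfficientAnsatz(n_qubits=4, n_layers=2, device="cpu")
#     mps = ansatz.prepare_hf_state(chi_max=64)          # |0000> start state
#     ansatz.apply(mps, np.random.uniform(-np.pi, np.pi, ansatz.n_params))
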
class VQE:
    """
    VQE driver with:
    - Lightweight warm-start (1D/2D)
    - Optional per-Pauli parameter-shift gradients
    - Incremental JSONL progress + PNG plots in the caller-provided output_dir
      (e.g. molecular_results/)
    - Quiet-by-default logging with heartbeat
    """

    def __init__(self, hamiltonian: MPO, config: VQEConfig, custom_ansatz=None,
                 output_dir: Optional[str] = None):
        self.H = hamiltonian
        self.config = config

        # Init ansatz
        if custom_ansatz is not None:
            self.ansatz = custom_ansatz
        elif config.ansatz == "hardware_efficient":
            self.ansatz = HardwareEfficientAnsatz(
                n_qubits=self.H.n_sites,
                n_layers=config.n_layers,
                device=config.device,
                dtype=config.dtype,
            )
        elif config.ansatz == "custom":
            self.ansatz = None
        else:
            raise ValueError(f"Unknown ansatz: {config.ansatz}")

        # Tracking
        self.energies: List[float] = []
        self.param_history: List[np.ndarray] = []
        self.iteration: int = 0

        # Behavior flags (tweak from caller); gradient_method='per_pauli'
        # enables per-Pauli shifts, anything else uses the group-level shift
        self._fci_ref: Optional[float] = None
        self._use_warm_start: bool = True
        self._use_per_pauli_gradients: bool = config.gradient_method == "per_pauli"

        # Logging/outputs
        self.quiet: bool = True  # minimal console chatter
        self.status_every: int = 10  # heartbeat cadence

        # Warm-start defaults (tuned to be light for >10 params)
        self.ws_points_1d: int = 21
        self.ws_theta_max_1d: float = 1.5
        self.ws_points_2d: int = 11
        self.ws_theta_max_2d: float = 1.5

        # Incremental outputs (only enabled if output_dir explicitly provided)
        self.output_dir = output_dir
        if self.output_dir:
            os.makedirs(self.output_dir, exist_ok=True)
        self._progress_path: Optional[str] = None
        self._run_tag: Optional[str] = None

    # ---------------- Utilities: minimal logging & progress I/O ----------------
    def _p(self, msg: str):
        if not self.quiet:
            print(msg, flush=True)

    def _start_progress_log(self, label: str):
        if not self.output_dir:
            return
        ts = time.strftime("%Y%m%d_%H%M%S")
        self._run_tag = f"{label}_{ts}"
        self._progress_path = os.path.join(self.output_dir, f"{self._run_tag}.jsonl")
        with open(self._progress_path, "a") as f:
            f.write(json.dumps({"event": "run_start", "label": label, "time": time.time()}) + "\n")

    def _write_progress(self, payload: Dict):
        if not self._progress_path:
            return
        row = dict(payload)
        row.setdefault("time", time.time())
        with open(self._progress_path, "a") as f:
            f.write(json.dumps(row) + "\n")

    # ---------------- Core evals ----------------
    def _energy_at_params(self, params: np.ndarray) -> float:
        # Prepare HF state if available, otherwise |0…0⟩
        if hasattr(self.ansatz, "prepare_hf_state"):
            mps = self.ansatz.prepare_hf_state(chi_max=self.config.chi_max)
        else:
            mps = AdaptiveMPS(
                num_qubits=self.H.n_sites,
                bond_dim=2,
                chi_max_per_bond=self.config.chi_max,
                device=self.config.device,
                dtype=self.config.dtype,
            )
        # Apply ansatz (UCCSD expects chi_max in apply)
        try:
            self.ansatz.apply(mps, params, chi_max=self.config.chi_max)  # UCCSD signature
        except TypeError:
            self.ansatz.apply(mps, params)  # HEA signature
        E = expectation_value(self.H, mps)
        return float(E.real)

    def _cost_function(self, params: np.ndarray) -> float:
        E = self._energy_at_params(params)
        self.energies.append(E)
        self.param_history.append(params.copy())
        self.iteration += 1
        if (self.iteration % self.status_every == 0) or (self.iteration == 1):
            self._write_progress({"event": "iter", "iter": self.iteration, "E": E,
                                  "best_E": float(min(self.energies))})
            self._p(f"[VQE] iter {self.iteration:4d} E = {E:.8f}")
        return E

    # --------- Gradients ---------
    def _gradient_at_params_parameter_shift(self, params: np.ndarray) -> np.ndarray:
        grad = np.zeros_like(params)
        for k in range(self.ansatz.n_params):
            s = self.ansatz.get_param_shift(k)
            p_plus = params.copy()
            p_plus[k] += s
            p_minus = params.copy()
            p_minus[k] -= s
            grad[k] = 0.5 * (self._energy_at_params(p_plus) - self._energy_at_params(p_minus))
        return grad

    def _gradient_at_params_per_pauli_shift(self, params: np.ndarray) -> np.ndarray:
        grad = np.zeros_like(params)
        for k in range(self.ansatz.n_params):
            if not hasattr(self.ansatz, "param_groups"):
                # fallback to group-level shift if ansatz doesn't expose groups
                s = self.ansatz.get_param_shift(k)
                p_plus = params.copy()
                p_plus[k] += s
                p_minus = params.copy()
                p_minus[k] -= s
                grad[k] = 0.5 * (self._energy_at_params(p_plus) - self._energy_at_params(p_minus))
                continue
            gk = 0.0
            for coeff, _P in self.ansatz.param_groups[k]:
                a = abs(coeff.imag) if abs(coeff.imag) > 1e-14 else abs(coeff.real)
                if a < 1e-14:  # skip zeros
                    continue
                s = np.pi / (2.0 * a)
                p_plus = params.copy()
                p_plus[k] += s
                p_minus = params.copy()
                p_minus[k] -= s
                gk += a * (self._energy_at_params(p_plus) - self._energy_at_params(p_minus)) / 2.0
            grad[k] = gk
        return grad

    # --------- Warm-start (light) ---------
    def _grid_search_1d(self, k: int, theta_max: float, points: int) -> float:
        thetas = np.linspace(-theta_max, theta_max, points)
        base = np.zeros(self.ansatz.n_params)
        best_E = float("+inf")
        best_t = 0.0
        self._write_progress({"event": "warm1d_start", "k": k, "points": points})
        for i, t in enumerate(thetas, 1):
            p = base.copy()
            p[k] = t
            E = self._energy_at_params(p)
            if E < best_E:
                best_E, best_t = E, t
            if i % max(1, points // 5) == 0:
                self._write_progress({"event": "warm1d_step", "k": k, "i": i, "E": E})
        self._write_progress({"event": "warm1d_done", "k": k, "theta": best_t, "E": best_E})
        return best_t

    def _grid_search_2d(self, i0: int, i1: int, theta_max: float, points: int) -> np.ndarray:
        grid = np.linspace(-theta_max, theta_max, points)
        base = np.zeros(self.ansatz.n_params)
        best = base.copy()
        best_E = float("+inf")
        self._write_progress({"event": "warm2d_start", "i0": i0, "i1": i1, "points": points})
        for t0 in grid:
            row_best = float("+inf")
            for t1 in grid:
                p = base.copy()
                p[i0] = t0
                p[i1] = t1
                E = self._energy_at_params(p)
                if E < best_E:
                    best_E, best = E, p
                if E < row_best:
                    row_best = E
            self._write_progress({"event": "warm2d_row", "i0_val": float(t0), "row_best_E": row_best})
        self._write_progress({"event": "warm2d_done", "best_E": best_E})
        return best

    # --------- Plotting ---------
    def _plot_convergence(self, label: str):
        if not self.energies or not self.output_dir:
            return
        plt.figure(figsize=(8, 5))
        xs = list(range(1, len(self.energies) + 1))
        plt.plot(xs, self.energies, marker="o", linewidth=1.5)
        plt.xlabel("Iteration")
        plt.ylabel("Energy (Hartree)")
        title = f"VQE Convergence: {label}"
        if self._run_tag:
            title += f" [{self._run_tag}]"
        plt.title(title)
        plt.grid(alpha=0.3)
        plt.tight_layout()
        path = os.path.join(self.output_dir, f"{self._run_tag}_convergence.png")
        plt.savefig(path, dpi=200)
        plt.close()
        self._write_progress({"event": "plot_saved", "path": path})

    # --------- Public: run() ---------
    def run(self, initial_params: Optional[np.ndarray] = None,
            label: str = "molecule") -> Tuple[float, np.ndarray]:
        if not SCIPY_AVAILABLE:
            raise ImportError("SciPy required for VQE optimization")
        self._start_progress_log(label)
        t0 = time.time()

        # Sanity check: energy of the reference (HF / |0…0⟩) state
        zero = np.zeros(self.ansatz.n_params, dtype=float)
        hf_energy = self._energy_at_params(zero)
        self._write_progress({"event": "sanity", "hf_energy": hf_energy})
        self._p(f"[VQE] <HF|H|HF> = {hf_energy:.8f} Ha")

        # Initial params
        if initial_params is None:
            initial_params = np.zeros(self.ansatz.n_params, dtype=float)

        # Warm-start (lightweight)
        if self._use_warm_start:
            n = self.ansatz.n_params
            pts1 = self.ws_points_1d if n > 10 else 41
            tmax1 = self.ws_theta_max_1d
            warm = initial_params.copy()
            self._p(f"[Warm-Start] 1D scans on {min(n, 8)} params ({pts1} pts, ±{tmax1})")
            for k in range(min(n, 8)):
                warm[k] = self._grid_search_1d(k, theta_max=tmax1, points=pts1)
            if n >= 2:
                pts2 = self.ws_points_2d if n > 10 else 21
                tmax2 = self.ws_theta_max_2d
                seed2d = self._grid_search_2d(0, 1, theta_max=tmax2, points=pts2)
                if self._energy_at_params(seed2d) < self._energy_at_params(warm):
                    warm = seed2d
            initial_params = warm
            self._write_progress({"event": "warm_done",
                                  "start_E": self._energy_at_params(initial_params)})

        # Optimizer options (ftol is only accepted by L-BFGS-B; BFGS takes gtol)
        method = self.config.optimizer
        options = {"maxiter": self.config.max_iter}
        if method.upper() == "L-BFGS-B":
            options["ftol"] = self.config.tol
            options["gtol"] = self.config.tol
        elif method.upper() == "BFGS":
            options["gtol"] = self.config.tol

        use_jac = method.upper() in {"BFGS", "L-BFGS-B"}
        extra = {}
        if use_jac:
            if self._use_per_pauli_gradients:
                extra["jac"] = lambda x: self._gradient_at_params_per_pauli_shift(x)
            else:
                extra["jac"] = lambda x: self._gradient_at_params_parameter_shift(x)
        if method.upper() == "L-BFGS-B":
            extra["bounds"] = [(-np.pi, np.pi)] * self.ansatz.n_params

        # Reset traces
        self.iteration = 0
        self.energies = []
        self.param_history = []

        self._p(f"[VQE] Starting {method} with {self.ansatz.n_params} params…")
        self._write_progress({"event": "opt_start", "method": method})
        result = minimize(self._cost_function, initial_params, method=method,
                          options=options, **extra)

        elapsed = time.time() - t0
        self._write_progress({"event": "opt_done", "final_E": float(result.fun),
                              "iters": self.iteration, "elapsed_s": elapsed})
        self._plot_convergence(label)
        self._p(f"VQE converged: E = {result.fun:.6f} after {self.iteration} "
                f"iterations ({elapsed:.2f}s)")
        return float(result.fun), np.array(result.x, dtype=float)
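
# End-to-end sketch (assumes `H_mpo`, a molecular Hamiltonian MPO built
# elsewhere, e.g. via MPO.from_local_ops; passing output_dir is what enables
# the JSONL/PNG artifacts):
#
#     cfg = VQEConfig(n_layers=2, device="cpu")
#     vqe = VQE(H_mpo, cfg, output_dir="molecular_results")
#     vqe.quiet = False                      # turn on the console heartbeat
#     E0, theta_opt = vqe.run(label="H2")
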
class QAOAAnsatz:
    """QAOA ansatz for combinatorial optimization"""

    def __init__(self, cost_hamiltonian: MPO, n_layers: int, device: str = "cuda",
                 dtype: torch.dtype = torch.complex128):
        self.H_cost = cost_hamiltonian
        self.n_qubits = cost_hamiltonian.n_sites
        self.n_layers = n_layers
        self.device = device
        self.dtype = dtype
        X = torch.tensor([[0, 1], [1, 0]], dtype=dtype, device=device)
        ops = [-X] * self.n_qubits
        self.H_mixer = MPO.from_local_ops(ops, device=device)
        self.n_params = 2 * n_layers

    def apply(self, mps: AdaptiveMPS, params: np.ndarray):
        assert len(params) == self.n_params
        for layer in range(self.n_layers):
            gamma = params[2 * layer]
            beta = params[2 * layer + 1]
            self._apply_cost_layer(mps, gamma)
            self._apply_mixer_layer(mps, beta)

    def _apply_cost_layer(self, mps: AdaptiveMPS, gamma: float):
        for q in range(self.n_qubits):
            mps.apply_single_qubit_gate(q, self._rz_gate(2 * gamma))

    def _apply_mixer_layer(self, mps: AdaptiveMPS, beta: float):
        for q in range(self.n_qubits):
            mps.apply_single_qubit_gate(q, self._rx_gate(2 * beta))

    def _rz_gate(self, theta: float) -> torch.Tensor:
        return torch.tensor(
            [[np.exp(-1j * theta / 2), 0], [0, np.exp(1j * theta / 2)]],
            dtype=self.dtype,
            device=self.device,
        )

    def _rx_gate(self, theta: float) -> torch.Tensor:
        c = np.cos(theta / 2)
        s = np.sin(theta / 2)
        return torch.tensor([[c, -1j * s], [-1j * s, c]], dtype=self.dtype, device=self.device)
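
# Worked note on the doubled angles above: since RZ(θ) = e^{-iθZ/2} and
# RX(θ) = e^{-iθX/2}, passing 2γ and 2β gives RZ(2γ) = e^{-iγ Z_q} and
# RX(2β) = e^{-iβ X_q} per qubit, i.e. one QAOA cost/mixer layer pair. As
# written, the cost layer covers single-qubit Z terms only; a general Ising
# cost with Z_i Z_j couplings would additionally need two-site phase gates.
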
class QAOA:
    """Quantum Approximate Optimization Algorithm"""

    def __init__(self, cost_hamiltonian: MPO, n_layers: int = 3, optimizer: str = "COBYLA",
                 device: str = "cuda", dtype: torch.dtype = torch.complex128):
        self.H_cost = cost_hamiltonian
        self.n_layers = n_layers
        self.optimizer = optimizer
        self.device = device
        self.dtype = dtype
        self.ansatz = QAOAAnsatz(cost_hamiltonian, n_layers, device, dtype)
        self.energies = []
        self.iteration = 0

    @property
    def n_params(self) -> int:
        return self.ansatz.n_params

    def _cost_function(self, params: np.ndarray) -> float:
        mps = self._prepare_initial_state()
        self.ansatz.apply(mps, params)
        E = expectation_value(self.H_cost, mps)
        self.energies.append(float(E.real))
        self.iteration += 1
        if self.iteration % 10 == 0:
            print(f"QAOA Iter {self.iteration}: Cost = {E.real:.6f}")
        return float(E.real)

    def run(self, initial_params: Optional[np.ndarray] = None) -> Tuple[float, np.ndarray]:
        if not SCIPY_AVAILABLE:
            raise ImportError("SciPy required for QAOA")
        if initial_params is None:
            initial_params = np.random.randn(self.ansatz.n_params) * 0.1
        result = minimize(self._cost_function, initial_params, method=self.optimizer,
                          options={"maxiter": 200})
        print(f"\nQAOA converged: Cost = {result.fun:.6f}")
        return float(result.fun), np.array(result.x, dtype=float)

    def _prepare_initial_state(self) -> AdaptiveMPS:
        """Prepare the initial |+⟩^⊗n state for QAOA"""
        mps = AdaptiveMPS(num_qubits=self.H_cost.n_sites, bond_dim=2, chi_max_per_bond=64,
                          device=self.device, dtype=self.dtype)
        # Apply Hadamard to all qubits to create the |+⟩ state
        H = torch.tensor([[1, 1], [1, -1]], dtype=self.dtype, device=self.device) / np.sqrt(2)
        for q in range(mps.num_qubits):
            mps.apply_single_qubit_gate(q, H)
        return mps

    def _build_mixer_hamiltonian(self) -> MPO:
        """Build the QAOA mixer Hamiltonian (X mixer)"""
        return self.ansatz.H_mixer
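
# Usage sketch (illustrative): a toy diagonal cost H_cost = Σ_q Z_q, whose
# minimum -n is reached at |1...1>. Reuses MPO.from_local_ops as above.
#
#     Z = torch.tensor([[1, 0], [0, -1]], dtype=torch.complex128, device="cpu")
#     H_cost = MPO.from_local_ops([Z] * 4, device="cpu")
#     qaoa = QAOA(H_cost, n_layers=2, optimizer="COBYLA", device="cpu")
#     best_cost, best_params = qaoa.run()
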
# Chemistry-specific utilities
def build_molecular_hamiltonian(
    h1: np.ndarray, h2: np.ndarray, mapping: str = "jordan_wigner", device: str = "cuda"
) -> MPO:
    """
    Build a molecular Hamiltonian MPO from 1- and 2-electron integrals.

    Args:
        h1: One-electron integrals [n_orb, n_orb]
        h2: Two-electron integrals [n_orb, n_orb, n_orb, n_orb]
        mapping: 'jordan_wigner' or 'bravyi_kitaev'
        device: torch device

    Returns:
        MPO representation of the electronic Hamiltonian
    """
    # Placeholder: a proper implementation requires
    # 1. Second quantization operators
    # 2. Fermion-to-qubit mapping
    # 3. Pauli string collection
    # 4. MPO compression
    n_qubits = h1.shape[0] * 2  # spin-orbitals
    # For now, return the identity
    return MPO.identity(n_qubits, device=device)
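
# A minimal Jordan-Wigner sketch for step 2 above (illustrative only: the
# helper below is hypothetical and returns weighted Pauli-string labels, not
# an MPO). Under Jordan-Wigner the annihilation operator is
#     a_j = (prod_{k<j} Z_k) * (X_j + i Y_j) / 2,
# so every fermionic term expands into a sum of Pauli strings that can then
# be collected and compressed into an MPO.
#
#     def _jw_annihilation(j: int, n_qubits: int):
#         zs = "Z" * j
#         pad = "I" * (n_qubits - j - 1)
#         return [(0.5, zs + "X" + pad), (0.5j, zs + "Y" + pad)]
#
#     # _jw_annihilation(1, 3) -> [(0.5, "ZXI"), (0.5j, "ZYI")]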