Source code for deepquantum.optimizer

"""Optimizer: various on-chip optimization algorthims"""

import copy

import numpy as np
import torch
from bayes_opt import BayesianOptimization, UtilityFunction
from torch import nn


class Optimizer:
    """A base class for optimizers.

    Args:
        target_func: The target function to optimize, more specifically, to minimize.
            It is supposed to accept ``kwargs`` in the format of ``param_init`` as inputs.
        param_init: The initial parameters for the target function. Its keys should be
            consistent with the inputs of ``target_func``.
        random_state: The random seed for this optimization process.
    """
    def __init__(self, target_func, param_init, random_state=0):
        self.target_func = target_func
        if isinstance(param_init, (list, np.ndarray)):
            self.param_dict = {}
            for idx in range(len(param_init)):
                self.param_dict[f'x_{idx}'] = param_init[idx]
        elif isinstance(param_init, (torch.Tensor, nn.Parameter)):
            param_init_arr = param_init.cpu().clone().detach().numpy()
            self.param_dict = {}
            for idx in range(len(param_init)):
                self.param_dict[f'x_{idx}'] = param_init_arr[idx]
        elif isinstance(param_init, dict):
            self.param_dict = copy.deepcopy(param_init)
        self.random_state = random_state

    def __str__(self) -> str:
        return 'Optimizer'
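A minimal sketch (not part of the module) of the three accepted ``param_init`` formats; the lambda stands in for a real target function. Lists, NumPy arrays and tensors are normalized to auto-named keys, while dictionaries keep their own keys.

import numpy as np
import torch

opt1 = Optimizer(lambda *args, **kwargs: 0.0, [0.1, 0.2])                  # -> {'x_0': 0.1, 'x_1': 0.2}
opt2 = Optimizer(lambda *args, **kwargs: 0.0, torch.tensor([0.1, 0.2]))    # same auto-named keys
opt3 = Optimizer(lambda *args, **kwargs: 0.0, {'theta': 0.1, 'phi': 0.2})  # keys preserved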
class OptimizerBayesian(Optimizer):
    r"""Optimizer based on Bayesian optimization.

    See https://github.com/bayesian-optimization/BayesianOptimization.

    Args:
        target_func: The target function to optimize, more specifically, to minimize.
            It is supposed to accept ``kwargs`` in the format of ``param_init`` as inputs.
        param_init: The initial parameters for the target function. Its keys should be
            consistent with the inputs of ``target_func``.
        random_state: The random seed for this optimization process.

    Note:
        In the scenario of on-chip optimization, the periods of the phase shifters all run
        from 0 to :math:`2\pi`, so ``pbounds`` (the parameter that determines the search
        region in the Bayesian-Optimization package) is fixed from 0 to :math:`2\pi`.
    """
    def __init__(self, target_func, param_init, random_state=0):
        super().__init__(target_func, param_init, random_state)

        def func_to_maximize(**param_dict: dict) -> float:
            return -self.target_func(**param_dict)

        self.pbounds = self.gen_pbounds()
        # The package maximizes its objective, hence the negated target function above
        self.optimizer = BayesianOptimization(f=func_to_maximize, pbounds=self.pbounds,
                                              random_state=self.random_state)
        self.util = UtilityFunction(kind='ucb', kappa=2.576, xi=0.0, kappa_decay=1,
                                    kappa_decay_delay=0)
        self.best_param_dict = copy.deepcopy(self.param_dict)
        self.best_target = -np.inf
        self.iter = 0
    def gen_pbounds(self) -> dict:
        pbounds = {}
        for key in self.param_dict:
            pbounds[key] = (0, np.pi * 2)
        return pbounds
    def param_suggest(self) -> np.ndarray:
        self.util.update_params()
        x_probe = self.optimizer.suggest(self.util)
        x = self.optimizer._space._as_array(x_probe)  # convert the suggested dict to an array
        param_array = np.asarray(x).reshape(-1)
        return param_array
    def param_register(self, param_array: np.ndarray, target: list) -> None:
        for i in range(len(param_array)):
            x = param_array[i]
            param_dict = dict(zip(self.param_dict.keys(), x, strict=True))
            if self.optimizer._space._constraint is None:
                self.optimizer._space.register(x, target[i])
            else:
                constraint_value = self.optimizer._space._constraint.eval(**param_dict)
                self.optimizer._space.register(x, target[i], constraint_value)
            if target[i] > self.best_target:
                self.best_param_dict = copy.deepcopy(param_dict)
                self.best_target = target[i]
            self.iter += 1
    def run(self, nstep: int, if_print: bool = False) -> list:
        for step in range(nstep):
            p1 = self.param_suggest()
            f1 = -self.target_func(p1)
            if isinstance(f1, torch.Tensor):
                f1 = f1.item()
            if if_print:
                print(step, '|', -f1)
            self.param_register([p1], [f1])
        return list(self.best_param_dict.values())
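A minimal usage sketch (not part of the module), assuming a ``bayes_opt`` version whose ``suggest`` accepts a utility function, as used above. The toy ``cost`` function and its parameter values are purely illustrative; ``run`` passes a flat NumPy array of parameters to ``target_func``.

import numpy as np
from deepquantum.optimizer import OptimizerBayesian

def cost(params):
    # toy target, minimized at (pi, pi / 2)
    return (params[0] - np.pi) ** 2 + (params[1] - np.pi / 2) ** 2

opt = OptimizerBayesian(cost, [0.1, 0.2], random_state=0)
best_params = opt.run(nstep=50, if_print=False)  # list of the best parameters found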
class OptimizerSPSA(Optimizer):
    """Optimizer based on SPSA (Simultaneous Perturbation Stochastic Approximation).

    See https://www.jhuapl.edu/spsa/Pages/MATLAB.htm.

    Args:
        target_func: The target function to optimize, more specifically, to minimize.
            It is supposed to accept ``kwargs`` in the format of ``param_init`` as inputs.
        param_init: The initial parameters for the target function. Its keys should be
            consistent with the inputs of ``target_func``.
        random_state: The random seed for this optimization process.
    """
    def __init__(self, target_func, param_init, random_state=0):
        super().__init__(target_func, param_init, random_state)
        self.random_state_ori = np.random.get_state()
        np.random.seed(self.random_state)
        self.hyperparam = {'a': 1e-1, 'c': 1e-2, 'A': 200, 'nepoch': 2000,
                           'alpha': 0.602, 'gamma': 0.101}
        self.iter = 0
        self.nparam = len(param_init)
        self.best_param_dict = copy.deepcopy(self.param_dict)
        self.best_target = np.inf
    def set_hyperparam(self, hyperparam: dict) -> None:
        """Set hyperparameters.

        The keys include ``'a'``, ``'c'``, ``'A'``, ``'nepoch'``, ``'alpha'`` and ``'gamma'``.
        """
        self.hyperparam = hyperparam
    def param_suggest(self) -> np.ndarray:
        tmp_param = np.asarray(list(self.param_dict.values()))
        delta_lr = self.hyperparam['c'] / (1 + self.iter) ** self.hyperparam['gamma']
        delta = (np.random.randint(0, 2, self.nparam) * 2 - 1) * delta_lr  # random signs in {-1, +1}
        param_array = np.zeros((2, self.nparam))
        param_array[0] = tmp_param - delta
        param_array[1] = tmp_param + delta
        return param_array
    def param_register(self, param_array: np.ndarray | list, target: np.ndarray | list) -> None:
        assert len(param_array) == 2
        assert len(target) == 2
        param_lr = self.hyperparam['a'] / (1 + self.iter + self.hyperparam['A']) ** self.hyperparam['alpha']
        param1 = param_array[0]
        param2 = param_array[1]
        target1 = target[0]
        target2 = target[1]
        delta = param2 - param1
        grad = (target2 - target1) / delta
        param_new = 0.5 * (param1 + param2) - param_lr * grad
        self.param_dict = dict(zip(self.param_dict.keys(), param_new, strict=True))
        self.iter += 1
        if target1 < self.best_target:
            self.best_param_dict = dict(zip(self.param_dict.keys(), param1, strict=True))
            self.best_target = target1
        if target2 < self.best_target:
            self.best_param_dict = dict(zip(self.param_dict.keys(), param2, strict=True))
            self.best_target = target2
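For reference (not part of the module): this is the standard two-sided SPSA estimate. With a random perturbation :math:`\Delta` whose entries are :math:`\pm c_k`, the gradient approximation is :math:`\hat{g} = (f(\theta + \Delta) - f(\theta - \Delta)) / (2\Delta)` elementwise; since ``delta = param2 - param1`` above already equals :math:`2\Delta`, a single division suffices.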
    def ori_random_state(self) -> None:
        """Restore the NumPy random state saved before seeding."""
        np.random.set_state(self.random_state_ori)
    def run(self, nstep: int, if_print: bool = False) -> list:
        for step in range(nstep):
            p1, p2 = self.param_suggest()
            f1 = self.target_func(p1)
            f2 = self.target_func(p2)
            if isinstance(f1, torch.Tensor):
                f1 = f1.item()
                f2 = f2.item()
            self.param_register([p1, p2], [f1, f2])
            if if_print:
                print(step, '|', f1, f2)
        return list(self.best_param_dict.values())
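A minimal usage sketch (not part of the module); the toy ``cost`` function is purely illustrative. Each ``run`` step evaluates the target at the two perturbed parameter vectors returned by ``param_suggest``.

import numpy as np
from deepquantum.optimizer import OptimizerSPSA

def cost(params):
    return (params[0] - np.pi) ** 2 + (params[1] - np.pi / 2) ** 2

opt = OptimizerSPSA(cost, [0.1, 0.2], random_state=0)
best_params = opt.run(nstep=200)
opt.ori_random_state()  # restore NumPy's global random state afterwards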
class OptimizerFourier(Optimizer):
    """Optimizer based on Fourier series.

    Obtain the gradient approximation from the target function approximation.

    Args:
        target_func: The target function to optimize, more specifically, to minimize.
            It is supposed to accept ``kwargs`` in the format of ``param_init`` as inputs.
        param_init: The initial parameters for the target function. Its keys should be
            consistent with the inputs of ``target_func``.
        order: The order of the Fourier series used in the approximation.
        lr: The learning rate of the gradient descent process.
        random_state: The random seed for this optimization process.
    """
    def __init__(self, target_func, param_init, order=5, lr=0.1, random_state=0):
        super().__init__(target_func, param_init, random_state)
        self.r = order
        self.nparam = len(param_init)
        self.best_param_dict = copy.deepcopy(self.param_dict)
        self.best_target = np.inf
        self.lr = lr
        self.a = self.gen_a()
        self.u = np.zeros((2 * order + 1) * self.nparam)
        self.iter = 0
    def gen_a(self) -> np.ndarray:
        a = np.zeros((2 * self.r + 1, 2 * self.r + 1))
        mu = np.arange(2 * self.r + 1)
        x_mu = 2 * np.pi * (mu - self.r) / (2 * self.r + 1)
        a[:, 0] = 1
        a[:, 1 : self.r + 1] = np.cos(x_mu.reshape(-1, 1) @ np.arange(1, self.r + 1).reshape(1, -1))
        a[:, self.r + 1 : 2 * self.r + 1] = np.sin(x_mu.reshape(-1, 1) @ np.arange(1, self.r + 1).reshape(1, -1))
        return a
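For reference (not part of the module): ``gen_a`` builds the square design matrix of the truncated Fourier fit. Row :math:`\mu` evaluates the basis functions at :math:`x_\mu = 2\pi(\mu - r)/(2r + 1)`, so solving :math:`Au = f` for the sampled target values :math:`f` recovers the coefficients of :math:`f(x) \approx u_0 + \sum_{k=1}^{r} u_k \cos(kx) + u_{r+k} \sin(kx)`.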
    def param_suggest(self) -> np.ndarray:
        tmp_param = np.asarray(list(self.param_dict.values()), dtype=float).reshape(1, -1)
        mu = np.arange(2 * self.r + 1)
        varied_param = 2 * np.pi * (mu - self.r) / (2 * self.r + 1)
        param_array = np.repeat(tmp_param, self.nparam * (2 * self.r + 1), axis=0)
        for param_id in range(self.nparam):
            param_array[param_id * (2 * self.r + 1) : (param_id + 1) * (2 * self.r + 1), param_id] = varied_param
        return param_array
    def param_register(self, param_array: np.ndarray, target: np.ndarray) -> None:
        assert len(param_array) == (2 * self.r + 1) * self.nparam
        assert len(target) == (2 * self.r + 1) * self.nparam
        # Solve the linear system to obtain the Fourier coefficients
        param = np.asarray(list(self.param_dict.values()))
        for param_id in range(self.nparam):
            idx1 = param_id * (2 * self.r + 1)
            idx2 = (1 + param_id) * (2 * self.r + 1)
            self.u[idx1:idx2] = np.linalg.solve(self.a, target[idx1:idx2])
        # Compute the partial derivatives at the current position from the coefficients
        grad = np.zeros(self.nparam)
        for param_id in range(self.nparam):
            theta = param[param_id]
            idx = 1 + param_id * (2 * self.r + 1)
            grad[param_id] = (
                -(np.arange(1, self.r + 1) * np.sin(theta * np.arange(1, self.r + 1)))
                @ self.u[idx : self.r + idx]
                + (np.arange(1, self.r + 1) * np.cos(theta * np.arange(1, self.r + 1)))
                @ self.u[self.r + idx : self.r * 2 + idx]
            )
        param_new = param - self.lr * grad
        self.param_dict = dict(zip(self.param_dict.keys(), param_new, strict=True))
        if target.min() < self.best_target:
            self.best_target = target.min()
            self.best_param_dict = dict(zip(self.param_dict.keys(), param_array[target.argmin()], strict=True))
        self.iter += 1
    def run(self, nstep: int, if_print: bool = False) -> list:
        for step in range(nstep):
            param_array = self.param_suggest()
            target = np.zeros(len(param_array))
            for i in range(len(param_array)):
                tmp = self.target_func(param_array[i])
                if isinstance(tmp, torch.Tensor):
                    tmp = tmp.item()
                target[i] = tmp
            self.param_register(param_array, target)
            if if_print:
                print(step, '|', target.min())
        return list(self.best_param_dict.values())
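A minimal usage sketch (not part of the module); the toy ``cost`` function is purely illustrative. Note the evaluation budget: each step calls the target (2 * order + 1) * nparam times.

import numpy as np
from deepquantum.optimizer import OptimizerFourier

def cost(params):
    return np.cos(params[0]) + np.sin(2 * params[1])

opt = OptimizerFourier(cost, [0.1, 0.2], order=5, lr=0.1)
best_params = opt.run(nstep=20, if_print=True)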