#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 23 13:20:11 2020
@author: sjoly
@modified by: MaltheRaschke
"""
import numpy as np
from functools import partial
from scipy.optimize import minimize
from .solvers import Solvers
from .objectiveFunctions import ObjectiveFunctions as obj
from .resonatorFormulas import Impedances as imp
from .resonatorFormulas import Wakes as wak
from .utils import compute_fft
[docs]class EvolutionaryAlgorithm:
def __init__(self,
x_data,
y_data,
N_resonators,
parameterBounds,
plane="longitudinal",
fitFunction="impedance",
objectiveFunction = obj.sumOfSquaredError,
wake_length=None,
sigma=None,
):
"""
Implements an evolutionary algorithm for fitting impedance models to data.
This class optimizes a resonator-based impedance model to fit measured or
simulated impedance data using an evolutionary algorithm. It supports both
longitudinal and transverse impedance models with a variable number of resonators.
Parameters
----------
x_data : numpy.ndarray
Stores the input x data: frequencies for impedance [Hz], or
times for wake function and wake potential [s]
y_data : numpy.ndarray
Stores the input y data: impedance data [Ohm] or Wake function/potential [V/C/s]
N_resonators : int
Number of resonators in the model.
parameterBounds : list of tuple
List of parameter bounds for the optimization. Each resonator has three
parameters: Rs (shunt resistance), Q (quality factor), and fr (resonant frequency).
fitFunction: str, optional
Specify which fitFunction to use: ["impedance", "wake", "wake_potential"]
plane : str, optional
Type of impedance model, either `"longitudinal"` or `"transverse"`.
Default is `"longitudinal"`.
objectiveFunction : callable, optional
The objective function to minimize. Default is `obj.sumOfSquaredError`.
wake_length : float, optional
Length of the wake function in meters. Required for wake computations.
sigma : float, optional
Attributes
----------
fitFunction : callable
Partial function used to compute impedance based on the chosen model
(`imp.Resonator_longitudinal_imp`, `imp.n_Resonator_longitudinal_imp`, etc.).
evolutionParameters : dict or None
Stores parameters of the evolutionary optimization algorithm.
minimizationParameters : numpy.ndarray or None
Stores the best-fit parameters obtained from the optimization.
time_data : numpy.ndarray
Stores the input x data: times for wake function and wake potential [s]
wake_data : numpy.ndarray
Stores the input Wake function [V/C/s]
wake_potential_data : numpy.ndarray
Stores the input Wake potential [V/C/s]
frequency_data : numpy.ndarray
Stores the input x data: frequencies for impedance [Hz]
impedance_data : numpy.ndarray
Stores the input impedance data [Ohm]
Notes
-----
- The `fitFunction` is assigned based on the `plane` type, the `fitFunction` mode, and the number of resonators.
- The impedance and wake model is based on resonators and can be used for both
single-resonator and multi-resonator systems.
- The optimization is performed using an evolutionary algorithm, with results
stored in `minimizationParameters`.
Examples
--------
>>> import numpy as np
>>> from evolutionary_algorithm import EvolutionaryAlgorithm
>>> freq = np.linspace(1e9, 5e9, 100) # Frequency range from 1 GHz to 5 GHz
>>> Z = np.random.rand(100) # Example impedance data
>>> bounds = [(10, 1000), (1, 100), (1e9, 5e9)] # Example bounds for Rs, Q, fr
>>> algo = EvolutionaryAlgorithm(x_data=freq, y_data=Z,
... N_resonators=1, parameterBounds=bounds)
>>> print(algo.plane)
'longitudinal'
"""
self.x_data = x_data
self.y_data = y_data
self.N_resonators = N_resonators
self.parameterBounds = parameterBounds
self.objectiveFunction = objectiveFunction
self.wake_length = wake_length
self.plane = plane
self.sigma = sigma
self.time_data = None
self.wake_data = None
self.wake_potential_data = None
self.frequency_data = None
self.impedance_data = None
self.evolutionParameters = None
self.minimizationParameters = None
if fitFunction == "wake" or fitFunction == "wake function":
if plane == "longitudinal" and N_resonators > 1:
self.fitFunction = wak.n_Resonator_longitudinal_wake
elif plane == "transverse" and N_resonators > 1:
self.fitFunction = wak.n_Resonator_transverse_wake
elif plane == "longitudinal" and N_resonators == 1:
self.fitFunction = wak.Resonator_longitudinal_wake
elif plane == "transverse" and N_resonators == 1:
self.fitFunction = wak.Resonator_transverse_wake
else:
raise Exception('Algorithm needs N_resonartors >= 1')
self.time_data = x_data
self.wake_data = y_data
elif fitFunction == "wake potential":
if self.sigma is None:
print('[!] sigma not specified, using the default sigma=1e-10 s')
self.sigma = 1e-10
if plane == "longitudinal" and N_resonators > 1:
self.fitFunction = partial(wak.n_Resonator_longitudinal_wake_potential, sigma=self.sigma)
elif plane == "transverse" and N_resonators > 1:
self.fitFunction = partial(wak.n_Resonator_transverse_wake_potential, sigma=self.sigma)
elif plane == "longitudinal" and N_resonators == 1:
self.fitFunction = partial(wak.Resonator_longitudinal_wake_potential, sigma=self.sigma)
elif plane == "transverse" and N_resonators == 1:
self.fitFunction = partial(wak.Resonator_transverse_wake_potential, sigma=self.sigma)
else:
raise Exception('Algorithm needs N_resonartors >= 1')
self.time_data = x_data
self.wake_potential_data = y_data
else: #Default to "impedance"
if wake_length is not None:
print('[!] Using the partially decayed resonator formalism for impedance')
else:
print('[!] Using the fully decayed resonator formalism for impedance')
if plane == "longitudinal" and N_resonators > 1:
self.fitFunction = partial(imp.n_Resonator_longitudinal_imp, wake_length=wake_length)
elif plane == "transverse" and N_resonators > 1:
self.fitFunction = partial(imp.n_Resonator_transverse_imp, wake_length=wake_length)
elif plane == "longitudinal" and N_resonators == 1:
self.fitFunction = partial(imp.Resonator_longitudinal_imp, wake_length=wake_length)
elif plane == "transverse" and N_resonators == 1:
self.fitFunction = partial(imp.Resonator_transverse_imp, wake_length=wake_length)
else:
raise Exception('Algorithm needs N_resonartors >= 1')
self.frequency_data = x_data
self.impedance_data = y_data
[docs] def check_y_data(self):
"""
Small function to avoid 0 frequency leading to zero division when using resonators.
"""
mask = np.where(self.x_data > 0.)[0]
self.x_data = self.x_data[mask]
self.y_data = self.y_data[mask]
[docs] def generate_Initial_Parameters(self, parameterBounds, objectiveFunction, fitFunction,
x_values_data, y_values_data,
maxiter=2000, popsize=150,
mutation=(0.1, 0.5), crossover_rate=0.8,
tol=0.01,
solver='scipy',
):
"""
Generates initial parameter guesses for a minimization algorithm.
This function uses a differential evolution (DE) solver to find approximate
solutions to a minimization problem, which can be used as initial guesses for
more precise optimizers.
Args:
parameterBounds: A list of tuples representing the upper and lower bounds
for each parameter.
objective_function: A function that calculates the cost given a set of
parameters and data. The signature should be
`objective_function(parameters, fit_function, x_data, y_data)`.
fit_function: A function that calculates the fit between a model and data.
The signature should be `fit_function(parameters, x_data, y_data)`.
x_values_data: The x-values of the data to fit.
y_values_data: The y-values of the data to fit.
maxiter: The maximum number of iterations for the DE solver.
popsize: The population size for the DE algorithm.
mutation: A tuple of two floats representing the mutation factors.
crossover_rate: The crossover rate for the DE algorithm.
tol: The tolerance for convergence.
solver: The solver to use for differential evolution. Valid options are
"scipy", "pyfde", or "pyfde_jade". Defaults to "scipy".
Returns:
A tuple containing:
- The estimated initial parameters found by the DE solver.
- A message indicating the solver's status.
"""
objective_function = partial(objectiveFunction,
fitFunction=fitFunction,
x=x_values_data,
y=y_values_data
)
# Map solver names to functions
solver_functions = {
"scipy": Solvers.run_scipy_solver,
"pyfde": Solvers.run_pyfde_solver,
"pyfde_jade": Solvers.run_pyfde_jade_solver,
}
solver_function = solver_functions.get(solver)
if solver == "pyfde_jade":
mutation, crossover_rate = None, None
if not solver_function:
raise ValueError(f"Invalid solver name: {solver}")
solution, message = solver_function(parameterBounds,
objective_function,
maxiter=maxiter,
popsize=popsize,
mutation=mutation,
crossover_rate=crossover_rate,
tol=tol)
return solution, message
[docs] def run_differential_evolution(self,
maxiter=2000,
popsize=15,
mutation=(0.1, 0.5),
crossover_rate=0.8,
tol=0.01,
#workers=-1,
vectorized=False,
solver='scipy',
iteration_convergence=False, debug=False):
evolutionParameters, warning = self.generate_Initial_Parameters(self.parameterBounds,
self.objectiveFunction,
self.fitFunction,
self.x_data,
self.y_data,
maxiter=maxiter,
popsize=popsize,
mutation=mutation,
crossover_rate=crossover_rate,
tol=tol,
solver=solver
#workers=workers, vectorized=vectorized,
#iteration_convergence=iteration_convergence
)
self.evolutionParameters = evolutionParameters
self.warning = warning
self.display_resonator_parameters(self.evolutionParameters)
[docs] def run_minimization_algorithm(self, margin=0.1, method='Nelder-Mead'):
"""
Minimization algorithm is used to refine results obtained by the DE algorithm.
They are used as initial guess for the algorithm and each parameter is allowed to be
increased or decreased by 100*margin [%].
"""
print('Method for minimization : '+method)
objective_function = partial(self.objectiveFunction, fitFunction=self.fitFunction,
x=self.x_data, y=self.y_data)
if self.evolutionParameters is not None:
minimizationBounds = [sorted(((1-margin)*p, (1+margin)*p)) for p in self.evolutionParameters]
minimizationParameters = minimize(objective_function,
x0=self.evolutionParameters,
bounds=minimizationBounds,
tol=1, #empiric value, documentation is cryptic
method=method,
options={'maxiter': self.N_resonators * 1000,
'maxfev': self.N_resonators * 1000,
'disp': False,
'adaptive': True}
)
else:
print('Differential Evolution algorithm not run, minimization only')
minimizationParameters = minimize(objective_function,
x0=np.mean(self.parameterBounds, axis=1),
bounds=self.parameterBounds,
method=method,
tol=1,
options={'maxiter': self.N_resonators * 5000,
'maxfev': self.N_resonators * 5000,
'disp': False,
'adaptive': True}
)
self.minimizationParameters = minimizationParameters.x
self.display_resonator_parameters(self.minimizationParameters)
[docs] def display_resonator_parameters(self, params=None, to_markdown=False):
"""
Displays resonance parameters in a formatted table using ASCII characters.
Args:
solution: A NumPy array of resonator parameters, typically shaped (n_resonators, 3).
"""
header_format = "{:^10}|{:^24}|{:^18}|{:^18}"
data_format = "{:^10d}|{:^24.2e}|{:^18.2f}|{:^18.3e}"
if to_markdown:
print("\n")
print("| Resonator | Rs [Ohm/m or Ohm] | Q | fres [Hz] |")
print("|-----------|------------------|---|-----------|")
for i, parameters in enumerate(params.reshape(-1, 3)):
print(f"| {i + 1} | {parameters[0]:.6g} | {parameters[1]:.6g} | {parameters[2]:.6g} |")
else:
print("\n")
print("-" * 70)
# Print header
print(header_format.format("Resonator", "Rs [Ohm/m or Ohm]", "Q", "fres [Hz]"))
print("-" * 70)
# Print data
for i, parameters in enumerate(params.reshape(-1,3)):
print(data_format.format(i + 1, *parameters))
print("-" * 70)
[docs] def get_wake(self, time_data=None, use_minimization=True):
# Check for time data
if time_data is None:
if self.time_data is None:
raise AttributeError("Provide time data array")
time_data = self.time_data
else:
if self.time_data is None:
self.time_data = time_data
# Which pars to use
if use_minimization and self.minimizationParameters is not None:
pars = self.minimizationParameters
else:
pars = self.evolutionParameters
# Which plane and formula
if self.plane == "longitudinal" and self.N_resonators > 1:
wake_data = wak.n_Resonator_longitudinal_wake(time_data, pars)
elif self.plane == "transverse" and self.N_resonators > 1:
wake_data = wak.n_Resonator_transverse_wake(time_data, pars)
elif self.plane == "longitudinal" and self.N_resonators == 1:
wake_data = wak.Resonator_longitudinal_wake(time_data, pars)
elif self.plane == "transverse" and self.N_resonators == 1:
wake_data = wak.Resonator_transverse_wake(time_data, pars)
if self.wake_data is None:
self.wake_data = wake_data
return wake_data
[docs] def get_wake_potential(self, time_data=None, sigma=1e-9, use_minimization=True):
# Check for time data
if time_data is None:
if self.time_data is None:
raise AttributeError("Provide time data array")
time_data = self.time_data
else:
if self.time_data is None:
self.time_data = time_data
# Which pars to use
if use_minimization and self.minimizationParameters is not None:
pars = self.minimizationParameters
else:
pars = self.evolutionParameters
# Which plane and formula
if self.plane == "longitudinal" and self.N_resonators > 1:
wake_potential_data = wak.n_Resonator_longitudinal_wake_potential(time_data, pars, sigma=sigma)
elif self.plane == "transverse" and self.N_resonators > 1:
wake_potential_data = wak.n_Resonator_transverse_wake_potential(time_data, pars, sigma=sigma)
elif self.plane == "longitudinal" and self.N_resonators == 1:
wake_potential_data = wak.Resonator_longitudinal_wake_potential(time_data, pars, sigma=sigma)
elif self.plane == "transverse" and self.N_resonators == 1:
wake_potential_data = wak.Resonator_transverse_wake_potential(time_data, pars, sigma=sigma)
return wake_potential_data
[docs] def get_impedance_from_fitFunction(self, frequency_data=None, use_minimization=True):
# Check for frequency data
if frequency_data is None:
if self.frequency_data is None:
raise AttributeError("Provide frequency data array")
frequency_data = self.frequency_data
else:
if self.frequency_data is None:
self.frequency_data = frequency_data
# Which pars to use
if use_minimization and self.minimizationParameters is not None:
pars = self.minimizationParameters
else:
pars = self.evolutionParameters
impedance_data = self.fitFunction(frequency_data, pars)
return impedance_data
[docs] def get_impedance(self, frequency_data=None,
use_minimization=True, wakelength=None):
# Check for frequency data
if frequency_data is None:
if self.frequency_data is None:
raise AttributeError("Provide frequency data array")
frequency_data = self.frequency_data
else:
if self.frequency_data is None:
self.frequency_data = frequency_data
# Which pars to use
if use_minimization and self.minimizationParameters is not None:
pars = self.minimizationParameters
else:
pars = self.evolutionParameters
# Which plane and formula
if self.plane == "longitudinal" and self.N_resonators > 1:
impedance_data = imp.n_Resonator_longitudinal_imp(frequency_data, pars, wakelength)
elif self.plane == "transverse" and self.N_resonators > 1:
impedance_data = imp.n_Resonator_transverse_imp(frequency_data, pars, wakelength)
elif self.plane == "longitudinal" and self.N_resonators == 1:
impedance_data = imp.Resonator_longitudinal_imp(frequency_data, pars, wakelength)
elif self.plane == "transverse" and self.N_resonators == 1:
impedance_data = imp.Resonator_transverse_imp(frequency_data, pars, wakelength)
return impedance_data
[docs] def get_impedance_from_fft(self, time_data=None, wake_data=None,
fmax=3e9, samples=1001):
# Check for time data
if time_data is None:
if self.time_data is None:
raise AttributeError("Provide time data array")
time_data = self.time_data
else:
if self.time_data is None:
self.time_data = time_data
wake_data = self.get_wake(self.time_data)
f, Z = compute_fft(data_time=time_data,
data_wake=wake_data,
fmax=fmax,
samples=samples)
if self.plane == 'transverse':
Z *= -1j
return f, Z
[docs] def compute_fft(self, data_time=None, data_wake=None, fmax=3e9, samples=1001):
# Check for time data - not override self
if time_data is None:
if self.time_data is None:
raise AttributeError("Provide time data array")
time_data = self.time_data
# Check for wake data - not override self
if wake_data is None:
if self.wake_data is None:
raise AttributeError("Provide wake data array")
wake_data = self.wake_data
compute_fft(data_time, data_wake, fmax, samples)