import reservoirpy as rpy import torch from numpy.linalg import eigvals from reservoirpy.nodes import Reservoir, Ridge import numpy as np from dataclasses import dataclass @dataclass class KoopmanESNConfig: units: int lr: float sr: float sp: float ridge: float train_start: int train_len: int train_warmup: int predict_warmup: int predict_len: int state_dim: int class KoopmanESN: def __init__(self, A, config: KoopmanESNConfig): super().__init__() self.esn_model = None self.config = config self.A = A def W_initial(self, A): # W_res = np.random.randn(self.config.units, self.config.units) # mask = np.random.rand(self.config.units, self.config.units) > self.config.sp # W_res[mask] = 0 # W_res = self.config.sr * (W_res / np.max(np.abs(eigvals(W_res)))) # 归一化谱半径 # 2. 生成对齐矩阵U和V(通过SVD或随机投影) # U = np.random.randn(self.config.units, self.config.state_dim) # V = np.random.randn(self.config.units, self.config.state_dim) # # W_koop = U @ A @ V.T W_koop = A # # SVD分解 # U, S, Vt = np.linalg.svd(A) # # # 生成随机投影矩阵,但与SVD的主方向对齐 # P = np.random.randn(self.config.units, U.shape[0]) # (100,5) # Q = np.random.randn(self.config.units, Vt.shape[1]) # (100,5) # # U_aligned = P @ U # 随机旋转后的U # V_aligned = Q @ Vt.T # 随机旋转后的V # # # 构造耦合项 # W_koop = U_aligned @ np.diag(S) @ V_aligned.T # (100,100) # 4. 耦合W_res和W_koop # lambda_coupling = 0.1 # W = W_res + lambda_coupling * W_koop W = W_koop # 5. 调整谱半径 current_radius = np.max(np.abs(eigvals(W))) W = (self.config.sr / current_radius) * W return W def koopmanesn_train(self, data_train): rpy.verbosity(0) rpy.set_seed(42) W = self.W_initial(self.A) Win = np.eye(self.config.units, self.config.units) reservoir = Reservoir( units=self.config.units, lr=self.config.lr, sr=self.config.sr, W=W, input_scaling=1.0, Win=Win, activation='identity' ) readout = Ridge(ridge=self.config.ridge) esn_model = reservoir >> readout data_train_in = data_train[:-1, :] data_train_out = data_train[1:, :] self.esn_model = esn_model.fit( data_train_in, data_train_out, warmup=self.config.train_warmup ) def predict(self, data_warm): warmup_data = self.esn_model.run(data_warm, reset=True) x = warmup_data[-1, :].reshape(1, -1) data_pre = np.empty((self.config.predict_len, self.config.state_dim)) for i in range(self.config.predict_len): data_pre[i, :] = x x = self.esn_model(x) return data_pre