123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import reservoirpy as rpy
- import torch
- from numpy.linalg import eigvals
- from reservoirpy.nodes import Reservoir, Ridge
- import numpy as np
- from dataclasses import dataclass
- @dataclass
- class KoopmanESNConfig:
- units: int
- lr: float
- sr: float
- sp: float
- ridge: float
- train_start: int
- train_len: int
- train_warmup: int
- predict_warmup: int
- predict_len: int
- state_dim: int
- class KoopmanESN:
- def __init__(self, A, config: KoopmanESNConfig):
- super().__init__()
- self.esn_model = None
- self.config = config
- self.A = A
- def W_initial(self, A):
- # W_res = np.random.randn(self.config.units, self.config.units)
- # mask = np.random.rand(self.config.units, self.config.units) > self.config.sp
- # W_res[mask] = 0
- # W_res = self.config.sr * (W_res / np.max(np.abs(eigvals(W_res)))) # 归一化谱半径
- # 2. 生成对齐矩阵U和V(通过SVD或随机投影)
- # U = np.random.randn(self.config.units, self.config.state_dim)
- # V = np.random.randn(self.config.units, self.config.state_dim)
- #
- # W_koop = U @ A @ V.T
- W_koop = A
- # # SVD分解
- # U, S, Vt = np.linalg.svd(A)
- #
- # # 生成随机投影矩阵,但与SVD的主方向对齐
- # P = np.random.randn(self.config.units, U.shape[0]) # (100,5)
- # Q = np.random.randn(self.config.units, Vt.shape[1]) # (100,5)
- #
- # U_aligned = P @ U # 随机旋转后的U
- # V_aligned = Q @ Vt.T # 随机旋转后的V
- #
- # # 构造耦合项
- # W_koop = U_aligned @ np.diag(S) @ V_aligned.T # (100,100)
- # 4. 耦合W_res和W_koop
- # lambda_coupling = 0.1
- # W = W_res + lambda_coupling * W_koop
- W = W_koop
- # 5. 调整谱半径
- current_radius = np.max(np.abs(eigvals(W)))
- W = (self.config.sr / current_radius) * W
- return W
- def koopmanesn_train(self, data_train):
- rpy.verbosity(0)
- rpy.set_seed(42)
- W = self.W_initial(self.A)
- Win = np.eye(self.config.units, self.config.units)
- reservoir = Reservoir(
- units=self.config.units,
- lr=self.config.lr,
- sr=self.config.sr,
- W=W,
- input_scaling=1.0,
- Win=Win
- )
- readout = Ridge(ridge=self.config.ridge)
- esn_model = reservoir >> readout
- data_train_in = data_train[:-1, :]
- data_train_out = data_train[1:, :]
- self.esn_model = esn_model.fit(
- data_train_in,
- data_train_out,
- warmup=self.config.train_warmup
- )
- def predict(self, data_warm):
- warmup_data = self.esn_model.run(data_warm, reset=True)
- x = warmup_data[-1, :].reshape(1, -1)
- data_pre = np.empty((self.config.predict_len, self.config.state_dim))
- for i in range(self.config.predict_len):
- data_pre[i, :] = x
- x = self.esn_model(x)
- return data_pre
|