"""Deep Koopman autoencoder: learns a linear latent-dynamics operator A."""

from dataclasses import dataclass

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data


@dataclass
class DeepKoopManConfig:
    """Hyper-parameters for :class:`DeepKoopMan`."""
    state_dim: int   # dimension of the observed state vector
    latent_dim: int  # dimension of the Koopman latent space
    hidden_dim: int  # hidden width for caller-built encoder/decoder (not used here directly)


class DeepKoopMan(nn.Module):
    """Autoencoder with a learned linear operator A acting in latent space.

    The encoder and decoder networks are supplied by the caller; this module
    owns only the Koopman operator ``A`` and the training loop.  The latent
    state evolves by a residual linear update ``z_{t+1} = z_t + z_t @ A``.
    """

    def __init__(self, config: DeepKoopManConfig, encoder: nn.Module, decoder: nn.Module):
        super().__init__()
        self.config = config
        self.state_dim = config.state_dim
        self.latent_dim = config.latent_dim
        self.encoder = encoder
        self.decoder = decoder
        # Weight of the autoencoder reconstruction term in the total loss.
        self.lambda1 = 0.6
        # Koopman operator A (latent_dim x latent_dim), learned jointly.
        self.A = nn.Parameter(torch.randn(config.latent_dim, config.latent_dim))

    def forward(self, state):
        """Encode ``state`` and return (reconstruction, one-step-ahead prediction).

        Returns:
            state_pre: decoder(encoder(state)) — autoencoder reconstruction.
            state_next_pre: decoded prediction of the next state.
        """
        latent = self.encoder(state)
        # Residual linear update in latent space: z' = z + z @ A.
        latent_next_pre = latent + torch.matmul(latent, self.A)
        state_next_pre = self.decoder(latent_next_pre)
        state_pre = self.decoder(latent)
        return state_pre, state_next_pre

    def total_loss_func(self, state, state_pre, state_next, state_next_pre):
        """Prediction MSE plus ``lambda1``-weighted reconstruction MSE."""
        ae_loss = nn.functional.mse_loss(state_pre, state)
        pre_loss = nn.functional.mse_loss(state_next_pre, state_next)
        return pre_loss + ae_loss * self.lambda1

    def deepkoopman_train(self, batch_size, epochs, lr, data_train, data_val):
        """Train the model and return per-epoch (train, validation) loss lists.

        Args:
            batch_size: mini-batch size for both loaders.
            epochs: number of passes over the training set.
            lr: Adam learning rate.
            data_train / data_val: tensors shaped (traj, time, features);
                only the first ``state_dim`` features are used, and
                (x_t, x_{t+1}) pairs are formed by shifting the time axis.

        Returns:
            (TrainLoss, ValLoss): lists of average epoch losses.
        """
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)

        # Shift along the time axis to build (input, next-step target) pairs,
        # moving the full tensors to the device once up front.
        train_dataset = torch.utils.data.TensorDataset(
            data_train[:, :-1, :].to(device), data_train[:, 1:, :].to(device))
        val_dataset = torch.utils.data.TensorDataset(
            data_val[:, :-1, :].to(device), data_val[:, 1:, :].to(device))
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True)
        # Validation order is irrelevant; no shuffling needed.
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False)

        op = optim.Adam(self.parameters(), lr=lr)
        TrainLoss, ValLoss = [], []
        for epoch in range(epochs):
            # Training mode so dropout/batchnorm in the caller-supplied
            # encoder/decoder (if any) behave correctly.
            self.train()
            train_loss = 0.0
            for X_batch, Y_batch in train_loader:
                X_state = X_batch[:, :, :self.state_dim]
                Y_state = Y_batch[:, :, :self.state_dim]
                X_pre, Y_pre = self.forward(X_state)
                loss = self.total_loss_func(X_state, X_pre, Y_state, Y_pre)
                op.zero_grad()
                loss.backward()
                op.step()
                train_loss += loss.item()

            # Evaluation mode + no_grad for the validation pass.
            self.eval()
            val_loss = 0.0
            with torch.no_grad():
                for X_batch, Y_batch in val_loader:
                    X_state = X_batch[:, :, :self.state_dim]
                    Y_state = Y_batch[:, :, :self.state_dim]
                    X_pre, Y_pre = self.forward(X_state)
                    val_loss += self.total_loss_func(
                        X_state, X_pre, Y_state, Y_pre).item()

            TrainLoss.append(train_loss / len(train_loader))
            ValLoss.append(val_loss / len(val_loader))
            print(
                f"LatentDim[{self.latent_dim}],"
                f"LR[{lr}], Epoch [{epoch + 1}/{epochs}],"
                f"Train Loss: {np.log10(TrainLoss[-1]):.2f},"
                f"Validation Loss: {np.log10(ValLoss[-1]):.2f}")
        return TrainLoss, ValLoss