123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import torch
- import torch.nn as nn
- from dataclasses import dataclass
- import torch.optim as optim
- import numpy as np
@dataclass
class DeepKoopManConfig:
    """Hyper-parameters for :class:`DeepKoopMan`.

    Attributes:
        state_dim: dimensionality of the observed system state.
        latent_dim: dimensionality of the Koopman latent space.
        hidden_dim: width of the encoder/decoder hidden layers.
    """
    state_dim: int
    latent_dim: int
    hidden_dim: int
class DeepKoopMan(nn.Module):
    """Deep Koopman model: an autoencoder with linear latent dynamics.

    The supplied ``encoder`` maps a state ``x_t`` to a latent ``z_t``; a
    learned square matrix ``A`` (the Koopman operator) advances the latent
    one step as a residual update ``z_{t+1} = z_t + z_t @ A``; the supplied
    ``decoder`` maps latents back to state space.  Training jointly
    minimises the reconstruction error and the one-step prediction error.
    """

    def __init__(self, config: "DeepKoopManConfig", encoder, decoder):
        """
        Args:
            config: hyper-parameters (``state_dim``, ``latent_dim``,
                ``hidden_dim``).  ``hidden_dim`` is not used here; it only
                describes the externally-built encoder/decoder.
            encoder: module mapping ``(..., state_dim) -> (..., latent_dim)``.
            decoder: module mapping ``(..., latent_dim) -> (..., state_dim)``.
        """
        super().__init__()
        self.config = config
        self.state_dim = config.state_dim
        self.latent_dim = config.latent_dim
        self.encoder = encoder
        self.decoder = decoder
        # Weight of the autoencoder (reconstruction) term in the total loss.
        self.lambda1 = 0.6
        # Koopman operator A, applied as a residual: z_next = z + z @ A.
        self.A = nn.Parameter(torch.randn(config.latent_dim, config.latent_dim))

    def forward(self, state):
        """Encode ``state`` and return (reconstruction, predicted next state).

        Args:
            state: tensor of shape ``(..., state_dim)``.

        Returns:
            Tuple ``(state_pre, state_next_pre)``: the decoded current state
            and the decoded one-step-ahead prediction, both state-shaped.
        """
        latent = self.encoder(state)
        # Linear latent dynamics as a residual update.
        latent_next_pre = latent + torch.matmul(latent, self.A)
        state_next_pre = self.decoder(latent_next_pre)
        state_pre = self.decoder(latent)
        return state_pre, state_next_pre

    def total_loss_func(self, state, state_pre, state_next, state_next_pre):
        """Return prediction MSE plus ``lambda1``-weighted reconstruction MSE."""
        mseloss = nn.MSELoss()
        ae_loss = mseloss(state, state_pre)            # reconstruction term
        pre_loss = mseloss(state_next, state_next_pre)  # one-step prediction term
        return pre_loss + ae_loss * self.lambda1

    def deepkoopman_train(self, batch_size, epochs, lr, data_train, data_val):
        """Train the model with Adam and report per-epoch mean losses.

        Args:
            batch_size: mini-batch size for both loaders.
            epochs: number of training epochs.
            lr: Adam learning rate.
            data_train: tensor of shape ``(traj, time, features)``; only the
                first ``state_dim`` features of each step are used.
            data_val: validation tensor with the same layout.

        Returns:
            Tuple ``(TrainLoss, ValLoss)`` of per-epoch mean-loss lists.
        """
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)
        # Pair consecutive time steps: input x_t, target x_{t+1}.  Batches
        # are moved to the device inside the loop, so the full datasets are
        # not transferred up front (the original did both, redundantly).
        train_dataset = torch.utils.data.TensorDataset(
            data_train[:, :-1, :], data_train[:, 1:, :])
        val_dataset = torch.utils.data.TensorDataset(
            data_val[:, :-1, :], data_val[:, 1:, :])
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True)
        # Shuffling validation data cannot change the mean loss; keep order.
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False)
        op = optim.Adam(self.parameters(), lr=lr)
        TrainLoss = []
        ValLoss = []
        for epoch in range(epochs):
            self.train()  # enable training-mode behavior (dropout etc., if any)
            train_loss = 0.0
            for X_batch, Y_batch in train_loader:
                X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
                X_State_batch = X_batch[:, :, :self.state_dim]
                Y_State_batch = Y_batch[:, :, :self.state_dim]
                X_Pre_State_batch, Y_Pre_State_batch = self.forward(X_State_batch)
                loss = self.total_loss_func(
                    X_State_batch, X_Pre_State_batch,
                    Y_State_batch, Y_Pre_State_batch)
                op.zero_grad()
                loss.backward()
                op.step()
                train_loss += loss.item()
            self.eval()  # evaluation-mode behavior during validation
            val_loss = 0.0
            with torch.no_grad():
                for X_batch, Y_batch in val_loader:
                    X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
                    X_State_batch = X_batch[:, :, :self.state_dim]
                    Y_State_batch = Y_batch[:, :, :self.state_dim]
                    X_Pre_State_batch, Y_Pre_State_batch = self.forward(X_State_batch)
                    loss = self.total_loss_func(
                        X_State_batch, X_Pre_State_batch,
                        Y_State_batch, Y_Pre_State_batch)
                    val_loss += loss.item()
            TrainLoss.append(train_loss / len(train_loader))
            ValLoss.append(val_loss / len(val_loader))
            # Losses are reported on a log10 scale for readability.
            train_log_loss = np.log10(train_loss / len(train_loader))
            val_log_loss = np.log10(val_loss / len(val_loader))
            print(
                f"LatentDim[{self.latent_dim}],"
                f"LR[{lr}], Epoch [{epoch + 1}/{epochs}],"
                f"Train Loss: {train_log_loss:.2f},"
                f"Validation Loss: {val_log_loss:.2f}")
        return TrainLoss, ValLoss
|