PyTorch 自编码器 (Autoencoder)
自编码器(Autoencoder,AE)是一种无监督学习的神经网络,通过学习将输入数据压缩到低维潜在空间,再从压缩表示重构原始数据。
自编码器广泛应用于数据降维、特征提取、异常检测、图像去噪、生成模型等场景。
1. 自编码器基础原理
自编码器的基本结构包含三个部分:
- 编码器(Encoder):将输入数据 \(x\) 映射到低维潜在表示 \(z\)
- 潜在空间(Latent Space):编码器输出的低维向量,也称为瓶颈层
- 解码器(Decoder):将潜在表示 \(z\) 重构为输出 \(\hat{x}\)
1.1 网络结构
自编码器的目标是让输出 \(\hat{x}\) 尽可能接近输入 \(x\):
\[ \min_{\theta, \phi} \frac{1}{n} \sum_{i=1}^{n} \| x_i - D_\phi(E_\theta(x_i)) \|^2 \]
其中 \(\theta\) 是编码器参数,\(\phi\) 是解码器参数。
1.2 降维效果
自编码器通过强制数据通过比输入维度更小的瓶颈层,从而学习数据的压缩表示。这种压缩保留了数据的主要信息。
与主成分分析(PCA)相比,自编码器可以学习非线性降维,能够捕捉更复杂的数据结构。
2. 基础自编码器实现
2.1 简单自编码器
实例
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# ── Autoencoder model ────────────────────────────
class Autoencoder(nn.Module):
    """Basic symmetric autoencoder.

    Compresses `input_dim`-dimensional vectors through a hidden layer
    down to `latent_dim` (the bottleneck), then reconstructs them with
    a mirrored decoder.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        # Encoder: input -> hidden -> hidden -> latent (bottleneck).
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        # Decoder mirrors the encoder: latent -> hidden -> hidden -> input.
        decoder_layers = [
            nn.Linear(latent_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        # Round trip: compress, then reconstruct.
        return self.decoder(self.encoder(x))

    def encode(self, x):
        """Map input to its latent representation."""
        return self.encoder(x)

    def decode(self, z):
        """Reconstruct the input from a latent vector."""
        return self.decoder(z)
# ── Usage example ────────────────────────────────
INPUT_DIM = 784   # e.g. a flattened MNIST image
HIDDEN_DIM = 256
LATENT_DIM = 32   # latent size, much smaller than the input

model = Autoencoder(INPUT_DIM, HIDDEN_DIM, LATENT_DIM)
print(f"输入维度: {INPUT_DIM}")
print(f"潜在维度: {LATENT_DIM}")
print(f"压缩比: {INPUT_DIM / LATENT_DIM:.1f}x")

# Count parameters across all submodules.
total_params = 0
for p in model.parameters():
    total_params += p.numel()
print(f"总参数量: {total_params:,}")
2.2 卷积自编码器
对于图像数据,使用卷积层的自编码器效果更好:
实例
import torch.nn as nn
class ConvAutoencoder(nn.Module):
    """
    Convolutional autoencoder for (batch, channels, 64, 64) images.

    Encoder: four stride-2 convolutions halve the spatial size
    64 -> 32 -> 16 -> 8 -> 4; global average pooling plus a linear
    layer then produce a `latent_dim`-dimensional code.
    Decoder: a linear layer expands the code back to 256 channels on a
    4x4 seed map, and four x2 upsample+conv stages restore 64x64; the
    final Sigmoid keeps outputs in [0, 1].

    Fixes over the previous version:
    - `latent_dim` was accepted but never used (the code was always the
      256-dim pooled vector); it is now a real bottleneck.
    - decoding started from a 2x2 map and upsampled only to 32x32, so
      the output never matched a 64x64 input; it now starts at 4x4.
    - the size comments claimed "32 -> 16" etc. for a 64px input.
    """
    def __init__(self, channels=3, latent_dim=128):
        super().__init__()
        # Encoder: each block halves H and W while widening channels.
        self.encoder = nn.Sequential(
            # 64 -> 32
            nn.Conv2d(channels, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            # 32 -> 16
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            # 16 -> 8
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            # 8 -> 4
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        )
        # (batch, 256, 4, 4) -> (batch, 256) -> (batch, latent_dim)
        self.to_latent = nn.AdaptiveAvgPool2d((1, 1))
        self.fc_enc = nn.Linear(256, latent_dim)
        # (batch, latent_dim) -> (batch, 256), reshaped to a 4x4 seed map.
        self.fc_dec = nn.Linear(latent_dim, 256)
        # Decoder: 4 -> 8 -> 16 -> 32 -> 64.
        self.from_latent = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(32, channels, kernel_size=3, padding=1),
            nn.Sigmoid()  # outputs in [0, 1]
        )

    def forward(self, x):
        z = self.encode(x)
        return self.decode(z)

    def encode(self, x):
        """Compress an image batch to (batch, latent_dim)."""
        features = self.encoder(x)
        pooled = self.to_latent(features).view(x.size(0), -1)  # (batch, 256)
        return self.fc_enc(pooled)

    def decode(self, z):
        """Reconstruct (batch, channels, 64, 64) images from latent codes."""
        maps = self.fc_dec(z).view(z.size(0), 256, 1, 1)
        maps = maps.expand(-1, -1, 4, 4)  # broadcast to the 4x4 seed map
        return self.from_latent(maps)
# Smoke test: run a random batch through the model and print shapes.
model = ConvAutoencoder(channels=3, latent_dim=128)
x = torch.randn(4, 3, 64, 64)
x_recon = model(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {x_recon.shape}")
print(f"潜在向量形状: {model.encode(x).shape}")
2.3 训练与重构
实例
import torch.nn as nn
import torch.optim as optim
# ── Training setup ───────────────────────────────
# Use the GPU when available; model and data must share a device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ConvAutoencoder(channels=3, latent_dim=128).to(device)
criterion = nn.MSELoss()  # pixel-wise reconstruction loss
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# ── Training loop ────────────────────────────────
def train_autoencoder(model, dataloader, criterion, optimizer, num_epochs=10):
    """
    Train an autoencoder to reconstruct its own input.

    Args:
        model: autoencoder whose forward pass returns a reconstruction.
        dataloader: yields batches whose first element is the input tensor.
        criterion: reconstruction loss, e.g. nn.MSELoss().
        optimizer: optimizer over model.parameters().
        num_epochs: number of passes over the data.

    Returns:
        The trained model (training happens in place).
    """
    # Derive the target device from the model itself instead of relying
    # on a module-level `device` global (the original silently did).
    device = next(model.parameters()).device
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for batch in dataloader:
            images = batch[0].to(device)
            # Forward pass: the target is the input itself.
            outputs = model(images)
            loss = criterion(outputs, images)
            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.6f}")
    return model
# Assuming a data loader already exists, training would be started with:
# train_autoencoder(model, train_loader, criterion, optimizer, num_epochs=10)
print("自编码器训练完成!")
3. 去噪自编码器 (DAE)
去噪自编码器(Denoising Autoencoder,DAE)在训练时给输入添加噪声,然后学习去除噪声恢复原始输入。这使模型学到更鲁棒的特征表示。
3.1 去噪自编码器实现
实例
import torch.nn as nn
class DenoisingAutoencoder(nn.Module):
    """Denoising autoencoder.

    Trained to reconstruct clean inputs from corrupted ones, which
    forces the network to learn more robust features.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        # Encoder: input -> hidden -> hidden -> latent.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        )
        # Decoder mirrors the encoder; Sigmoid keeps outputs in [0, 1].
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, input_dim), nn.Sigmoid(),
        )

    def forward(self, x):
        return self.decode(self.encode(x))

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)
def add_noise(x, noise_factor=0.3):
    """Corrupt `x` with Gaussian noise, clamping back to [0, 1]."""
    corrupted = x + noise_factor * torch.randn_like(x)
    return corrupted.clamp(0.0, 1.0)
# Train a denoising autoencoder.
def train_dae(model, dataloader, noise_factor=0.3, lr=1e-3, num_epochs=10):
    """
    Train a denoising autoencoder.

    Each batch is corrupted with `add_noise` and fed through the model,
    but the loss is computed against the *clean* images, so the network
    learns to remove the noise.

    Args:
        model: autoencoder to train (trained in place).
        dataloader: yields batches whose first element is an image
            tensor with values in [0, 1].
        noise_factor: scale of the Gaussian corruption.
        lr: Adam learning rate.
        num_epochs: passes over the data (previously hard-coded to 10).

    Returns:
        The trained model.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Hoisted out of the loop: the model's device does not change.
    device = next(model.parameters()).device
    model.train()
    for epoch in range(num_epochs):
        for batch in dataloader:
            images = batch[0]
            # Corrupt first, then move both tensors to the model's device.
            noisy_images = add_noise(images, noise_factor).to(device)
            images = images.to(device)
            outputs = model(noisy_images)
            # Compare against the clean images, not the noisy input.
            loss = criterion(outputs, images)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    return model
3.2 其他噪声类型
实例
# NOTE(review): the original snippet began at the docstring — the `def`
# line was missing entirely, leaving an orphaned function body. Restored.
def salt_pepper_noise(x, prob=0.1):
    """Salt-and-pepper noise.

    With total probability ~`prob`, a pixel is forced to 0 (pepper) or
    1 (salt), half of the corruptions each.
    """
    random_mask = torch.rand_like(x)
    noisy = x.clone()
    noisy[random_mask < prob / 2] = 0.0        # pepper
    noisy[random_mask > 1 - prob / 2] = 1.0    # salt
    return noisy
def mask_noise(x, prob=0.1):
    """Masking noise: each element is zeroed with probability `prob`."""
    keep = (torch.rand_like(x) > prob).float()
    return x * keep
def dropout_noise(x, rate=0.2):
    """Dropout-style noise: zero elements with probability `rate`,
    rescaling the survivors by 1 / (1 - rate) (inverted dropout)."""
    keep = (torch.rand_like(x) > rate).float()
    return x * keep / (1 - rate)
4. 变分自编码器 (VAE)
变分自编码器(Variational Autoencoder,VAE)是一种生成模型,它将数据编码为潜在空间中的概率分布,而非固定向量。这使得我们可以从潜在空间中采样生成新数据。
4.1 VAE 核心原理
VAE 的关键创新是学习潜在变量的概率分布:
- 编码器输出均值 \(\mu\) 和对数方差 \(\log\sigma^2\)(代码中的 `logvar`)
- 从正态分布 \(\mathcal{N}(\mu, \sigma^2)\) 中采样得到潜在向量 \(z\)
- 解码器从 \(z\) 重构数据
为了实现可微的采样过程,使用了重参数化技巧(Reparameterization Trick):
\[ z = \mu + \sigma \cdot \epsilon, \quad \epsilon \sim \mathcal{N}(0, 1) \]
4.2 VAE 实现
实例
import torch.nn as nn
import torch.optim as optim
class VAE(nn.Module):
    """Variational autoencoder.

    The encoder produces a mean and log-variance per input; a latent
    vector is drawn via the reparameterization trick and decoded back
    to input space.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        # Shared encoder trunk; two linear heads give mu and log(sigma^2).
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
        )
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)
        # Decoder maps a latent sample back to input space, in [0, 1].
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, input_dim), nn.Sigmoid(),
        )

    def encode(self, x):
        """Return (mu, logvar) of the approximate posterior q(z|x)."""
        hidden = self.encoder(x)
        return self.fc_mu(hidden), self.fc_logvar(hidden)

    def reparameterize(self, mu, logvar):
        """Sample z = mu + sigma * eps, eps ~ N(0, I) — differentiable
        with respect to mu and logvar."""
        sigma = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(sigma) * sigma

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
def vae_loss(x_recon, x, mu, logvar, beta=1.0):
    """
    VAE objective: reconstruction error plus beta-weighted KL divergence
    of q(z|x) = N(mu, sigma^2) from the standard normal prior.

    Returns (total_loss, recon_loss, kl_loss).
    """
    # Summed squared reconstruction error.
    recon_loss = nn.functional.mse_loss(x_recon, x, reduction='sum')
    # Closed-form KL: -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2).
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    total = recon_loss + beta * kl_loss
    return total, recon_loss, kl_loss
# Usage example
INPUT_DIM = 784
HIDDEN_DIM = 256
LATENT_DIM = 2 # a 2-D latent space is easy to visualize
model = VAE(INPUT_DIM, HIDDEN_DIM, LATENT_DIM)
# Smoke test: run a random batch and print the resulting shapes.
x = torch.randn(32, 784)
x_recon, mu, logvar = model(x)
print(f"输入形状: {x.shape}")
print(f"重构形状: {x_recon.shape}")
print(f"均值形状: {mu.shape}") # (32, 2)
print(f"方差形状: {logvar.shape}") # (32, 2)
4.3 VAE 生成与可视化
实例
def visualize_latent_space(model, dataloader, device):
    """Scatter-plot the encoder means (mu) in a 2-D latent space, colored
    by label.

    Assumes `model.encode` returns (mu, logvar) and the latent space is
    2-dimensional (only dimensions 0 and 1 are plotted).
    NOTE(review): requires `matplotlib.pyplot as plt` — that import is
    not visible in this file; confirm it exists at module level.
    """
    model.eval()
    all_mu = []
    all_labels = []
    with torch.no_grad():  # inference only, no gradients needed
        for batch in dataloader:
            images, labels = batch[0].to(device), batch[1]
            mu, _ = model.encode(images)  # plot the mean, ignore logvar
            all_mu.append(mu.cpu())
            all_labels.append(labels)
    all_mu = torch.cat(all_mu, dim=0).numpy()
    all_labels = torch.cat(all_labels, dim=0).numpy()
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(all_mu[:, 0], all_mu[:, 1], c=all_labels,
    cmap='tab10', alpha=0.5, s=10)
    plt.colorbar(scatter)
    plt.xlabel('Latent Dimension 1')
    plt.ylabel('Latent Dimension 2')
    plt.title('VAE Latent Space')
    plt.show()
def generate_from_latent(model, z, device):
    """Decode latent vectors `z` into samples, without tracking gradients."""
    model.eval()
    with torch.no_grad():
        return model.decode(z.to(device))
def interpolate_latent(model, z1, z2, steps=10, device=None):
    """
    Linearly interpolate between two latent vectors and decode each step.

    Bug fix: `device` previously had no default while following
    `steps=10` — a SyntaxError — and was never used in the body.  It now
    defaults to None; when given, z1/z2 are moved onto it.

    Args:
        model: module exposing `decode(z)`.
        z1, z2: latent endpoint tensors of identical shape.
        steps: number of interpolation points (endpoints included).
        device: optional device to move z1/z2 to before decoding.

    Returns:
        Decoded steps concatenated along dim 0.
    """
    model.eval()
    if device is not None:
        z1, z2 = z1.to(device), z2.to(device)
    # Interpolation coefficients from 0 (pure z1) to 1 (pure z2).
    alphas = torch.linspace(0, 1, steps)
    decoded = []
    with torch.no_grad():
        for alpha in alphas:
            z = z1 * (1 - alpha) + z2 * alpha
            decoded.append(model.decode(z))
    return torch.cat(decoded, dim=0)
# Generate brand-new samples from the prior.
def generate_new_images(model, num_images, latent_dim, device):
    """Sample latent vectors from N(0, I) and decode them into images."""
    model.eval()
    with torch.no_grad():
        latent = torch.randn(num_images, latent_dim).to(device)
        return model.decode(latent)
VAE 的潜在空间是连续的,可以在潜在空间中进行插值,生成平滑过渡的图像。但 VAE 生成的图像通常较模糊,这是因为它优化的是下界而非精确的对数似然。
5. 稀疏自编码器
稀疏自编码器(Sparse Autoencoder)在损失函数中加入稀疏性约束,限制潜在向量的激活数量。这使模型能够学习更有意义的特征。
5.1 稀疏自编码器实现
import torch.nn as nn
import torch.nn.functional as F
class SparseAutoencoder(nn.Module):
    """Sparse autoencoder.

    The bottleneck activation is passed through ReLU so the code is
    non-negative; a sparsity penalty (see `sparse_loss`) is added to the
    training objective.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        # Single-hidden-layer encoder.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
        )
        # Bottleneck (latent) layer.
        self.bottleneck = nn.Linear(hidden_dim, latent_dim)
        # Decoder back to input space; Sigmoid keeps outputs in [0, 1].
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
            nn.Sigmoid(),
        )

    def forward(self, x):
        # ReLU makes the code sparse/non-negative.
        code = F.relu(self.bottleneck(self.encoder(x)))
        return self.decoder(code), code
def sparse_loss(z, rho=0.05, beta=1.0):
    """
    KL-divergence sparsity penalty.

    rho:  target mean activation (e.g. 0.05 => ~5% of units active)
    beta: weight of the penalty term
    """
    rho_hat = z.mean(dim=0)  # average activation per latent unit
    # KL(rho || rho_hat) per unit; epsilons guard against log(0).
    kl = rho * torch.log(rho / (rho_hat + 1e-8)) + \
         (1 - rho) * torch.log((1 - rho) / (1 - rho_hat + 1e-8))
    return beta * kl.sum()
def total_sparse_loss(x_recon, x, z, rho=0.05, beta=1.0):
    """Combined objective: MSE reconstruction + KL sparsity penalty."""
    return F.mse_loss(x_recon, x) + sparse_loss(z, rho, beta)
6. 序列到序列自编码器
对于序列数据(如文本、时间序列),使用 RNN/LSTM 作为编码器和解码器。
6.1 序列自编码器实现
实例
import torch.nn as nn
class Seq2SeqAutoencoder(nn.Module):
    """
    Sequence-to-sequence autoencoder for sequential data.

    A bidirectional LSTM encoder compresses the sequence into a single
    latent vector; that vector initializes the hidden state of a
    bidirectional LSTM decoder, which reconstructs the sequence while
    being teacher-forced with the original input.

    Fixes over the previous version:
    - the decoder was declared with `input_size=hidden_size` yet was fed
      the raw input `x` (feature size `input_size`) — a runtime shape
      mismatch;
    - the latent was mapped to a (2, batch, hidden_size) initial state,
      but a `num_layers`-layer bidirectional LSTM requires
      (num_layers * 2, batch, hidden_size).
    """

    def __init__(self, input_size, hidden_size, latent_size, num_layers=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Encoder BiLSTM.
        self.encoder = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Bidirectional states concatenate to hidden_size * 2.
        self.to_latent = nn.Linear(hidden_size * 2, latent_size)
        # Latent -> the decoder's FULL initial hidden state.
        self.from_latent = nn.Linear(latent_size, num_layers * 2 * hidden_size)
        # Decoder BiLSTM, teacher-forced with the original input.
        self.decoder = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Project bidirectional decoder output back to input features.
        self.output_proj = nn.Linear(hidden_size * 2, input_size)

    def forward(self, x):
        batch_size = x.size(0)
        # Encode; h_n: (num_layers * 2, batch, hidden_size).
        _, (h_n, _) = self.encoder(x)
        # Concatenate the top layer's forward and backward final states.
        h_combined = torch.cat([h_n[-2], h_n[-1]], dim=-1)
        # Map to the latent space.
        z = self.to_latent(h_combined)
        # Build the decoder's initial hidden state from the latent vector:
        # (num_layers * 2, batch, hidden_size), contiguous for cuDNN.
        h_0 = (
            self.from_latent(z)
            .view(batch_size, self.num_layers * 2, self.hidden_size)
            .permute(1, 0, 2)
            .contiguous()
        )
        c_0 = torch.zeros_like(h_0)
        # Decode, teacher-forcing with the original sequence.
        decoder_output, _ = self.decoder(x, (h_0, c_0))
        return self.output_proj(decoder_output), z
# Usage example
model = Seq2SeqAutoencoder(
input_size=128, # input feature dimension
hidden_size=256, # LSTM hidden size
latent_size=64, # latent space dimension
num_layers=2
)
# Smoke test: run a random batch and print the resulting shapes.
x = torch.randn(8, 20, 128) # (batch, seq_len, input_size)
output, z = model(x)
print(f"输入形状: {x.shape}") # (8, 20, 128)
print(f"输出形状: {output.shape}") # (8, 20, 128)
print(f"潜在向量形状: {z.shape}") # (8, 64)
7. 自编码器的应用场景
7.1 异常检测
自编码器可以用于检测异常数据。正常数据的重构误差小,异常数据的重构误差大:
实例
import torch.nn as nn
def detect_anomalies(model, data_loader, threshold=None, device='cpu'):
    """
    Flag anomalous samples by reconstruction error.

    Normal samples reconstruct well (low MSE); anomalies reconstruct
    poorly.  When `threshold` is None, the 95th percentile of the
    observed errors is used.

    Returns:
        (anomaly_mask, reconstruction_errors, threshold)
    """
    model.eval()
    per_sample_errors = []
    with torch.no_grad():
        for batch in data_loader:
            images = batch[0].to(device)
            recon = model(images)
            # Per-sample MSE over channel/height/width dims.
            batch_errors = torch.mean((recon - images) ** 2, dim=(1, 2, 3))
            per_sample_errors.extend(batch_errors.cpu().numpy())
    errors = torch.tensor(per_sample_errors)
    if threshold is None:
        # Statistical fallback: the 95% quantile of the observed errors.
        threshold = torch.quantile(errors, 0.95).item()
    anomaly_mask = errors > threshold
    return anomaly_mask, errors, threshold
# Train the anomaly-detection model.
def train_anomaly_detector(normal_data_loader):
    """Fit an autoencoder on normal data only, so anomalies later show
    up as large reconstruction errors."""
    detector = ConvAutoencoder(channels=1, latent_dim=32)
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(detector.parameters(), lr=1e-3)
    detector.train()
    for _ in range(10):
        for batch in normal_data_loader:
            images = batch[0]
            loss = loss_fn(detector(images), images)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    return detector
7.2 图像着色与风格迁移
实例
# NOTE(review): the original snippet was an orphaned class body — the
# `class ...(nn.Module):` header was missing. Restored here.
class ColorizationAutoencoder(nn.Module):
    """
    Image-colorization autoencoder.

    Input:  grayscale image (batch, 1, H, W)
    Output: color channels  (batch, 2, H, W) (ab channels of Lab space)
    """
    def __init__(self):
        super().__init__()
        # Encoder: downsample while extracting features (each conv halves H, W).
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1),    # H/2
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),  # H/4
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1), # H/8
            nn.BatchNorm2d(256),
            nn.ReLU(),
        )
        # Decoder: upsample back to full resolution and predict color.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 2, kernel_size=3, padding=1),  # output ab channels
            nn.Sigmoid()  # ab scaled to [0, 1]
        )

    def forward(self, x):
        features = self.encoder(x)
        return self.decoder(features)
7.3 数据降维可视化
实例
def visualize_latent_2d(model, dataloader, device, num_samples=1000):
    """
    Visualize data by encoding it into a 2-D latent space and plotting
    a labeled scatter of up to `num_samples` points.

    Assumes `model.encode` returns a single latent tensor whose first
    two dimensions are plotted.
    NOTE(review): requires `matplotlib.pyplot as plt` — that import is
    not visible in this file; confirm it exists at module level.
    """
    model.eval()
    all_latents = []
    all_labels = []
    with torch.no_grad():  # inference only, no gradients needed
        count = 0
        for batch in dataloader:
            if count >= num_samples:
                break
            images, labels = batch[0], batch[1]
            images = images.to(device)
            # With a 2-D latent AE this plots directly; with a larger
            # latent space, project to 2-D first.
            z = model.encode(images)
            all_latents.append(z.cpu())
            all_labels.append(labels)
            count += images.size(0)
    # Trim to exactly num_samples (the last batch may overshoot).
    latents = torch.cat(all_latents, dim=0)[:num_samples].numpy()
    labels = torch.cat(all_labels, dim=0)[:num_samples].numpy()
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(latents[:, 0], latents[:, 1], c=labels,
    cmap='tab10', alpha=0.6, s=20)
    plt.colorbar(scatter)
    plt.xlabel('Latent Dimension 1')
    plt.ylabel('Latent Dimension 2')
    plt.title('Autoencoder 2D Latent Space Visualization')
    plt.show()
8. API 快速参考
8.1 常见自编码器类型
| 类型 | 特点 | 适用场景 |
|---|---|---|
| 基础自编码器 | 简单对称结构 | 降维、特征提取 |
| 卷积自编码器 | 使用卷积层保留空间结构 | 图像处理 |
| 去噪自编码器 | 学习去除噪声 | 图像去噪、鲁棒特征 |
| 变分自编码器 | 学习概率分布,可生成新数据 | 生成模型、数据生成 |
| 稀疏自编码器 | 稀疏约束,学习可解释特征 | 特征解耦、可解释性 |
| 序列自编码器 | 使用 RNN/LSTM 处理序列 | 文本、时间序列 |
8.2 损失函数选择
| 任务 | 推荐损失函数 |
|---|---|
| 图像重构 | MSELoss、SSIMLoss |
| 二值图像 | BCELoss、BCEWithLogitsLoss |
| 文本重构 | CrossEntropyLoss |
| VAE | MSE + KL Divergence |
| 异常检测 | MSE、MAE |
8.3 潜在维度选择
数据维度低(<100维)
-> 潜在维度设为 2~10
数据维度中等(100~1000维)
-> 潜在维度设为 10~50
数据维度高(>1000维)
-> 潜在维度设为 50~200
生成任务(VAE)
-> 潜在维度 2~32(便于采样和可视化)
异常检测
-> 潜在维度 16~64(保留足够信息检测异常)
