UNetPlusPlus 圖像分割代碼分析#
訓練代碼與解釋#
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from my_dataset import ImageSegmentationDataset # 自定義數據集
from NestedUNet import NestedUNet # 模型定義文件
# 定義超參數
batch_size = 1
learning_rate = 1e-4
num_epochs = 200
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# 計算新尺寸,將原始尺寸除以 2
new_height = 2048 // 2
new_width = 3072 // 2
# 數據預處理和數據增強
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 將圖像大小調整為原始尺寸的一半
transforms.ToTensor() # 轉換為 PyTorch 張量
])
# 加載數據
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# 初始化模型、損失函數、優化器
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 訓練循環
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# 確保目標張量的形狀為 [batch_size, height, width]
masks = torch.squeeze(masks, dim=1) # 去除通道維度
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
# 保存訓練好的模型
torch.save(model.state_dict(), './model.pth')
1. 數據預處理和加載#
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 將圖像大小調整為原始尺寸的一半
transforms.ToTensor() # 轉換為 PyTorch 張量
])
- Resize: 將圖像和掩碼調整為新的尺寸
(new_height, new_width)
,這裡是對原始尺寸(2048, 3072)
進行縮小。 - ToTensor: 將圖像和掩碼轉換為 PyTorch 張量,並將像素值歸一化到 [0, 1] 範圍。
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
- ImageSegmentationDataset: 自定義的數據集類,負責加載圖像和對應的掩碼。
- DataLoader: 將數據集包裝成可迭代的 DataLoader,設置 batch size 和 shuffle。
2. 模型、損失函數和優化器的初始化#
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
- NestedUNet: 自定義的神經網絡模型,用於圖像分割,輸入通道數為 3(RGB 圖像),輸出類別數為 2。
- CrossEntropyLoss: 適用於多類分類任務的損失函數,常用於圖像分割。
- Adam 優化器: 用於更新網絡參數。
3. 訓練循環#
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# 確保目標張量的形狀為 [batch_size, height, width]
masks = torch.squeeze(masks, dim=1) # 去除通道維度
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
- model.train(): 將模型設置為訓練模式,啟用 dropout 和 batch normalization。
- images, masks = images.to(device), masks.to(device): 將數據轉移到 GPU 或 CPU。
- masks = torch.squeeze(masks, dim=1): 這是關鍵一步,解釋如下。
4. 關於通道處理的詳細解釋#
在圖像分割任務中:
- 輸入圖像通常是三維的,形狀為
[batch_size, channels, height, width]
,例如[1, 3, 1024, 1536]
。 - ** 掩碼(mask)** 通常是四維的,但通道數為 1,形狀為
[batch_size, 1, height, width]
,例如[1, 1, 1024, 1536]
。
然而,nn.CrossEntropyLoss
函數要求目標掩碼的形狀為 [batch_size, height, width]
,即不包含通道維度。
因此,需要使用 torch.squeeze
函數去除掩碼的通道維度:
masks = torch.squeeze(masks, dim=1)
這將掩碼的形狀從 [batch_size, 1, height, width]
變為 [batch_size, height, width]
,滿足損失函數的要求。
5. 模型輸出與損失計算#
- outputs = model(images): 模型輸出形狀為
[batch_size, num_classes, height, width]
,例如[1, 2, 1024, 1536]
。 - loss = criterion(outputs, masks): 計算預測結果與真實掩碼之間的交叉熵損失。
6. 模型保存#
torch.save(model.state_dict(), './model.pth')
- 保存模型的參數到文件
model.pth
,方便後續加載和推理。
這段代碼的主要功能是加載一個預訓練的 NestedUNet 模型,使用它對指定目錄下的圖像進行分割,並將結果保存到輸出目錄。代碼的執行流程如下:
推理代碼與解釋#
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from NestedUNet import NestedUNet # 模型定義文件
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# 加載模型
def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
# 進行推理
def segment_images(model, image_dir, output_dir):
# 計算新尺寸,將原始尺寸除以 2
new_height = 2048 // 2
new_width = 3072 // 2
# 數據預處理和數據增強
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 將圖像大小調整為原始尺寸的一半
transforms.ToTensor() # 轉換為 PyTorch 張量
])
os.makedirs(output_dir, exist_ok=True)
for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # 增加批量維度
with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 獲取分割結果
# 保存分割結果
output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
# 將類別值映射到 0-255 範圍
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
# 主執行代碼
if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 加載預訓練模型
# 定義輸入目錄和輸出目錄
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基礎輸出結果目錄
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
1. 設備選擇#
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
- 根據機器是否有可用的 GPU(通過
torch.cuda.is_available()
檢查)來選擇計算設備。如果有 GPU 可用,代碼會選擇 GPU,否則使用 CPU。
2. 加載模型#
def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
load_model
函數:- 檢查路徑中是否有模型文件。
- 使用
torch.load()
加載預訓練的模型參數。 - 加載後,調用
model.eval()
將模型設置為評估模式(禁用 dropout 等操作)。 - 該函數返回加載完權重的模型。
3. 進行圖像分割推理#
def segment_images(model, image_dir, output_dir):
new_height = 2048 // 2
new_width = 3072 // 2
- 設置目標圖像的大小,即將原始圖像的高和寬各縮小一半(
2048 // 2
和3072 // 2
)。
數據預處理#
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 調整圖像大小
transforms.ToTensor() # 轉換為 PyTorch 張量
])
- 圖像通過
Resize
變換調整為新尺寸。 - 然後通過
ToTensor()
轉換為 PyTorch 張量格式,方便輸入到模型。
處理每張圖像#
for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # 增加批量維度
- 遍歷
image_dir
目錄中的所有圖像文件(支持.png
、.jpg
和.jpeg
格式)。 - 對每張圖像進行讀取並轉換為 RGB 模式(即使是灰度圖也會被處理為 RGB 圖像)。
- 使用預處理
transform
轉換為張量,並添加一個批量維度(unsqueeze(0)
),使形狀變為[1, C, H, W]
(適配模型輸入)。
模型推理#
with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 獲取分割結果
- 使用
torch.no_grad()
禁用梯度計算,節省內存和加速推理。 model(input_tensor)
會返回模型的輸出(每個像素的類別概率)。torch.argmax(outputs, dim=1)
:對每個像素,取類別概率最大的一項作為預測類別。squeeze(0)
:去除批量維度,得到的prediction
形狀為[H, W]
。
保存分割結果#
output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
output_filename
:為每個輸出圖像文件命名,格式為原始文件名加_segmentation.png
。prediction.cpu().numpy()
:將預測結果從 GPU 移到 CPU,並轉換為 NumPy 陣列。astype(np.uint8) * 255
:將預測類別(0 或 1)映射到灰度值(0 或 255),使得結果可以保存為黑白圖片。- 使用 Pillow 將
pred_img
保存為 PNG 格式。
4. 主執行代碼#
if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 加載預訓練模型
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基礎輸出結果目錄
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
- 在主程序中,首先加載
NestedUNet
模型並加載權重。 - 定義一個包含多個子文件夾路徑(
input_dirs
)的列表,每個文件夾包含待分割的圖像。 - 為每個輸入文件夾生成對應的輸出文件夾,將分割結果保存到這些輸出文件夾。
- 最後輸出保存結果的目錄路徑。
數據預處理代碼與解釋#
import os
import numpy as np
import torch
from PIL import Image
class ImageSegmentationDataset:
def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 獲取圖片文件列表並排序
def __getitem__(self, idx):
# 獲取圖像文件名
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# 構建掩膜文件名,假設掩膜文件以 "_mask" 結尾
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 加載圖像和掩膜
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # 灰度圖
# 如果有 transform(數據增強等),應用 transform
if self.transform:
image = self.transform(image)
mask = self.transform(mask)
mask = torch.tensor(np.array(mask, dtype=np.int64))
return image, mask
def __len__(self):
# 返回數據集中圖像文件的數量
return len(self.image_files)
__init__
構造函數#
def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 獲取圖片文件列表並排序
image_dir
:圖像存放的目錄路徑。mask_dir
:掩膜圖像存放的目錄路徑。每張圖像都會有一個對應的掩膜圖像,掩膜是標註了目標區域的圖像。transform
:如果有數據預處理或數據增強的操作,可以傳遞給transform
。例如,可能會進行圖像大小調整、歸一化等。image_files
:獲取image_dir
中的所有文件名並進行排序,確保圖像的順序與掩膜的順序一致。
__getitem__
方法#
def __getitem__(self, idx):
# 獲取圖像文件名
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# 構建掩膜文件名,假設掩膜文件以 "_mask" 結尾
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 加載圖像和掩膜
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # 灰度圖
-
idx
:傳入的索引,表示要加載數據集中的哪一張圖片及其對應的掩膜。 -
image_file
:根據idx
獲取當前圖像文件的名稱。 -
image_path
:根據文件名構建圖像的完整路徑。 -
mask_file
:假設掩膜圖像與原圖像的文件名一致,只是在原文件名的基礎上加上_mask
後綴(假設原圖為.jpg
,掩膜圖像為.png
)。可以根據需要修改這個規則。 -
mask_path
:根據掩膜文件名構建掩膜圖像的完整路徑。 -
加載圖像和掩膜:
- 使用
Pillow
的Image.open()
函數加載圖像,並使用.convert('RGB')
確保圖像是三通道的 RGB 格式。 - 掩膜是灰度圖,所以加載時使用
.convert('L')
,使其成為單通道的灰度圖像。
- 使用
應用預處理操作#
if self.transform:
image = self.transform(image)
mask = self.transform(mask)
- 如果傳遞了
transform
(例如數據增強或預處理操作),則對圖像和掩膜應用該操作。通常,這裡會進行如調整大小、歸一化、數據增強等操作。
轉換掩膜為 PyTorch 張量#
mask = torch.tensor(np.array(mask, dtype=np.int64))
- 將掩膜圖像從
Pillow
圖像對象轉換為 NumPy 陣列。 - 然後將 NumPy 陣列轉換為 PyTorch 張量,類型為
int64
。這裡使用int64
是因為通常分割任務的標籤是整數類型(比如每個像素對應的類別 ID)。
__len__
方法#
def __len__(self):
# 返回數據集中圖像文件的數量
return len(self.image_files)
- 該方法返回數據集中圖像文件的數量。PyTorch 數據集類需要實現該方法,以便能夠知道數據集的大小。
這段代碼實現了一個名為 Nested U-Net 的深度學習模型,主要用於 圖像分割 任務。Nested U-Net 是在傳統 U-Net 基礎上改進的一種結構,通過增加嵌套跳躍連接(nested skip connections),進一步提升了模型的分割精度。下面我會詳細解釋代碼中的各個部分,特別是每個模塊的作用。
Nested U-Net#
VGGBlock#
class VGGBlock(nn.Module):
def __init__(self, in_channels, middle_channels, out_channels):
super().__init__()
self.relu = nn.ReLU(inplace=True)
self.conv1 = nn.Conv2d(in_channels, middle_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(middle_channels)
self.conv2 = nn.Conv2d(middle_channels, out_channels, 3, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
return out
- VGGBlock 是模型中一個核心的卷積塊。每個塊包含:
- 卷積層:
conv1
和conv2
,都使用3x3
的卷積核,並且使用了 padding=1 保證輸出尺寸和輸入相同。 - 批歸一化層:
bn1
和bn2
,用於加速訓練並穩定模型。 - ReLU 激活函數:增加非線性表達能力。
- 卷積層:
這個塊被多次重複調用,構成 U-Net 和 Nested U-Net 的基礎卷積操作。
NestedUNet#
class NestedUNet(nn.Module):
def __init__(self, num_classes=2, input_channels=2, deep_supervision=False, **kwargs):
super().__init__()
nb_filter = [32, 64, 128, 256, 512]
self.deep_supervision = deep_supervision
self.pool = nn.MaxPool2d(2, 2)
self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
# 定義每一層的卷積模塊
self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
# 定義嵌套的卷積模塊(即跳躍連接)
self.conv0_1 = VGGBlock(nb_filter[0]+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_1 = VGGBlock(nb_filter[1]+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_1 = VGGBlock(nb_filter[2]+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv3_1 = VGGBlock(nb_filter[3]+nb_filter[4], nb_filter[3], nb_filter[3])
self.conv0_2 = VGGBlock(nb_filter[0]*2+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_2 = VGGBlock(nb_filter[1]*2+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_2 = VGGBlock(nb_filter[2]*2+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv0_3 = VGGBlock(nb_filter[0]*3+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_3 = VGGBlock(nb_filter[1]*3+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv0_4 = VGGBlock(nb_filter[0]*4+nb_filter[1], nb_filter[0], nb_filter[0])
# 最終的輸出層,支持深度監督(Deep Supervision)
if self.deep_supervision:
self.final1 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final2 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final3 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final4 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
else:
self.final = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
主要模塊:#
-
卷積層:模型的每一層都是由
VGGBlock
組成。每一層的輸出通道數逐漸增加(32, 64, 128, 256, 512),然後在之後的嵌套層中通過跳躍連接進一步進行融合。 -
跳躍連接:這種設計是 Nested U-Net 的關鍵,每一層的輸出不僅用於下一層,還和其他層的輸出拼接(concatenate)。這種設計幫助保留了更多的細節信息,並改善了分割精度。
-
上採樣(Upsampling):通過
Upsample
將圖像尺寸增大,並進行跳躍連接後,再進行卷積操作。 -
深度監督(Deep Supervision):通過在多個階段產生輸出,增強模型的學習效果。這是 Nested U-Net 的一個特點,可以讓模型在不同的深度層次上進行監督,提高性能。
Forward 方法#
def forward(self, input):
# 各種卷積操作
x0_0 = self.conv0_0(input)
x1_0 = self.conv1_0(self.pool(x0_0))
x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
x2_0 = self.conv2_0(self.pool(x1_0))
x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
# 繼續進行嵌套連接和卷積操作,直到最後一層
x3_0 = self.conv3_0(self.pool(x2_0))
x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
x4_0 = self.conv4_0(self.pool(x3_0))
x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))
if self.deep_supervision:
output1 = self.final1(x0_1)
output2 = self.final2(x0_2)
output3 = self.final3(x0_3)
output4 = self.final4(x0_4)
return [output1, output2, output3, output4]
else:
output = self.final(x0_4)
return output
- 卷積和池化操作:通過
self.pool
進行下採樣(池化),通過self.up
進行上採樣(反卷積),並通過拼接(torch.cat
)將不同層的輸出合併起來。 - 深度監督輸出:如果啟用深度監督,會在多個中間層輸出結果;否則,只在最後輸出結果。
此文由 Mix Space 同步更新至 xLog
原始鏈接為 https://blog.kanes.top/posts/ArtificialIntelligence/AnalysisofImageSegmentationCode