banner
kanes

kanes

圖像分割代碼分析

UNetPlusPlus 圖像分割代碼分析#

訓練代碼與解釋#

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from my_dataset import ImageSegmentationDataset  # 自定義數據集
from NestedUNet import NestedUNet  # 模型定義文件

# 定義超參數
batch_size = 1
learning_rate = 1e-4
num_epochs = 200

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# 計算新尺寸,將原始尺寸除以 2
new_height = 2048 // 2
new_width = 3072 // 2

# 數據預處理和數據增強
transform = transforms.Compose([
    transforms.Resize((new_height, new_width)),  # 將圖像大小調整為原始尺寸的一半
    transforms.ToTensor()  # 轉換為 PyTorch 張量
])

# 加載數據
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
                                         mask_dir='./dataset/train/masks',
                                         transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# 初始化模型、損失函數、優化器
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 訓練循環
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        # 確保目標張量的形狀為 [batch_size, height, width]
        masks = torch.squeeze(masks, dim=1)  # 去除通道維度

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

# 保存訓練好的模型
torch.save(model.state_dict(), './model.pth')

1. 數據預處理和加載#

transform = transforms.Compose([
    transforms.Resize((new_height, new_width)),  # 將圖像大小調整為原始尺寸的一半
    transforms.ToTensor()  # 轉換為 PyTorch 張量
])
  • Resize: 將圖像和掩碼調整為新的尺寸 (new_height, new_width),這裡是對原始尺寸 (2048, 3072) 進行縮小。
  • ToTensor: 將圖像和掩碼轉換為 PyTorch 張量,並將像素值歸一化到 [0, 1] 範圍。
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
                                         mask_dir='./dataset/train/masks',
                                         transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  • ImageSegmentationDataset: 自定義的數據集類,負責加載圖像和對應的掩碼。
  • DataLoader: 將數據集包裝成可迭代的 DataLoader,設置 batch size 和 shuffle。

2. 模型、損失函數和優化器的初始化#

model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
  • NestedUNet: 自定義的神經網絡模型,用於圖像分割,輸入通道數為 3(RGB 圖像),輸出類別數為 2。
  • CrossEntropyLoss: 適用於多類分類任務的損失函數,常用於圖像分割。
  • Adam 優化器: 用於更新網絡參數。

3. 訓練循環#

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        
        # 確保目標張量的形狀為 [batch_size, height, width]
        masks = torch.squeeze(masks, dim=1)  # 去除通道維度
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
  • model.train(): 將模型設置為訓練模式,啟用 dropout 和 batch normalization。
  • images, masks = images.to(device), masks.to(device): 將數據轉移到 GPU 或 CPU。
  • masks = torch.squeeze(masks, dim=1): 這是關鍵一步,解釋如下。

4. 關於通道處理的詳細解釋#

在圖像分割任務中:

  • 輸入圖像通常是三維的,形狀為 [batch_size, channels, height, width],例如 [1, 3, 1024, 1536]
  • ** 掩碼(mask)** 通常是四維的,但通道數為 1,形狀為 [batch_size, 1, height, width],例如 [1, 1, 1024, 1536]

然而,nn.CrossEntropyLoss 函數要求目標掩碼的形狀為 [batch_size, height, width],即不包含通道維度。

因此,需要使用 torch.squeeze 函數去除掩碼的通道維度:

masks = torch.squeeze(masks, dim=1)

這將掩碼的形狀從 [batch_size, 1, height, width] 變為 [batch_size, height, width],滿足損失函數的要求。

5. 模型輸出與損失計算#

  • outputs = model(images): 模型輸出形狀為 [batch_size, num_classes, height, width],例如 [1, 2, 1024, 1536]
  • loss = criterion(outputs, masks): 計算預測結果與真實掩碼之間的交叉熵損失。

6. 模型保存#

torch.save(model.state_dict(), './model.pth')
  • 保存模型的參數到文件 model.pth,方便後續加載和推理。

這段代碼的主要功能是加載一個預訓練的 NestedUNet 模型,使用它對指定目錄下的圖像進行分割,並將結果保存到輸出目錄。代碼的執行流程如下:


推理代碼與解釋#

import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from NestedUNet import NestedUNet  # 模型定義文件

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 加載模型
def load_model(model, path):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Model file not found: {path}")
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval()
    return model

# 進行推理
def segment_images(model, image_dir, output_dir):
    # 計算新尺寸,將原始尺寸除以 2
    new_height = 2048 // 2
    new_width = 3072 // 2

    # 數據預處理和數據增強
    transform = transforms.Compose([
        transforms.Resize((new_height, new_width)),  # 將圖像大小調整為原始尺寸的一半
        transforms.ToTensor()  # 轉換為 PyTorch 張量
    ])

    os.makedirs(output_dir, exist_ok=True)

    for filename in os.listdir(image_dir):
        if filename.endswith(('.png', '.jpg', '.jpeg')):
            filepath = os.path.join(image_dir, filename)
            image = Image.open(filepath).convert('RGB')
            input_tensor = transform(image).unsqueeze(0).to(device)  # 增加批量維度

            with torch.no_grad():
                outputs = model(input_tensor)
                prediction = torch.argmax(outputs, dim=1).squeeze(0)  # 獲取分割結果

            # 保存分割結果
            output_filename = filename.split('.')[0] + '_segmentation.png'
            output_path = os.path.join(output_dir, output_filename)

            # 將類別值映射到 0-255 範圍
            pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
            Image.fromarray(pred_img).save(output_path)

# 主執行代碼
if __name__ == "__main__":
    model = NestedUNet(num_classes=2, input_channels=3).to(device)
    model = load_model(model, './model.pth')  # 加載預訓練模型

    # 定義輸入目錄和輸出目錄
    input_dirs = [
        './dataset/1-2000',
        './dataset/2001-4000',
        './dataset/4001-6000',
        './dataset/6001-8000',
        './dataset/8001-9663'
    ]

    base_output_dir = './dataset/segmentation_results'  # 基礎輸出結果目錄

    for input_dir in input_dirs:
        output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
        segment_images(model, input_dir, output_dir)

    print(f"Segmentation results saved to: {base_output_dir}")

1. 設備選擇#

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
  • 根據機器是否有可用的 GPU(通過 torch.cuda.is_available() 檢查)來選擇計算設備。如果有 GPU 可用,代碼會選擇 GPU,否則使用 CPU。

2. 加載模型#

def load_model(model, path):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Model file not found: {path}")
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval()
    return model
  • load_model 函數:
    • 檢查路徑中是否有模型文件。
    • 使用 torch.load() 加載預訓練的模型參數。
    • 加載後,調用 model.eval() 將模型設置為評估模式(禁用 dropout 等操作)。
    • 該函數返回加載完權重的模型。

3. 進行圖像分割推理#

def segment_images(model, image_dir, output_dir):
    new_height = 2048 // 2
    new_width = 3072 // 2
  • 設置目標圖像的大小,即將原始圖像的高和寬各縮小一半(2048 // 23072 // 2)。

數據預處理#

transform = transforms.Compose([
    transforms.Resize((new_height, new_width)),  # 調整圖像大小
    transforms.ToTensor()  # 轉換為 PyTorch 張量
])
  • 圖像通過 Resize 變換調整為新尺寸。
  • 然後通過 ToTensor() 轉換為 PyTorch 張量格式,方便輸入到模型。

處理每張圖像#

for filename in os.listdir(image_dir):
    if filename.endswith(('.png', '.jpg', '.jpeg')):
        filepath = os.path.join(image_dir, filename)
        image = Image.open(filepath).convert('RGB')
        input_tensor = transform(image).unsqueeze(0).to(device)  # 增加批量維度
  • 遍歷 image_dir 目錄中的所有圖像文件(支持 .png.jpg.jpeg 格式)。
  • 對每張圖像進行讀取並轉換為 RGB 模式(即使是灰度圖也會被處理為 RGB 圖像)。
  • 使用預處理 transform 轉換為張量,並添加一個批量維度(unsqueeze(0)),使形狀變為 [1, C, H, W](適配模型輸入)。

模型推理#

with torch.no_grad():
    outputs = model(input_tensor)
    prediction = torch.argmax(outputs, dim=1).squeeze(0)  # 獲取分割結果
  • 使用 torch.no_grad() 禁用梯度計算,節省內存和加速推理。
  • model(input_tensor) 會返回模型的輸出(每個像素的類別概率)。
  • torch.argmax(outputs, dim=1):對每個像素,取類別概率最大的一項作為預測類別。
  • squeeze(0):去除批量維度,得到的 prediction 形狀為 [H, W]

保存分割結果#

output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)

pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
  • output_filename:為每個輸出圖像文件命名,格式為原始文件名加 _segmentation.png
  • prediction.cpu().numpy():將預測結果從 GPU 移到 CPU,並轉換為 NumPy 陣列。
  • astype(np.uint8) * 255:將預測類別(0 或 1)映射到灰度值(0 或 255),使得結果可以保存為黑白圖片。
  • 使用 Pillowpred_img 保存為 PNG 格式。

4. 主執行代碼#

if __name__ == "__main__":
    model = NestedUNet(num_classes=2, input_channels=3).to(device)
    model = load_model(model, './model.pth')  # 加載預訓練模型

    input_dirs = [
        './dataset/1-2000',
        './dataset/2001-4000',
        './dataset/4001-6000',
        './dataset/6001-8000',
        './dataset/8001-9663'
    ]

    base_output_dir = './dataset/segmentation_results'  # 基礎輸出結果目錄

    for input_dir in input_dirs:
        output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
        segment_images(model, input_dir, output_dir)

    print(f"Segmentation results saved to: {base_output_dir}")
  • 在主程序中,首先加載 NestedUNet 模型並加載權重。
  • 定義一個包含多個子文件夾路徑(input_dirs)的列表,每個文件夾包含待分割的圖像。
  • 為每個輸入文件夾生成對應的輸出文件夾,將分割結果保存到這些輸出文件夾。
  • 最後輸出保存結果的目錄路徑。

數據預處理代碼與解釋#

import os

import numpy as np
import torch
from PIL import Image


class ImageSegmentationDataset:
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(image_dir))  # 獲取圖片文件列表並排序

    def __getitem__(self, idx):
        # 獲取圖像文件名
        image_file = self.image_files[idx]
        image_path = os.path.join(self.image_dir, image_file)

        # 構建掩膜文件名,假設掩膜文件以 "_mask" 結尾
        mask_file = image_file.replace(".jpg", "_mask.png")
        mask_path = os.path.join(self.mask_dir, mask_file)

        # 加載圖像和掩膜
        image = Image.open(image_path).convert('RGB')
        mask = Image.open(mask_path).convert('L')  # 灰度圖

        # 如果有 transform(數據增強等),應用 transform
        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        mask = torch.tensor(np.array(mask, dtype=np.int64))
        return image, mask

    def __len__(self):
        # 返回數據集中圖像文件的數量
        return len(self.image_files)

__init__ 構造函數#

def __init__(self, image_dir, mask_dir, transform=None):
    self.image_dir = image_dir
    self.mask_dir = mask_dir
    self.transform = transform
    self.image_files = sorted(os.listdir(image_dir))  # 獲取圖片文件列表並排序
  • image_dir:圖像存放的目錄路徑。
  • mask_dir:掩膜圖像存放的目錄路徑。每張圖像都會有一個對應的掩膜圖像,掩膜是標註了目標區域的圖像。
  • transform:如果有數據預處理或數據增強的操作,可以傳遞給 transform。例如,可能會進行圖像大小調整、歸一化等。
  • image_files:獲取 image_dir 中的所有文件名並進行排序,確保圖像的順序與掩膜的順序一致。

__getitem__ 方法#

def __getitem__(self, idx):
    # 獲取圖像文件名
    image_file = self.image_files[idx]
    image_path = os.path.join(self.image_dir, image_file)

    # 構建掩膜文件名,假設掩膜文件以 "_mask" 結尾
    mask_file = image_file.replace(".jpg", "_mask.png")
    mask_path = os.path.join(self.mask_dir, mask_file)

    # 加載圖像和掩膜
    image = Image.open(image_path).convert('RGB')
    mask = Image.open(mask_path).convert('L')  # 灰度圖
  • idx:傳入的索引,表示要加載數據集中的哪一張圖片及其對應的掩膜。

  • image_file:根據 idx 獲取當前圖像文件的名稱。

  • image_path:根據文件名構建圖像的完整路徑。

  • mask_file:假設掩膜圖像與原圖像的文件名一致,只是在原文件名的基礎上加上 _mask 後綴(假設原圖為 .jpg,掩膜圖像為 .png)。可以根據需要修改這個規則。

  • mask_path:根據掩膜文件名構建掩膜圖像的完整路徑。

  • 加載圖像和掩膜

    • 使用 PillowImage.open() 函數加載圖像,並使用 .convert('RGB') 確保圖像是三通道的 RGB 格式。
    • 掩膜是灰度圖,所以加載時使用 .convert('L'),使其成為單通道的灰度圖像。

應用預處理操作#

if self.transform:
    image = self.transform(image)
    mask = self.transform(mask)
  • 如果傳遞了 transform(例如數據增強或預處理操作),則對圖像和掩膜應用該操作。通常,這裡會進行如調整大小、歸一化、數據增強等操作。

轉換掩膜為 PyTorch 張量#

mask = torch.tensor(np.array(mask, dtype=np.int64))
  • 將掩膜圖像從 Pillow 圖像對象轉換為 NumPy 陣列。
  • 然後將 NumPy 陣列轉換為 PyTorch 張量,類型為 int64。這裡使用 int64 是因為通常分割任務的標籤是整數類型(比如每個像素對應的類別 ID)。

__len__ 方法#

def __len__(self):
    # 返回數據集中圖像文件的數量
    return len(self.image_files)
  • 該方法返回數據集中圖像文件的數量。PyTorch 數據集類需要實現該方法,以便能夠知道數據集的大小。

這段代碼實現了一個名為 Nested U-Net 的深度學習模型,主要用於 圖像分割 任務。Nested U-Net 是在傳統 U-Net 基礎上改進的一種結構,通過增加嵌套跳躍連接(nested skip connections),進一步提升了模型的分割精度。下面我會詳細解釋代碼中的各個部分,特別是每個模塊的作用。


Nested U-Net#

VGGBlock#
class VGGBlock(nn.Module):
    def __init__(self, in_channels, middle_channels, out_channels):
        super().__init__()
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_channels, middle_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(middle_channels)
        self.conv2 = nn.Conv2d(middle_channels, out_channels, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        return out
  • VGGBlock 是模型中一個核心的卷積塊。每個塊包含:
    • 卷積層conv1conv2,都使用 3x3 的卷積核,並且使用了 padding=1 保證輸出尺寸和輸入相同。
    • 批歸一化層bn1bn2,用於加速訓練並穩定模型。
    • ReLU 激活函數:增加非線性表達能力。

這個塊被多次重複調用,構成 U-Net 和 Nested U-Net 的基礎卷積操作。

NestedUNet#

class NestedUNet(nn.Module):
    def __init__(self, num_classes=2, input_channels=2, deep_supervision=False, **kwargs):
        super().__init__()

        nb_filter = [32, 64, 128, 256, 512]

        self.deep_supervision = deep_supervision
        self.pool = nn.MaxPool2d(2, 2)
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # 定義每一層的卷積模塊
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])

        # 定義嵌套的卷積模塊(即跳躍連接)
        self.conv0_1 = VGGBlock(nb_filter[0]+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1]+nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2]+nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv3_1 = VGGBlock(nb_filter[3]+nb_filter[4], nb_filter[3], nb_filter[3])

        self.conv0_2 = VGGBlock(nb_filter[0]*2+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1]*2+nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_2 = VGGBlock(nb_filter[2]*2+nb_filter[3], nb_filter[2], nb_filter[2])

        self.conv0_3 = VGGBlock(nb_filter[0]*3+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_3 = VGGBlock(nb_filter[1]*3+nb_filter[2], nb_filter[1], nb_filter[1])

        self.conv0_4 = VGGBlock(nb_filter[0]*4+nb_filter[1], nb_filter[0], nb_filter[0])

        # 最終的輸出層,支持深度監督(Deep Supervision)
        if self.deep_supervision:
            self.final1 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final2 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final3 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final4 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
        else:
            self.final = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
主要模塊:#
  • 卷積層:模型的每一層都是由 VGGBlock 組成。每一層的輸出通道數逐漸增加(32, 64, 128, 256, 512),然後在之後的嵌套層中通過跳躍連接進一步進行融合。

  • 跳躍連接:這種設計是 Nested U-Net 的關鍵,每一層的輸出不僅用於下一層,還和其他層的輸出拼接(concatenate)。這種設計幫助保留了更多的細節信息,並改善了分割精度。

  • 上採樣(Upsampling):通過 Upsample 將圖像尺寸增大,並進行跳躍連接後,再進行卷積操作。

  • 深度監督(Deep Supervision):通過在多個階段產生輸出,增強模型的學習效果。這是 Nested U-Net 的一個特點,可以讓模型在不同的深度層次上進行監督,提高性能。


Forward 方法#

def forward(self, input):
    # 各種卷積操作
    x0_0 = self.conv0_0(input)

    x1_0 = self.conv1_0(self.pool(x0_0))
    x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))

    x2_0 = self.conv2_0(self.pool(x1_0))
    x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
    x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))

    # 繼續進行嵌套連接和卷積操作,直到最後一層
    x3_0 = self.conv3_0(self.pool(x2_0))
    x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
    x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
    x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))

    x4_0 = self.conv4_0(self.pool(x3_0))
    x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
    x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
    x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
    x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))

    if self.deep_supervision:
        output1 = self.final1(x0_1)
        output2 = self.final2(x0_2)
        output3 = self.final3(x0_3)
        output4 = self.final4(x0_4)
        return [output1, output2, output3, output4]
    else:
        output = self.final(x0_4)
        return output
  • 卷積和池化操作:通過 self.pool 進行下採樣(池化),通過 self.up 進行上採樣(反卷積),並通過拼接(torch.cat)將不同層的輸出合併起來。
  • 深度監督輸出:如果啟用深度監督,會在多個中間層輸出結果;否則,只在最後輸出結果。

此文由 Mix Space 同步更新至 xLog
原始鏈接為 https://blog.kanes.top/posts/ArtificialIntelligence/AnalysisofImageSegmentationCode


載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。