UNetPlusPlus 画像分割コード分析#
訓練コードと説明#
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from my_dataset import ImageSegmentationDataset # カスタムデータセット
from NestedUNet import NestedUNet # モデル定義ファイル
# ハイパーパラメータの定義
batch_size = 1
learning_rate = 1e-4
num_epochs = 200
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# 新しいサイズを計算し、元のサイズを 2 で割る
new_height = 2048 // 2
new_width = 3072 // 2
# データ前処理とデータ拡張
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 画像サイズを元のサイズの半分に調整
transforms.ToTensor() # PyTorch テンソルに変換
])
# データをロード
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# モデル、損失関数、オプティマイザの初期化
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 訓練ループ
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# ターゲットテンソルの形状を [batch_size, height, width] にする
masks = torch.squeeze(masks, dim=1) # チャンネル次元を削除
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
# 訓練したモデルを保存
torch.save(model.state_dict(), './model.pth')
1. データ前処理とロード#
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 画像サイズを元のサイズの半分に調整
transforms.ToTensor() # PyTorch テンソルに変換
])
- Resize: 画像とマスクを新しいサイズ
(new_height, new_width)
に調整します。ここでは元のサイズ(2048, 3072)
を縮小しています。 - ToTensor: 画像とマスクを PyTorch テンソルに変換し、ピクセル値を [0, 1] の範囲に正規化します。
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
- ImageSegmentationDataset: カスタムデータセットクラスで、画像と対応するマスクをロードします。
- DataLoader: データセットをイテラブルな DataLoader にラップし、バッチサイズとシャッフルを設定します。
2. モデル、損失関数、オプティマイザの初期化#
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
- NestedUNet: 画像分割用のカスタムニューラルネットワークモデルで、入力チャンネル数は 3(RGB 画像)、出力クラス数は 2 です。
- CrossEntropyLoss: 多クラス分類タスクに適した損失関数で、画像分割によく使用されます。
- Adam オプティマイザ: ネットワークパラメータを更新するために使用されます。
3. 訓練ループ#
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# ターゲットテンソルの形状を [batch_size, height, width] にする
masks = torch.squeeze(masks, dim=1) # チャンネル次元を削除
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
- model.train(): モデルを訓練モードに設定し、ドロップアウトとバッチ正規化を有効にします。
- images, masks = images.to(device), masks.to(device): データを GPU または CPU に転送します。
- masks = torch.squeeze(masks, dim=1): これは重要なステップで、以下に説明します。
4. チャンネル処理に関する詳細な説明#
画像分割タスクにおいて:
- 入力画像は通常三次元で、形状は
[batch_size, channels, height, width]
です。例えば[1, 3, 1024, 1536]
です。 - マスクは通常四次元ですが、チャンネル数は 1 で、形状は
[batch_size, 1, height, width]
です。例えば[1, 1, 1024, 1536]
です。
しかし、nn.CrossEntropyLoss
関数はターゲットマスクの形状を [batch_size, height, width]
にすることを要求します。つまり、チャンネル次元を含まない必要があります。
したがって、torch.squeeze
関数を使用してマスクのチャンネル次元を削除する必要があります:
masks = torch.squeeze(masks, dim=1)
これにより、マスクの形状は [batch_size, 1, height, width]
から [batch_size, height, width]
に変わり、損失関数の要件を満たします。
5. モデル出力と損失計算#
- outputs = model(images): モデル出力の形状は
[batch_size, num_classes, height, width]
です。例えば[1, 2, 1024, 1536]
です。 - loss = criterion(outputs, masks): 予測結果と真のマスクとの間の交差エントロピー損失を計算します。
6. モデル保存#
torch.save(model.state_dict(), './model.pth')
- モデルのパラメータをファイル
model.pth
に保存し、後でロードして推論に使用できるようにします。
このコードの主な機能は、事前訓練された NestedUNet モデルをロードし、指定されたディレクトリ内の画像に対して分割を行い、結果を出力ディレクトリに保存することです。コードの実行フローは以下の通りです:
推論コードと説明#
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from NestedUNet import NestedUNet # モデル定義ファイル
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# モデルをロード
def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
# 推論を行う
def segment_images(model, image_dir, output_dir):
# 新しいサイズを計算し、元のサイズを 2 で割る
new_height = 2048 // 2
new_width = 3072 // 2
# データ前処理とデータ拡張
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 画像サイズを元のサイズの半分に調整
transforms.ToTensor() # PyTorch テンソルに変換
])
os.makedirs(output_dir, exist_ok=True)
for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # バッチ次元を追加
with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 分割結果を取得
# 分割結果を保存
output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
# クラス値を 0-255 の範囲にマッピング
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
# 主実行コード
if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 事前訓練モデルをロード
# 入力ディレクトリと出力ディレクトリを定義
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基本出力結果ディレクトリ
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
1. デバイス選択#
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
- マシンに利用可能な GPU があるかどうかを
torch.cuda.is_available()
で確認し、計算デバイスを選択します。GPU が利用可能な場合、コードは GPU を選択し、そうでなければ CPU を使用します。
2. モデルのロード#
def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
load_model
関数:- パスにモデルファイルが存在するか確認します。
torch.load()
を使用して事前訓練されたモデルパラメータをロードします。- ロード後、
model.eval()
を呼び出してモデルを評価モードに設定します(ドロップアウトなどの操作を無効にします)。 - この関数は、重みをロードしたモデルを返します。
3. 画像分割推論を行う#
def segment_images(model, image_dir, output_dir):
new_height = 2048 // 2
new_width = 3072 // 2
- 目標画像のサイズを設定します。元の画像の高さと幅をそれぞれ半分に縮小します(
2048 // 2
と3072 // 2
)。
データ前処理#
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 画像サイズを調整
transforms.ToTensor() # PyTorch テンソルに変換
])
- 画像は
Resize
変換を通じて新しいサイズに調整されます。 - 次に、
ToTensor()
を使用して PyTorch テンソル形式に変換され、モデルに入力できるようになります。
各画像の処理#
for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # バッチ次元を追加
image_dir
ディレクトリ内のすべての画像ファイル(.png
、.jpg
、.jpeg
フォーマットをサポート)をループします。- 各画像を読み込み、RGB モードに変換します(グレースケール画像も RGB 画像として処理されます)。
- 事前処理
transform
を使用してテンソルに変換し、バッチ次元を追加します(unsqueeze(0)
)し、形状を[1, C, H, W]
にします(モデル入力に適合)。
モデル推論#
with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 分割結果を取得
torch.no_grad()
を使用して勾配計算を無効にし、メモリを節約し、推論を高速化します。model(input_tensor)
はモデルの出力を返し(各ピクセルのクラス確率)、torch.argmax(outputs, dim=1)
は各ピクセルの最大確率のクラスを予測クラスとして取得します。squeeze(0)
はバッチ次元を削除し、得られたprediction
の形状は[H, W]
になります。
分割結果の保存#
output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
output_filename
: 各出力画像ファイルの名前を設定し、形式は元のファイル名に_segmentation.png
を追加します。prediction.cpu().numpy()
: 予測結果を GPU から CPU に移動し、NumPy 配列に変換します。astype(np.uint8) * 255
: 予測クラス(0 または 1)をグレースケール値(0 または 255)にマッピングし、結果を白黒画像として保存できるようにします。- Pillow を使用して
pred_img
を PNG 形式で保存します。
4. 主実行コード#
if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 事前訓練モデルをロード
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基本出力結果ディレクトリ
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
- 主プログラムでは、まず
NestedUNet
モデルをロードし、重みをロードします。 - 複数のサブフォルダパス(
input_dirs
)を含むリストを定義し、各フォルダには分割対象の画像が含まれています。 - 各入力フォルダに対して対応する出力フォルダを生成し、分割結果をこれらの出力フォルダに保存します。
- 最後に、保存結果のディレクトリパスを出力します。
データ前処理コードと説明#
import os
import numpy as np
import torch
from PIL import Image
class ImageSegmentationDataset:
def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 画像ファイルリストを取得し、ソート
def __getitem__(self, idx):
# 画像ファイル名を取得
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# マスクファイル名を構築、マスクファイルは "_mask" で終わると仮定
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 画像とマスクをロード
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # グレースケール画像
# transform(データ拡張など)があれば、transformを適用
if self.transform:
image = self.transform(image)
mask = self.transform(mask)
mask = torch.tensor(np.array(mask, dtype=np.int64))
return image, mask
def __len__(self):
# データセット内の画像ファイルの数を返す
return len(self.image_files)
__init__
コンストラクタ#
def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 画像ファイルリストを取得し、ソート
image_dir
: 画像が保存されているディレクトリのパス。mask_dir
: マスク画像が保存されているディレクトリのパス。各画像には対応するマスク画像があります。マスクはターゲット領域を注釈した画像です。transform
: データ前処理やデータ拡張の操作があれば、transform
に渡すことができます。例えば、画像サイズの調整、正規化などが行われることがあります。image_files
:image_dir
内のすべてのファイル名を取得し、ソートして、画像の順序がマスクの順序と一致するようにします。
__getitem__
メソッド#
def __getitem__(self, idx):
# 画像ファイル名を取得
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# マスクファイル名を構築、マスクファイルは "_mask" で終わると仮定
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 画像とマスクをロード
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # グレースケール画像
-
idx
: 渡されたインデックスで、データセット内のどの画像とその対応するマスクをロードするかを示します。 -
image_file
:idx
に基づいて現在の画像ファイルの名前を取得します。 -
image_path
: ファイル名に基づいて画像の完全なパスを構築します。 -
mask_file
: マスク画像は元の画像のファイル名と一致し、単に_mask
サフィックスを追加すると仮定します(元の画像が.jpg
の場合、マスク画像は.png
になります)。このルールは必要に応じて変更できます。 -
mask_path
: マスクファイル名に基づいてマスク画像の完全なパスを構築します。 -
画像とマスクをロード:
Pillow
のImage.open()
関数を使用して画像をロードし、.convert('RGB')
を使用して画像が三チャンネルの RGB 形式であることを確認します。- マスクはグレースケール画像であるため、
.convert('L')
を使用して単チャンネルのグレースケール画像にします。
前処理操作の適用#
if self.transform:
image = self.transform(image)
mask = self.transform(mask)
transform
が渡された場合(データ拡張や前処理操作など)、画像とマスクにその操作を適用します。通常、ここではサイズ調整、正規化、データ拡張などが行われます。
マスクを PyTorch テンソルに変換#
mask = torch.tensor(np.array(mask, dtype=np.int64))
- マスク画像を
Pillow
画像オブジェクトから NumPy 配列に変換します。 - 次に、NumPy 配列を PyTorch テンソルに変換し、型を
int64
にします。ここでint64
を使用するのは、通常分割タスクのラベルが整数型(各ピクセルに対応するクラス ID)であるためです。
__len__
メソッド#
def __len__(self):
# データセット内の画像ファイルの数を返す
return len(self.image_files)
- このメソッドはデータセット内の画像ファイルの数を返します。PyTorch データセットクラスはこのメソッドを実装する必要があり、データセットのサイズを知ることができます。
このコードは Nested U-Net という深層学習モデルを実装しており、主に 画像分割 タスクに使用されます。Nested U-Net は従来の U-Net を基に改良された構造で、ネストされたスキップ接続(nested skip connections)を追加することで、モデルの分割精度をさらに向上させます。以下にコードの各部分を詳細に説明し、特に各モジュールの役割に焦点を当てます。
Nested U-Net#
VGGBlock#
class VGGBlock(nn.Module):
def __init__(self, in_channels, middle_channels, out_channels):
super().__init__()
self.relu = nn.ReLU(inplace=True)
self.conv1 = nn.Conv2d(in_channels, middle_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(middle_channels)
self.conv2 = nn.Conv2d(middle_channels, out_channels, 3, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
return out
- VGGBlock はモデルの中核となる畳み込みブロックです。各ブロックは以下を含みます:
- 畳み込み層:
conv1
とconv2
は、いずれも3x3
の畳み込みカーネルを使用し、padding=1 を使用して出力サイズが入力と同じになるようにします。 - バッチ正規化層:
bn1
とbn2
は、訓練を加速し、モデルを安定させるために使用されます。 - ReLU 活性化関数:非線形表現能力を高めます。
- 畳み込み層:
このブロックは何度も呼び出され、U-Net と Nested U-Net の基本的な畳み込み操作を構成します。
NestedUNet#
class NestedUNet(nn.Module):
def __init__(self, num_classes=2, input_channels=2, deep_supervision=False, **kwargs):
super().__init__()
nb_filter = [32, 64, 128, 256, 512]
self.deep_supervision = deep_supervision
self.pool = nn.MaxPool2d(2, 2)
self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
# 各層の畳み込みモジュールを定義
self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
# ネストされた畳み込みモジュール(すなわちスキップ接続)を定義
self.conv0_1 = VGGBlock(nb_filter[0]+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_1 = VGGBlock(nb_filter[1]+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_1 = VGGBlock(nb_filter[2]+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv3_1 = VGGBlock(nb_filter[3]+nb_filter[4], nb_filter[3], nb_filter[3])
self.conv0_2 = VGGBlock(nb_filter[0]*2+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_2 = VGGBlock(nb_filter[1]*2+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_2 = VGGBlock(nb_filter[2]*2+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv0_3 = VGGBlock(nb_filter[0]*3+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_3 = VGGBlock(nb_filter[1]*3+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv0_4 = VGGBlock(nb_filter[0]*4+nb_filter[1], nb_filter[0], nb_filter[0])
# 最終出力層、深度監視(Deep Supervision)をサポート
if self.deep_supervision:
self.final1 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final2 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final3 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final4 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
else:
self.final = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
主なモジュール:#
-
畳み込み層:モデルの各層は
VGGBlock
で構成されています。各層の出力チャンネル数は徐々に増加し(32, 64, 128, 256, 512)、その後のネストされた層でスキップ接続を通じてさらに融合されます。 -
スキップ接続:この設計は Nested U-Net の鍵であり、各層の出力は次の層にだけでなく、他の層の出力とも結合(concatenate)されます。この設計は、より多くの詳細情報を保持し、分割精度を改善するのに役立ちます。
-
アップサンプリング(Upsampling):
Upsample
を使用して画像サイズを大きくし、スキップ接続の後に畳み込み操作を行います。 -
深度監視(Deep Supervision):複数の段階で出力を生成することで、モデルの学習効果を強化します。これは Nested U-Net の特徴であり、モデルが異なる深さのレベルで監視を行い、性能を向上させます。
Forward メソッド#
def forward(self, input):
# 様々な畳み込み操作
x0_0 = self.conv0_0(input)
x1_0 = self.conv1_0(self.pool(x0_0))
x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
x2_0 = self.conv2_0(self.pool(x1_0))
x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
# ネストされた接続と畳み込み操作を続け、最後の層まで進む
x3_0 = self.conv3_0(self.pool(x2_0))
x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
x4_0 = self.conv4_0(self.pool(x3_0))
x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))
if self.deep_supervision:
output1 = self.final1(x0_1)
output2 = self.final2(x0_2)
output3 = self.final3(x0_3)
output4 = self.final4(x0_4)
return [output1, output2, output3, output4]
else:
output = self.final(x0_4)
return output
- 畳み込みとプーリング操作:
self.pool
を使用してダウンサンプリング(プーリング)を行い、self.up
を使用してアップサンプリング(逆畳み込み)を行い、異なる層の出力を結合します。 - 深度監視出力:深度監視が有効な場合、複数の中間層で結果を出力します。そうでない場合は、最後の結果のみを出力します。
この文は Mix Space によって同期更新され、xLog に掲載されています。
元のリンクは https://blog.kanes.top/posts/ArtificialIntelligence/AnalysisofImageSegmentationCode