
[Kaggle] Classifying Pneumonia in Chest X-Ray Images

์ง•์ง•์•ŒํŒŒ์นด 2022. 1. 29. 18:56

Written 220129

<This post was written while studying with reference to Kaggle>

 

www.kaggle.com

Classifying Pneumonia Images

์บ๊ธ€์˜ Chest X-Ray Images ์‚ฌ์šฉํ๋ ด์€ ํ์— ์—ผ์ฆ์ด ์ƒ๊ธด ์ƒํƒœ๋กœ ์ค‘์ฆ์˜ ํ˜ธํก๊ธฐ ๊ฐ์—ผ๋ณ‘์ž…๋‹ˆ๋‹ค.์•„๋ž˜ ์ด๋ฏธ์ง€์ฒ˜๋Ÿผ ํ๋ถ€์œ„์— ํฌ๋ฏธํ•œ ๊ทธ๋ฆผ์ž?๊ฐ™์€๊ฒŒ ๋ณด์ด๋Š”๋ฐ ์‚ฌ์‹ค ์ด ์‚ฌ์ง„๋งŒ์œผ๋กœ ํ™•์‹คํžˆ ํ๋ ด์ด๋‹ค ์•„๋‹ˆ๋‹ค



1. ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ import ๋ฐ ๋ฐ์ดํ„ฐ load

import os, re
import random, math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
# Let tf.data tune data loading automatically
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Target X-ray image size
IMAGE_SIZE = [180, 180]
  • Data paths
root_path = os.getcwd() + '\\'
train_path = root_path + "chest_xray\\train\\*\\*"
val_path = root_path + "chest_xray\\val\\*\\*"
test_path = root_path + "chest_xray\\test\\*\\*"
BATCH_SIZE = 30     # number of images per batch
EPOCHS = 50
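The backslash paths above are Windows-specific. As a portable sketch (same chest_xray/ directory layout assumed, using the os module already imported above), os.path.join builds paths that work on any OS, and tf.io.gfile.glob accepts them either way:

# Hypothetical portable variant of the path setup above
root_path = os.getcwd()
train_path = os.path.join(root_path, "chest_xray", "train", "*", "*")
val_path = os.path.join(root_path, "chest_xray", "val", "*", "*")
test_path = os.path.join(root_path, "chest_xray", "test", "*", "*")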


2. ๋ฐ์ดํ„ฐ ์ค€๋น„

  • train_data : 5216
  • test_data : 624
  • val_data : 16
train_data = tf.io.gfile.glob(train_path)
test_data = tf.io.gfile.glob(test_path)
val_data = tf.io.gfile.glob(val_path)

print(len(train_data))
print(len(test_data))
print(len(val_data))
  • val is far too small, so merge train and val and re-split 8 : 2
train_val_sum = tf.io.gfile.glob(train_path)
train_val_sum.extend(tf.io.gfile.glob(val_path))

# 8:2
train_size = math.floor(len(train_val_sum)*0.8)
random.shuffle(train_val_sum)   # shuffle the combined file list in place before splitting
train = train_val_sum[:train_size]
val = train_val_sum[train_size:]

print(len(train))
print(len(val))
  • train : 4185
  • val : 1047
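Because the 8 : 2 split above is purely random, the NORMAL/PNEUMONIA ratio can drift slightly between train and val. A stratified split (a sketch, not part of the original notebook; it reuses train_val_sum, random, and math from above) splits each class separately so both sets keep the same ratio:

# Sketch: stratified 8:2 split that preserves the class ratio
normal_files = [f for f in train_val_sum if "NORMAL" in f]
pneumonia_files = [f for f in train_val_sum if "PNEUMONIA" in f]

def split_80_20(files):
    random.shuffle(files)
    cut = math.floor(len(files) * 0.8)
    return files[:cut], files[cut:]

normal_train, normal_val = split_80_20(normal_files)
pneu_train, pneu_val = split_80_20(pneumonia_files)

train_strat = normal_train + pneu_train
val_strat = normal_val + pneu_val
random.shuffle(train_strat)   # mix the two classes back together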

 

  • normal (not pneumonia) : 1077
  • pneumonia : 3108
normal = len([filename for filename in train if "NORMAL" in filename])
print(f"normal image count in train set : {normal}")

pneumonia = len([filename for filename in train if "PNEUMONIA" in filename])
print(f"pneumonia image count in train set : {pneumonia}")

 

  • tf.data.Dataset.from_tensor_slices creates a tf.data.Dataset from the list of file paths
train_list_ds = tf.data.Dataset.from_tensor_slices(train)
val_list_ds = tf.data.Dataset.from_tensor_slices(val)
TRAIN_IMG_COUNT = tf.data.experimental.cardinality(train_list_ds).numpy()
print(f"Training images count: {TRAIN_IMG_COUNT}")

VAL_IMG_COUNT = tf.data.experimental.cardinality(val_list_ds).numpy()
print(f"Validating images count: {VAL_IMG_COUNT}")

 

  • Every path contains either 'NORMAL' or 'PNEUMONIA', so the label can be read straight from it
# the second-to-last path component separates positive from negative
def get_label(file_path):
    parts = tf.strings.split(file_path, os.path.sep)
    return parts[-2] == "PNEUMONIA"   # pneumonia is positive (True), normal is negative (False)
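A quick eager-mode check (the file name below is just an illustrative example):

# get_label returns a boolean tensor: True for PNEUMONIA, False for NORMAL
sample = os.path.sep.join(["chest_xray", "train", "PNEUMONIA", "person1_bacteria_1.jpeg"])
print(get_label(sample).numpy())   # True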

 

  • ์ด๋ฏธ์ง€ ์‚ฌ์ด์ฆˆ ์ค„์ด๊ธฐ
def decode_img(img):
    img = tf.image.decode_jpeg(img, channels=3) # ์ด๋ฏธ์ง€๋ฅผ uint8 tensor๋กœ ์ˆ˜์ •
    img = tf.image.convert_image_dtype(img, tf.float32) # float32 ํƒ€์ž…์œผ๋กœ ์ˆ˜์ •
    img = tf.image.resize(img, IMAGE_SIZE) # ์ด๋ฏธ์ง€ ์‚ฌ์ด์ฆˆ๋ฅผ [180, 180] ์ˆ˜์ •
    return img
  •  ์ด๋ฏธ์ง€, ๋ผ๋ฒจ ์ฝ๊ธฐ
def process_path(file_path):
    label = get_label(file_path) # ๋ผ๋ฒจ ๊ฒ€์ถœ
    img = tf.io.read_file(file_path) # ์ด๋ฏธ์ง€ ์ฝ๊ธฐ
    img = decode_img(img) # ์ด๋ฏธ์ง€๋ฅผ ์•Œ๋งž์€ ํ˜•์‹์œผ๋กœ ์ˆ˜์ •
    return img, label

 

  • Building the train and validation datasets
# process files in parallel
train_ds = train_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
val_ds = val_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)

 

  • ์ด๋ฏธ์ง€ ํ™•์ธ
for image, label in train_ds.take(1):
    print("Image shape: ", image.numpy().shape)
    print("Label: ", label.numpy())

 

  • test ๋„ ๋™์ผํ•˜๊ฒŒ
test_list_ds = tf.data.Dataset.list_files(test_path)
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)

for image, label in test_ds.take(1):
    print("Image shape: ", image.numpy().shape)
    print("Label: ", label.numpy())
print(TEST_IMAGE_COUNT)

 

  • Making the data pipeline more efficient
# random_flip_left_right : randomly flips images horizontally
def augment(image,label):
    image = tf.image.random_flip_left_right(image)
    return image,label
def prepare_for_training(ds, shuffle_buffer_size=1000):
    ds = ds.map(
            augment,       # apply the augment function
            num_parallel_calls=2
        )
    # shuffle : keep a fixed-size buffer and pick the next element uniformly at random
    ds = ds.shuffle(buffer_size=shuffle_buffer_size)
    # repeat : re-iterate over the data indefinitely
    ds = ds.repeat()
    # batch : group elements into batches of BATCH_SIZE
    ds = ds.batch(BATCH_SIZE)
    # prefetch : overlap CPU preprocessing with GPU training
    ds = ds.prefetch(buffer_size=AUTOTUNE)

    return ds
train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)
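Every epoch re-reads and re-decodes each JPEG from disk. An optional variant (a sketch, not in the original notebook; the cache needs roughly 1.6 GB of RAM for this train set at 180×180×3 float32) keeps the decoded images in memory after the first epoch. The cache has to come before the augmentation step so the random flips stay random:

# Sketch: the same pipeline with an in-memory cache added
def prepare_for_training_cached(ds, shuffle_buffer_size=1000):
    ds = ds.cache()                                       # keep decoded images in memory after epoch 1
    ds = ds.map(augment, num_parallel_calls=AUTOTUNE)     # augment after the cache so flips stay random
    ds = ds.shuffle(buffer_size=shuffle_buffer_size)
    ds = ds.repeat()
    ds = ds.batch(BATCH_SIZE)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds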


3. ๋ฐ์ดํ„ฐ ์‹œ๊ฐํ™”

# ์ด๋ฏธ์ง€ ๋ฐฐ์น˜๋ฅผ ์ž…๋ ฅํ•˜๋ฉด ์—ฌ๋Ÿฌ์žฅ์˜ ์ด๋ฏธ์ง€๋ฅผ ๋ณด์—ฌ์คŒ
def show_batch(image_batch, label_batch):
    plt.figure(figsize=(10,10))
    for n in range(BATCH_SIZE):     # BATCH_SIZE = 30
        ax = plt.subplot(5,math.ceil(BATCH_SIZE / 5),n+1)
        plt.imshow(image_batch[n])
        if label_batch[n]:
            plt.title("PNEUMONIA")
        else:
            plt.title("NORMAL")
        plt.axis("off")
image_batch, label_batch = next(iter(train_ds))
show_batch(image_batch.numpy(), label_batch.numpy())


4. CNN modeling

: A CNN is made of a feature-extraction part, which repeatedly stacks Convolution and Max Pooling layers,

: and a classification part built from Fully Connected layers, normally ending in a Softmax output layer (here a single sigmoid unit is used instead, since the task is binary)

  • Convolution Block
1) two convolutions
2) batch normalization to stabilize training (which also helps a little against overfitting)
3) max pooling
def conv_block(filters):
    block = tf.keras.Sequential([
        tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
        tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.MaxPool2D()
    ])
    
    return block
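SeparableConv2D factorizes a standard convolution into a depthwise pass followed by a 1×1 pointwise pass, which cuts the parameter count sharply. A quick comparison (the 45×45×32 input shape here is just illustrative):

# Parameter count: standard vs. depthwise-separable 3x3 conv, 32 -> 64 channels
std = tf.keras.layers.Conv2D(64, 3, padding='same')
sep = tf.keras.layers.SeparableConv2D(64, 3, padding='same')
std.build((None, 45, 45, 32))
sep.build((None, 45, 45, 32))
print(std.count_params())   # 3*3*32*64 + 64 = 18496
print(sep.count_params())   # 3*3*32 + 32*64 + 64 = 2400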
  • dense block
def dense_block(units, dropout_rate):
    block = tf.keras.Sequential([
        tf.keras.layers.Dense(units, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(dropout_rate)
    ])
    
    return block
  • ์ „์ฒด ๋ชจ๋ธ
def build_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
        
        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.MaxPool2D(),
        
        conv_block(32),
        conv_block(64),
        
        conv_block(128),
        tf.keras.layers.Dropout(0.2),
        
        conv_block(256),
        tf.keras.layers.Dropout(0.2),
        
        tf.keras.layers.Flatten(),
        dense_block(512, 0.7),
        dense_block(128, 0.5),
        dense_block(64, 0.3),
        
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    return model
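With 180×180 inputs, the five pooling stages shrink the feature map 180 → 90 → 45 → 22 → 11 → 5, so the Flatten layer sees 5·5·256 = 6400 values. A quick sanity check:

model = build_model()
model.summary()   # confirm the final conv output is (5, 5, 256) before Flatten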


5. ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜• ์ฒ˜๋ฆฌ

  • ๋žœ๋ค ํฌ๋ ˆ์ŠคํŠธ๊ฐ€ ์ข‹๋‹ค
weight_for_0 = (1 / normal)*(TRAIN_IMG_COUNT)/2.0 
weight_for_1 = (1 / pneumonia)*(TRAIN_IMG_COUNT)/2.0

class_weight = {0: weight_for_0, 1: weight_for_1}

print('Weight for NORMAL: {:.2f}'.format(weight_for_0))
print('Weight for PNEUMONIA: {:.2f}'.format(weight_for_1))
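With the counts from this split (normal = 1077, pneumonia = 3108, TRAIN_IMG_COUNT = 4185; the exact numbers vary with the random shuffle), this works out to weight_for_0 = (1/1077)·4185/2 ≈ 1.94 and weight_for_1 = (1/3108)·4185/2 ≈ 0.67, so each NORMAL example counts roughly three times as much in the loss as a PNEUMONIA example.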


6. ๋ชจ๋ธ ํ›ˆ๋ จ

with tf.device('/GPU:0'):
    model = build_model()

    METRICS = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall')
    ]
    
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=METRICS
    )
  • EarlyStopping
from tensorflow.keras.callbacks import EarlyStopping
# stop training when val_loss stops improving for `patience` epochs
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)
history = model.fit(
    train_ds,
    steps_per_epoch = TRAIN_IMG_COUNT // BATCH_SIZE,
    epochs=10, callbacks=[es],
    validation_data = val_ds,
    validation_steps = VAL_IMG_COUNT // BATCH_SIZE,
    class_weight = class_weight,
)
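As written, EarlyStopping halts training but leaves the model with its last-epoch weights. A small variant using the restore_best_weights flag rolls back to the best checkpoint instead:

es = EarlyStopping(
    monitor='val_loss', mode='min', verbose=1, patience=10,
    restore_best_weights=True   # restore the weights from the best val_loss epoch
)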

Trained for only 10 epochs here; 50 epochs was far too slow.


7. ์‹œ๊ฐํ™”

fig, ax = plt.subplots(1, 4, figsize=(20, 5))
ax = ax.ravel()

for i, met in enumerate(['precision', 'recall', 'accuracy', 'loss']):
    ax[i].plot(history.history[met])
    ax[i].plot(history.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

  • test ๋ฐ์ดํ„ฐ๋กœ ๋ชจ๋ธ ํ‰๊ฐ€
from IPython.display import Image
from IPython.core.display import HTML
Image(url= "https://miro.medium.com/proxy/1*pOtBHai4jFd-ujaNXPilRg.png")

Precision ( ์ •๋ฐ€๋„ ) ex) ํ™•์ง„์ž๋กœ ๋ถ„๋ฅ˜๋œ ์‚ฌ๋žŒ๋“ค ์ค‘ ์‹ค์ œ ์–‘์„ฑ ์‹œ๋ฏธ๋‹ ํ™•๋ฅ 
Recall ( ์žฌํ˜„์œจ ) ex) ์‹ค์ œ๋กœ ์–‘์„ฑ์ธ ์‹œ๋ฏผ์„ ํ™•์ง„์ž๋กœ ๋ถ„๋ฅ˜ํ•  ํ™•๋ฅ 
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')
  • Precision and Recall are in a trade-off relationship
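To see the trade-off concretely, a confusion matrix helps (a sketch, not in the original notebook; labels and predictions are collected in a single pass because list_files may reshuffle the test files between iterations):

# Sketch: 2x2 confusion matrix on the test set (rows = actual, cols = predicted)
y_true, y_prob = [], []
for images, labels in test_ds:
    y_true.append(labels.numpy())
    y_prob.append(model.predict_on_batch(images).ravel())
y_true = np.concatenate(y_true).astype(int)
y_pred = (np.concatenate(y_prob) > 0.5).astype(int)

cm = np.zeros((2, 2), dtype=int)
for t, p in zip(y_true, y_pred):
    cm[t, p] += 1
print(cm)   # [[TN, FP], [FN, TP]]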
