Relation Extraction Pytorch Lightning Refactoring

Pytorch Lightning > Docs > LightningModule

LightningModule — PyTorch Lightning 1.9.0dev documentation
"A LightningModule organizes your PyTorch code into 6 sections: Computations (init), Train Loop (training_step), Validation Loop (validation_step), Test Loop (test_step), Prediction Loop (predict_step), Optimizers and LR Schedulers (configure_optimizers)."
pytorch-lightning.readthedocs.io
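
Before diving into the refactored code, here is a minimal sketch of those six sections in one place. The layer, loss, and batch format below are placeholders for illustration, not part of the actual model.

import pytorch_lightning as pl
import torch


class LitExample(pl.LightningModule):
    def __init__(self):
        super().__init__()
        # 1. Computations (init)
        self.layer = torch.nn.Linear(10, 2)
        self.loss_func = torch.nn.CrossEntropyLoss()

    def training_step(self, batch, batch_idx):
        # 2. Train Loop
        x, y = batch
        return self.loss_func(self.layer(x), y)

    def validation_step(self, batch, batch_idx):
        # 3. Validation Loop
        x, y = batch
        self.log("val_loss", self.loss_func(self.layer(x), y))

    def test_step(self, batch, batch_idx):
        # 4. Test Loop
        x, y = batch
        self.log("test_loss", self.loss_func(self.layer(x), y))

    def predict_step(self, batch, batch_idx):
        # 5. Prediction Loop
        x, _ = batch
        return self.layer(x)

    def configure_optimizers(self):
        # 6. Optimizers and LR Schedulers
        return torch.optim.Adam(self.parameters(), lr=1e-3)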

model.py

from importlib import import_module

import numpy as np
import pytorch_lightning as pl
import torch
import transformers

from utils import criterion_entrypoint, klue_re_auprc, klue_re_micro_f1, n_compute_metrics

class Model(pl.LightningModule):
    def __init__(self, config):
        super().__init__()
        self.save_hyperparameters()

        self.model_name = config.model.model_name
        self.lr = config.train.learning_rate
        self.lr_sch_use = config.train.lr_sch_use  # whether to use an LR scheduler
        self.lr_decay_step = config.train.lr_decay_step
        self.scheduler_name = config.train.scheduler_name
        self.lr_weight_decay = config.train.lr_weight_decay

        # Load the pretrained model to use.
        self.plm = transformers.AutoModelForSequenceClassification.from_pretrained(
            pretrained_model_name_or_path=self.model_name, num_labels=30
        )
        # Load the CE loss used for the loss computation.
        self.loss_func = criterion_entrypoint(config.train.loss_name)
        self.optimizer_name = config.train.optimizer_name

    def forward(self, x):
        x = self.plm(
            input_ids=x["input_ids"],  # sequence token ids
            attention_mask=x["attention_mask"],  # 0/1 mask that marks which tokens are padding
            token_type_ids=x["token_type_ids"],  # 0/1 ids that distinguish the first and second sentence of the input
        )
        return x["logits"]

    def training_step(self, batch, batch_idx):
        # training loop run by trainer.fit()
        x = batch
        y = batch["labels"]

        logits = self(x)  # forward pass to predict logits
        loss = self.loss_func(logits, y.long())  # compute the loss used for training

        f1, accuracy = n_compute_metrics(logits, y).values()  # compute the metrics
        self.log("train", {"loss": loss, "f1": f1, "accuracy": accuracy})  # logging

        return loss

    def validation_step(self, batch, batch_idx):
        # validation loop run during trainer.fit()
        x = batch
        y = batch["labels"]

        logits = self(x)
        loss = self.loss_func(logits, y.long())

        f1, accuracy = n_compute_metrics(logits, y).values()  # compute f1 and accuracy
        self.log("val_loss", loss)
        self.log("val_accuracy", accuracy)
        self.log("val_f1", f1, on_step=True)

        return {"logits": logits, "y": y}  # return the logits and y at the end of each validation_step

    def validation_epoch_end(self, outputs):
        # concatenate the validation_step outputs to get the full logits and y
        logits = torch.cat([x["logits"] for x in outputs])
        y = torch.cat([x["y"] for x in outputs])

        # move to CPU (and NumPy) before passing to the metric function
        logits = logits.detach().cpu().numpy()
        y = y.detach().cpu()

        auprc = klue_re_auprc(logits, y)  # compute AUPRC
        self.log("val_auprc", auprc)

    def test_step(self, batch, batch_idx):
        # run by trainer.test() to evaluate and record the best checkpoint
        x = batch
        y = batch["labels"]

        logits = self(x)

        f1, accuracy = n_compute_metrics(logits, y).values()
        self.log("test_f1", f1)

        return {"logits": logits, "y": y}

    def test_epoch_end(self, outputs):
        logits = torch.cat([x["logits"] for x in outputs])
        y = torch.cat([x["y"] for x in outputs])

        logits = logits.detach().cpu().numpy()
        y = y.detach().cpu()

        auprc = klue_re_auprc(logits, y)
        self.log("test_auprc", auprc)

    def predict_step(self, batch, batch_idx):
        # prediction step used for inference
        logits = self(batch)

        return logits  # tensor of shape (batch_size, num_labels)

    def configure_optimizers(self):
        # set up the optimizer and lr_scheduler
        opt_module = getattr(import_module("torch.optim"), self.optimizer_name)  # resolve torch.optim.<optimizer_name>
        if self.lr_weight_decay:  # when weight decay is used
            optimizer = opt_module(
                filter(lambda p: p.requires_grad, self.parameters()),
                lr=self.lr,
                weight_decay=0.01,
            )
        else:  # when weight decay is not used
            optimizer = opt_module(
                filter(lambda p: p.requires_grad, self.parameters()),
                lr=self.lr,
            )
        if self.lr_sch_use:  # when an lr_scheduler is used
            t_total = 2030 * 7  # len(train_dataloader) * epochs
            warmup_step = int(t_total * 0.1)
            _scheduler_dic = {
                "StepLR": torch.optim.lr_scheduler.StepLR(optimizer, self.lr_decay_step, gamma=0.5),
                "ReduceLROnPlateau": torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=10),
                "CosineAnnealingLR": torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=2, eta_min=0.0),
                "constant_warmup": transformers.get_constant_schedule_with_warmup(optimizer, 100),
                "cosine_warmup": transformers.get_cosine_schedule_with_warmup(
                    optimizer, num_warmup_steps=10, num_training_steps=t_total
                ),
            }
            scheduler = _scheduler_dic[self.scheduler_name]

            return [optimizer], [scheduler]
        else:  # when no lr_scheduler is used
            return optimizer
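
One caveat with the scheduler dictionary above: the [optimizer], [scheduler] return format is fine for most schedulers, but ReduceLROnPlateau also needs to know which logged metric to watch. Below is a minimal sketch of the dict-style return Lightning supports for that case; the toy model, the Adam optimizer, and the "val_loss" metric name are placeholders, not values from the code above.

import pytorch_lightning as pl
import torch


class PlateauExample(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 2)

    def training_step(self, batch, batch_idx):
        x, y = batch
        return torch.nn.functional.cross_entropy(self.layer(x), y)

    def validation_step(self, batch, batch_idx):
        x, y = batch
        # the monitored metric must be logged somewhere, e.g. here
        self.log("val_loss", torch.nn.functional.cross_entropy(self.layer(x), y))

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=10)
        # ReduceLROnPlateau steps on a monitored metric, so return the dict form with "monitor"
        return {
            "optimizer": optimizer,
            "lr_scheduler": {"scheduler": scheduler, "monitor": "val_loss"},
        }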

data.py

import pickle

import pandas as pd
import pytorch_lightning as pl
import torch
import transformers

from sklearn.model_selection import StratifiedKFold
from tqdm.auto import tqdm
from utils import *

class Dataset(torch.utils.data.Dataset):
    """Class for constructing the Dataset."""

    def __init__(self, pair_dataset, labels):
        self.pair_dataset = pair_dataset
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx].clone().detach() for key, val in self.pair_dataset.items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

class Dataloader(pl.LightningDataModule):
    def __init__(self, model_name, batch_size, shuffle, train_path, test_path, split_seed=42):
        super().__init__()
        # store the values loaded from the config file
        self.model_name = model_name
        self.batch_size = batch_size 
        self.shuffle = shuffle
        self.split_seed = split_seed

        self.train_path = train_path
        self.test_path = test_path

        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None
        self.predict_dataset = None

        self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, max_length=200)

    def setup(self, stage="fit"):
        if stage == "fit":
            # load the training data
            total_data = load_data(self.train_path)

            # split into train and validation at a 9:1 ratio
            train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
            val_data = total_data.drop(train_data.index)
            # convert the string labels to numeric labels
            train_label = label_to_num(train_data["label"].values)
            val_label = label_to_num(val_data["label"].values)
            # tokenize the datasets
            tokenized_train = tokenized_dataset(train_data, self.tokenizer)
            tokenized_val = tokenized_dataset(val_data, self.tokenizer)
            # wrap the tokenized data in Dataset objects for training
            self.train_dataset = Dataset(tokenized_train, train_label)
            self.val_dataset = Dataset(tokenized_val, val_label)

        if stage == "test":
            # load the test data used to evaluate the best model
            total_data = load_data(self.train_path)

            train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
            val_data = total_data.drop(train_data.index)

            val_label = label_to_num(val_data["label"].values)
            tokenized_val = tokenized_dataset(val_data, self.tokenizer)

            self.test_dataset = Dataset(tokenized_val, val_label)

        if stage == "predict":
            # load the predict data used for inference
            p_data = load_data(self.test_path)
            p_label = list(map(int, p_data["label"].values))  # read the label values from the DataFrame as a list
            tokenized_p = tokenized_dataset(p_data, self.tokenizer)  # tokenize

            self.predict_dataset = Dataset(tokenized_p, p_label)  # store as a Dataset

    def train_dataloader(self):
        return torch.utils.data.DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=self.shuffle)

    def val_dataloader(self):
        return torch.utils.data.DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        return torch.utils.data.DataLoader(self.test_dataset, batch_size=self.batch_size)
    
    def predict_dataloader(self):
        return torch.utils.data.DataLoader(self.predict_dataset, batch_size=self.batch_size, num_workers=4)
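
For reference, when this DataModule is handed to a Trainer, Lightning calls setup() with the matching stage and then the corresponding *_dataloader() method. A rough stand-alone sketch of that flow; the model name, paths, and batch size below are placeholders, not the values from the actual config.

dataloader = Dataloader(
    model_name="klue/bert-base",  # placeholder model name
    batch_size=32,
    shuffle=True,
    train_path="train.csv",       # placeholder paths
    test_path="test.csv",
    split_seed=42,
)

dataloader.setup(stage="fit")      # what trainer.fit() triggers: builds the train/val datasets
train_loader = dataloader.train_dataloader()
val_loader = dataloader.val_dataloader()

dataloader.setup(stage="test")     # what trainer.test() triggers: builds the test dataset
test_loader = dataloader.test_dataloader()

dataloader.setup(stage="predict")  # what trainer.predict() triggers: builds the predict dataset
predict_loader = dataloader.predict_dataloader()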

main.py

import argparse
import os
import re

from datetime import datetime, timedelta

import pytorch_lightning as pl
import torch
import wandb

from data import *
from model import *
from omegaconf import OmegaConf
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import WandbLogger

time_ = datetime.now() + timedelta(hours=9)  # shift to KST (UTC+9)
time_now = time_.strftime("%m%d%H%M")

wandb_dict = {
    "users": "key"
}

if __name__ == "__main__":
    # read hyperparameters and other settings from the config
    # example terminal command: python3 main.py --config=base_config
    # if the '--config' argument is omitted, the default config ('base_config') is used
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, default="base_config")
    args, _ = parser.parse_known_args()
    cfg = OmegaConf.load(f"/opt/ml/code/pl/config/{args.config}.yaml") # config 파일 사용
    # W&B logging
    wandb.login(key=wandb_dict[cfg.wandb.wandb_username])
    model_name_ch = re.sub("/", "_", cfg.model.model_name)
    wandb_logger = WandbLogger(
        log_model="all",
        name=f"{cfg.model.saved_name}_{cfg.train.batch_size}_{cfg.train.learning_rate}_{time_now}",
        project=cfg.wandb.wandb_project,
        entity=cfg.wandb.wandb_entity,
    )

    pl.seed_everything(cfg.train.seed, workers=True)  # fix the random seed

    ck_dir_path = f"/opt/ml/code/pl/checkpoint/{model_name_ch}"
    if not os.path.exists(ck_dir_path):
        os.makedirs(ck_dir_path)

    # Checkpoint
    checkpoint_callback = ModelCheckpoint(
        dirpath=ck_dir_path, filename="{epoch}_{val_loss:.4f}", monitor="val_f1", save_top_k=1, mode="max"
    )

    # Earlystopping
    earlystopping = EarlyStopping(monitor="val_f1", patience=3, mode="max")

    # create the dataloader and the model
    dataloader = Dataloader(
        cfg.model.model_name,
        cfg.train.batch_size,
        cfg.data.shuffle,
        cfg.path.train_path,
        cfg.path.test_path,
        cfg.train.seed,
    )
    model = Model(cfg)

    # with no GPU, set accelerator="cpu"; with multiple GPUs, set devices to the number of GPUs to use (e.g. devices=4)
    trainer = pl.Trainer(
        precision=16,	    # Mixed precision(FP16)
        accelerator="gpu",
        devices=1,
        max_epochs=cfg.train.max_epoch,
        log_every_n_steps=cfg.train.logging_step,
        logger=wandb_logger,  # W&B integration
        callbacks=[earlystopping, checkpoint_callback, RichProgressBar()],  # RichProgressBar() changes the progress-bar output format
        deterministic=True,
        # limit the number of batches for quick test runs
        # limit_train_batches=0.15,  # use only 15% of training data
        # limit_val_batches=0.01,    # use only 1% of val data
        # limit_train_batches=10,    # use only 10 batches of training data
    )
    # training
    trainer.fit(model=model, datamodule=dataloader)
    # for trainer.test(), load the best-performing checkpoint among those saved
    trainer.test(model=model, datamodule=dataloader, ckpt_path="best")	

    # save the trained model
    output_dir_path = "output"
    if not os.path.exists(output_dir_path):
        os.makedirs(output_dir_path)

    output_path = os.path.join(output_dir_path, f"{model_name_ch}_{time_now}_model.pt")
    torch.save(model.state_dict(), output_path)

  • pl.seed_everything and deterministic are used to guarantee a reproducible experiment environment
  • limit_train_batches is used to support quick test runs (see the sketch below)
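
A minimal sketch of how those two pieces combine for a quick, reproducible smoke-test run; the seed value, epoch count, and 10% fractions are placeholders rather than values from the config above.

import pytorch_lightning as pl

# fix the Python / NumPy / PyTorch RNGs and seed dataloader workers
pl.seed_everything(42, workers=True)

trainer = pl.Trainer(
    max_epochs=1,
    deterministic=True,       # force deterministic algorithms so runs are reproducible
    limit_train_batches=0.1,  # use only 10% of the training batches
    limit_val_batches=0.1,    # use only 10% of the validation batches
)

# trainer.fit(model=model, datamodule=dataloader)  # model and dataloader defined as above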