Data Science/FullStackDeepLearning

[FSDL] Pre-Lab 02a: PyTorch Lightning

김 기승 2023. 6. 23. 11:58

Why Lightning?

  • PyTorch는 하드웨어 가속을 통해 미분 텐서 연산을 실행하는 강력한 라이브러리이며, 많은 신경망 primitives를 포함하고 있지만 "훈련"이라는 개념은 없다.
  • 높은 수준에서 보면 nn.Module은 그라데이션이 있는 상태 저장 함수이고 torch.optim.Optimizer는 gradients을 사용하여 해당 상태를 업데이트할 수 있지만, 데이터에서 이러한 gradients을 반복적으로 생성하기 위한 사전 구축된 도구는 PyTorch에 없다.
  • 그래서 우리는 "training loop"를 DataLoader를 반복함으로써 구현하였다.
  • 그러나, 자체 DataLoader를 사용하여 validation 및 test 데이터에서 모델을 실행하고 싶을 것이다.
  • 완료되면 모델을 저장해야 하며, 장기간 실행되는 작업의 경우 충돌이 발생했을 때 다시 시작할 수 있도록 학습 프로세스의 체크포인트를 저장해야 한다.
  • 여러 도메인에서 최첨단 모델 성능을 발휘하려면 여러 노드/머신과 해당 노드 내의 여러 GPU에 걸쳐 학습을 분산해야 한다.
  • 이러한 모든 기능이 지금 작성 중인 모델뿐만 아니라 많은 모델과 데이터 세트에서 작동하기를 원할 것이다.
  • 이러한 고민들을 해결하기 위한 방법으로 “프레임워크”를 사용하는 것이며, PyTorch Lightning은 PyTorch를 기반으로 하는 인기 있는 프레임워크이다.
    • pl.Trainer 클래스
      • training, validation, test loop를 구성하고 실행
    • pl.LightningModule 클래스
      • optimizer을 모델에 연결하고 training, validation, testing 중에 모델이 어떻게 작동할 지를 정의
  • 이 두 가지 모두 최첨단 딥러닝 코드베이스에 필요한 모든 기능을 갖추고 있다.
    • device types 전환을 위한 flag 및 분산 컴퓨팅 전략
    • saving, checkpointing, resumption
    • metrics의 계산 및 로깅
    • etc.
  • 이는 프레임워크가 설계되는 방식인 하위 수준 라이브러리(여기서는 PyTorch)를 통해 추상화를 제공하는 것과는 반대되는 방식이다.
  • "추상화하지 말고 구성하라"는 스타일 때문에 PyTorch Lightning 코드를 작성할 때는 메서드 오버라이딩을 많이 수행해야 한다.
  • 즉, 클래스로부터 상속한 다음 configuration arguments를 사용하여 이미 완전히 정의된 클래스를 제공하는 Lightning 대신 코드에 필요한 일반 메서드의 특정 버전을 구현해야 한다.

The pl.LightningModule

  • 두 가지 핵심 클래스 중 하나인 LightningModule은 모든 Module 기능을 상속하지만 더 많은 기능을 추가한 torch.nn.Module과 같다.
  • 이전에는 많은 세부 사항을 하드 코딩했지만, 이 클래스의 이점은 세부 사항을 설정할 수 있다는 것이다.
  • 현실적인 예시를 위해 코드베이스에서 사용하는 BaseLitModel의 실제 코드와 비교하면서 진행 할 것이다.
  • pl.LightningModuletorch.nn.Module이므로 기본 정의는 __init__forward 메서드가 필요하다.
class LinearRegression(pl.LightningModule):

    def __init__(self):
        super().__init__()  # just like in torch.nn.Module,
        			# we need to call the parent class __init__

        # attach torch.nn.Modules as top level attributes during init,
        # just like in a torch.nn.Module
        self.model = torch.nn.Linear(in_features=1, out_features=1)
        # we like to define the entire model as one torch.nn.Module 
        # -- typically in a separate class

    # optionally, define a forward method
    def forward(self, xs):
        return self.model(xs)  # we like to just call the model's forward method
  • 여기에 .training_step.configure_optimizers 메서드를 추가로 구현해야만 한다.

.training_step

  • training_step 메서드는 한 단계의 훈련 중에 수행해야 할 작업을 정의한다.
  • 효과적으로 배치를 loss 값에 매핑하여 PyTorch가 해당 loss를 백그라운드로 처리할 수 있도록 한다.
from typing import Tuple

def training_step(
	self: pl.LightningModule,
    batch: Tuple[torch.Tensor,torch.Tensor],
    batch_idx: int
    ) -> torch.Tensor:
    
    xs, ys = batch  # unpack the batch
    outs = self(xs)  # apply the model
    loss = torch.nn.functional.mse_loss(outs, ys)  # compute the (squared error) loss
    return loss

LinearRegression.training_step = training_step
  • device나 기타 Tensor metadata에 대해서는 Lightning에서 알아서 처리한다.
  • validation 및 test 루프 중에 모델의 동작을 정의하기 위해 validation_steptest_step을 추가로 정의할 수 있다.
  • 이 단계에서는 정확도, 정밀도, 재현율과 같이 미분 불가한 metric과 같이 입력, 출력 및 loss와 관련된 다른 값을 계산할 수도 있다.
  • 따라서 BaseLitModel에는 약간 더 복잡한 training_step 메서드가 있으며, forward의 세부 사항은 대신 ._run_on_batch로 넘어가게 된다.
# BaseLitModel.py

def training_step(self, batch, batch_idx):
        x, y, logits, loss = self._run_on_batch(batch)
        self.train_acc(logits, y)

        self.log("train/loss", loss)
        self.log("train/acc", self.train_acc, on_step=False, on_epoch=True)

        outputs = {"loss": loss}

        return outputs

.configure_optimizers

  • training_step 덕분에 손실이 생겼고, PyTorch는 이를 gradient로 바꿀 수 있다.
  • 파라미터를 업데이트하기 위해 gradient를 사용할 수 있는 optimizer가 필요하다.
    • 복잡한 경우에는 둘 이상의 옵티마이저(e.g. GAN)가 필요할 수 있다.
  • 두 번째로 필요한 메서드인 .configure_optimizerstorch.optim.Optimizers를 설정한다.
    • e.g. 하이퍼파라미터를 설정하고 모듈의 파라미터를 가리키도록 설정
  • LinearRegression 모델의 경우 optimizer를 인스턴스화하고 모델의 파라미터를 가리키기만 하면 된다.
def configure_optimizers(self: LinearRegression) -> torch.optim.Optimizer:
    optimizer = torch.optim.Adam(self.parameters(), lr=3e-4)  # https://fsdl.me/ol-reliable-img
    return optimizer

LinearRegression.configure_optimizers = configure_optimizers
  • 기본 동작에 의존하지 않고 수동으로 최적화를 제어하는 방법을 포함하여, Lightning의 최적화에 대한 자세한 내용은 문서에서 확인할 수 있다.
  • BaseLitModelconfigure_optimizers 메서드는 그다지 복잡하지 않고, learning rate 스케줄러에 대한 지원만 추가하면 된다.
def configure_optimizers(self):
        optimizer1 = Adam(...)
        optimizer2 = SGD(...)
        scheduler1 = ReduceLROnPlateau(optimizer1, ...)
        scheduler2 = LambdaLR(optimizer2, ...)
        return (
            {
                "optimizer": optimizer1,
                "lr_scheduler": {
                    "scheduler": scheduler1,
                    "monitor": "metric_to_track",
                },
            },
            {"optimizer": optimizer2, "lr_scheduler": scheduler2},
        )

The pl.Trainer

  • training, validation 및 testing을 실행하기 위해 LightningModule 인터페이스에 의존하는 Trainer와 결합이 필요하다.
  • Trainer에서는 train을 얼마나 길게 할 것인지(max_epochs, min_epochs, max_time, max_steps), 사용할 acceleration(e.g. GPU) 또는 배포 전략, training 실행마다 다를 수 있는 기타 설정 등을 선택할 수 있다.
trainer = pl.Trainer(max_epochs=20, gpus=int(torch.cuda.is_available()))
  • 그전에 torch.utils.data.DataLoader가 필요하다.
class CorrelatedDataset(torch.utils.data.Dataset):

    def __init__(self, N=10_000):
        self.N = N
        self.xs = torch.randn(size=(N, 1))
        self.ys = torch.randn_like(self.xs) + self.xs  # correlated target data: y ~ N(x, 1)

    def __getitem__(self, idx):
        return (self.xs[idx], self.ys[idx])

    def __len__(self):
        return self.N

dataset = CorrelatedDataset()
tdl = torch.utils.data.DataLoader(dataset, batch_size=32, num_workers=1)
  • (PyTorch Lightning로 인해 추가된 내용은 없다.)
  • 해당 데이터는 대략 이러한 분포를 가지고 있다.

model = LinearRegression()

print("loss before training:", torch.mean(torch.square(model(dataset.xs) - dataset.ys)).item())

trainer.fit(model=model, train_dataloaders=tdl)

print("loss after training:", torch.mean(torch.square(model(dataset.xs) - dataset.ys)).item())
  • 학습 후 손실이 학습 전 손실보다 작아야 하며, 모델의 예측이 데이터와 일치하는 것을 확인할 수 있었다.

  • Trainer는 "Customize every aspect of training via flags"을 보장한다.
    • 문서를 보면 이외의 사용자 정의 옵션을 확인 할 수 있다.

Training with PyTorch Lightning in the FSDL Codebase

import training.run_experiment

print(training.run_experiment.__doc__, training.run_experiment.main.__doc__)
Experiment-running framework. 
    Run an experiment.

    Sample command:
    ```
    python training/run_experiment.py --max_epochs=3 --gpus='0,' --num_workers=20 --model_class=MLP --data_class=MNIST
    ```

    For basic help documentation, run the command
    ```
    python training/run_experiment.py --help
    ```

    The available command line args differ depending on some of the arguments, including --model_class and --data_class.

    To see which command line args are available and read their documentation, provide values for those arguments
    before invoking --help, like so:
    ```
    python training/run_experiment.py --model_class=MLP --data_class=MNIST --help
  • Trainer 초기화
# how the trainer is initialized in the training script
!grep "pl.Trainer.from" training/run_experiment.py

 

  • Trainer의 모든 구성 flexibility와 complexity를 명령어를 통해 사용할 수 있다.
  • Trainer의 명령어 arguments에 대한 문서는 --help로 액세스할 수 있다.
# displays the first few flags for controlling the Trainer from the command line
!python training/run_experiment.py --help | grep "pl.Trainer" -A 24

 

Extra Goodies

  • LightningModuleTrainer는 PyTorch Lightning을 시작하는 데 필요한 최소한의 준비물이다.
  • Lightning과 그 생태계에는 더 많은 기능이 내장되어 있다.
    • pl.LightningDataModules
      • 데이터로더를 구성하고 분산 설정에서 데이터를 처리
    • pl.Callbacks
      • 모델 훈련에 "선택적" 추가 기능을 추가
    • torchmetrics
      • 효율적인 계산 및 로깅

pl.LightningDataModule

  • LightningModule이 모델과 옵티마이저를 구성하는 반면,
  • LightningDataModule은 데이터 로딩 코드를 구성한다.
  • class-level 문서에는 클래스의 개념이 잘 설명되어 있으며 재정의할 주요 메서드가 나열되어 있다.
A DataModule standardizes the training, val, test splits, data preparation and transforms. The main
    advantage is consistent data splits, data preparation and transforms across models.

    Example::

        class MyDataModule(LightningDataModule):
            def __init__(self):
                super().__init__()
            def prepare_data(self):
                # download, split, etc...
                # only called on 1 GPU/TPU in distributed
            def setup(self, stage):
                # make assignments here (val/train/test split)
                # called on every process in DDP
            def train_dataloader(self):
                train_split = Dataset(...)
                return DataLoader(train_split)
            def val_dataloader(self):
                val_split = Dataset(...)
                return DataLoader(val_split)
            def test_dataloader(self):
                test_split = Dataset(...)
                return DataLoader(test_split)
            def teardown(self):
                # clean up after fit or test
                # called on every process in DDP
  • CorrelatedDatasetLightningDataModule을 사용해서 재정의 해보자.
import math

class CorrelatedDataModule(pl.LightningDataModule):

    def __init__(self, size=10_000, train_frac=0.8, batch_size=32):
        super().__init__()  # again, mandatory superclass init, as with torch.nn.Modules

        # set some constants, like the train/val split
        self.size = size
        self.train_frac, self.val_frac = train_frac, 1 - train_frac
        self.train_indices = list(range(math.floor(self.size * train_frac)))
        self.val_indices = list(range(self.train_indices[-1], self.size))

        # under the hood, we've still got a torch Dataset
        self.dataset = CorrelatedDataset(N=size)
  • LightningDataModule은 신중하게 처리되어야 할, 상태를 설정하는 연산(e.g. 디스크에 쓰기 또는 나중에 접근하려는 것을 self에 첨부)을 분산 설정에서 작동하도록 설계되었다.
  • 학습을 위해 데이터를 준비하는 것은 종종 매우 stateful한 작업이므로, LightningDataModule은 두가지 별도의 메서드를 제공한다.
    • setup
      • 모듈의 각 복사본에서 설정해야 하는 모든 상태를 처리한다.
      • 여기서는 데이터를 분할하고 self에 추가한다.
    • prepare_data
      • 각 머신에서만 설정해야 하는 모든 상태를 처리한다.
      • e.g. 스토리지에서 데이터를 다운로드하여, 로컬 디스크에 쓰기
def setup(self, stage=None):  # prepares state that needs to be set for each GPU on each node
    if stage == "fit" or stage is None:  # other stages: "test", "predict"
        self.train_dataset = torch.utils.data.Subset(self.dataset, self.train_indices)
        self.val_dataset = torch.utils.data.Subset(self.dataset, self.val_indices)

def prepare_data(self):  # prepares state that needs to be set once per node
    pass  # but we don't have any "node-level" computations

CorrelatedDataModule.setup, CorrelatedDataModule.prepare_data = setup, prepare_data
  • 그 후 Trainer가 요청할 때 DataLoader를 반환하는 메서드를 정의한다.
  • LightningDataModule을 사용하는 test loop를 실행하려면 test_dataloader도 정의해야 한다.
def train_dataloader(self: pl.LightningDataModule) -> torch.utils.data.DataLoader:
    return torch.utils.data.DataLoader(self.train_dataset, batch_size=32)

def val_dataloader(self: pl.LightningDataModule) -> torch.utils.data.DataLoader:
    return torch.utils.data.DataLoader(self.val_dataset, batch_size=32)

CorrelatedDataModule.train_dataloader, CorrelatedDataModule.val_dataloader = train_dataloader, val_dataloader
model = LinearRegression()
datamodule = CorrelatedDataModule()

dataset = datamodule.dataset

print("loss before training:", torch.mean(torch.square(model(dataset.xs) - dataset.ys)).item())

trainer = pl.Trainer(max_epochs=10, gpus=int(torch.cuda.is_available()))
trainer.fit(model=model, datamodule=datamodule)

print("loss after training:", torch.mean(torch.square(model(dataset.xs) - dataset.ys)).item())
  • "Skipping val loop.” 경고문이 나오는 이유는, LinearRegression 모델에 .validation_step 메서드가 없기 때문이다.
    • 추가하자
  • FSDL 코드베이스에서는 BaseDataModule에서 LightningDataModule의 기본 함수를 정의하고 세부 사항은 서브클래스로 넘긴다.
# BaseDataModule.py

def __init__(self, args: argparse.Namespace = None) -> None:
        super().__init__()
        self.args = vars(args) if args is not None else {}
        self.batch_size = self.args.get("batch_size", BATCH_SIZE)
        self.num_workers = self.args.get("num_workers", DEFAULT_NUM_WORKERS)

        self.on_gpu = isinstance(self.args.get("gpus", None), (str, int))

        # Make sure to set the variables below in subclasses
        self.input_dims: Tuple[int, ...]
        self.output_dims: Tuple[int, ...]
        self.mapping: Collection
        self.data_train: Union[BaseDataset, ConcatDataset]
        self.data_val: Union[BaseDataset, ConcatDataset]
        self.data_test: Union[BaseDataset, ConcatDataset]

    @classmethod
    def data_dirname(cls):
        return metadata.DATA_DIRNAME

    @staticmethod
    def add_to_argparse(parser):
        parser.add_argument(
            "--batch_size",
            type=int,
            default=BATCH_SIZE,
            help=f"Number of examples to operate on per forward step. Default is {BATCH_SIZE}.",
        )
        parser.add_argument(
            "--num_workers",
            type=int,
            default=DEFAULT_NUM_WORKERS,
            help=f"Number of additional processes to load data. Default is {DEFAULT_NUM_WORKERS}.",
        )
        return parser

    def config(self):
        """Return important settings of the dataset, which will be passed to instantiate models."""
        return {"input_dims": self.input_dims, "output_dims": self.output_dims, "mapping": self.mapping}

    def prepare_data(self, *args, **kwargs) -> None:
        """Take the first steps to prepare data for use.

        Use this method to do things that might write to disk or that need to be done only from a single GPU
        in distributed settings (so don't set state `self.x = y`).
        """

    def setup(self, stage: Optional[str] = None) -> None:
        """Perform final setup to prepare data for consumption by DataLoader.

        Here is where we typically split into train, validation, and test. This is done once per GPU in a DDP setting.
        Should assign `torch Dataset` objects to self.data_train, self.data_val, and optionally self.data_test.
        """

    def train_dataloader(self):
        return DataLoader(
            self.data_train,
            shuffle=True,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=self.on_gpu,
        )

    def val_dataloader(self):
        return DataLoader(
            self.data_val,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=self.on_gpu,
        )

    def test_dataloader(self):
        return DataLoader(
            self.data_test,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=self.on_gpu,
        )

pl.Callbacks

  • Lightning의 Callback 클래스는 모든 모델을 실행하는 데 반드시 필요하지는 않지만,
  • training, validation 및 testing에 "있으면 좋은" 기능을 추가하여 사용된다.
  • 'callback'은 특정 트리거에 의해 나중에 호출되도록 설계된 코드 단위이다.
  • 콜백은 매우 유연한 시스템이기 때문에 내부적으로 많고 중요한 Lightning 기능을 구현하는 데 Callback을 사용한다.
    • e.g. training 중 저장을 위한 ModelCheckpoint
pl.callbacks.__all__  # builtin Callbacks from Lightning
['BackboneFinetuning',
 'BaseFinetuning',
 'Callback',
 'DeviceStatsMonitor',
 'EarlyStopping',
 'GPUStatsMonitor',
 'XLAStatsMonitor',
 'GradientAccumulationScheduler',
 'LambdaCallback',
 'LearningRateMonitor',
 'ModelCheckpoint',
 'ModelPruning',
 'ModelSummary',
 'BasePredictionWriter',
 'ProgressBar',
 'ProgressBarBase',
 'QuantizationAwareTraining',
 'RichModelSummary',
 'RichProgressBar',
 'StochasticWeightAveraging',
 'Timer',
 'TQDMProgressBar']
  • 여기서 트리거 또는 "hook"는 training, validation 및 testing loop의 특정 지점이다.
  • 일반적으로 hook의 이름에 hook이 언제 호출되는지 설명되어 있지만, 자세한 내용은 언제든지 문서로 확인할 수 있다.
  • pl.Callback를 상속하고 "hook" 메서드 중 하나를 오버라이드하여 자신만의 콜백을 정의할 수 있다.
  • e.g. 1
    • 이 콜백은 training 에포크가 시작될 때마다, 그리고 validation 에포크가 종료될 때마다 메시지를 출력한다.
class HelloWorldCallback(pl.Callback):

    def on_train_epoch_start(self, trainer: pl.Trainer, pl_module: pl.LightningModule):
        print("👋 hello from the start of the training epoch!")

    def on_validation_epoch_end(self, trainer: pl.Trainer, pl_module: pl.LightningModule):
        print("👋 hello from the end of the validation epoch!")

 

  • e.g. 2
    • "hook"에 따라 직접 사용할 수 있는 정보가 다르다.
    • on_train_batch_starton_train_batch_end 후크 내부의 배치 정보에 직접 액세스할 수 있다.
import random


def on_train_batch_start(self, trainer: pl.Trainer, pl_module: pl.LightningModule, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx: int):
        if random.random() > 0.995:
            print(f"👋 hello from inside the lucky batch, #{batch_idx}!")


HelloWorldCallback.on_train_batch_start = on_train_batch_start
  • Trainer를 초기화할 때 콜백을 제공하면 모델 피팅 중에 콜백이 호출된다.
model = LinearRegression()

datamodule = CorrelatedDataModule()

trainer = pl.Trainer(  # we instantiate and provide the callback here, but nothing happens yet
    max_epochs=10, gpus=int(torch.cuda.is_available()), callbacks=[HelloWorldCallback()])
trainer.fit(model=model, datamodule=datamodule)

torchmetrics

  • DNN은 까다롭고 조용히 중단되기 때문에 충돌을 일으키기보다는 잘못된 작업을 할 가능성이 있다.
  • 주의 깊게 모니터링하지 않으면 잘못된 문제가 사용자에게 많은 피해를 입힌 후 한참이 지나서야 발견될 수 있다.
  • 우리는 metric을 계산하여 훈련 중에 일어나는 일을 모니터링하고 버그를 포착하거나, 로그를 보고 훈련에서 버그를 수정하는 방법을 결정할 수 있는 '관찰 가능성(observability)'을 달성하고자 한다.
  • 하지만 DNN 학습은 성능에도 민감하다.
  • 대규모 언어 모델에 대한 학습 실행에는 기존 소프트웨어 파이프라인의 구축 작업보다 아파트 단지를 짓는 것과 비슷한 예산이 소요된다.
  • 학습 속도가 조금이라도 느려지면 상당한 비용이 추가되어 버그를 더 빨리 발견하고 수정할 수 있는 이점이 사라질 수 있다.
  • 또한 학습 중에 metric 계산을 구현하면 테스트 작성 및 모니터링과 같은 추가 작업이 더 생기는 것이다.
  • 이는 활용도가 높은 연구 작업에 방해가 된다.
  • pytorch_lightning.metrics로 시작된 torchmetrics 라이브러리는 1) 배치 및 여러 device에 걸친 누적과 같은 모범 사례를 통합하고, 2) 통합 인터페이스를 정의하고, 3) Lightning의 내장 logging과 통합되는 Metric 클래스를 제공하여 이러한 문제를 해결한다.
import torchmetrics

tm_version = torchmetrics.__version__
print("metrics:", *textwrap.wrap(", ".join(torchmetrics.__all__), width=80), sep="\n\t")
metrics:
    functional, Accuracy, AUC, AUROC, AveragePrecision, BinnedAveragePrecision,
    BinnedPrecisionRecallCurve, BinnedRecallAtFixedPrecision, BLEUScore,
    BootStrapper, CalibrationError, CatMetric, CHRFScore, CohenKappa,
    ConfusionMatrix, CosineSimilarity, TweedieDevianceScore, ExplainedVariance,
    ExtendedEditDistance, F1, F1Score, FBeta, FBetaScore, HammingDistance, Hinge,
    HingeLoss, JaccardIndex, KLDivergence, MatthewsCorrcoef, MatthewsCorrCoef,
    MaxMetric, MeanAbsoluteError, MeanAbsolutePercentageError, MeanMetric,
    MeanSquaredError, MeanSquaredLogError, Metric, MetricCollection, MetricTracker,
    MinMaxMetric, MinMetric, MultioutputWrapper,
    MultiScaleStructuralSimilarityIndexMeasure, PearsonCorrcoef, PearsonCorrCoef,
    PermutationInvariantTraining, PIT, Precision, PrecisionRecallCurve, PSNR,
    PeakSignalNoiseRatio, R2Score, Recall, RetrievalFallOut, RetrievalHitRate,
    RetrievalMAP, RetrievalMRR, RetrievalNormalizedDCG, RetrievalPrecision,
    RetrievalRecall, RetrievalRPrecision, ROC, SacreBLEUScore, SDR,
    SignalDistortionRatio, ScaleInvariantSignalDistortionRatio, SI_SDR, SI_SNR,
    ScaleInvariantSignalNoiseRatio, SignalNoiseRatio, SNR, SpearmanCorrcoef,
    SpearmanCorrCoef, Specificity, SQuAD, SSIM, StructuralSimilarityIndexMeasure,
    StatScores, SumMetric, SymmetricMeanAbsolutePercentageError,
    TranslationEditRate, WER, WordErrorRate, CharErrorRate, MatchErrorRate,
    WordInfoLost, WordInfoPreserved
  • LightningModule과 마찬가지로, torchmetrics.Metrictorch.nn.Module을 상속합니다.
  • 그 이유는 metric 계산은 모듈 애플리케이션과 마찬가지로 일반적으로
    1. 배열을 많이 사용하는 계산이며,
    2. persistent 상태(Module의 경우 파라미터, Metric의 경우 실행 값)에 의존하고,
    3. 가속의 이점을 누리며,
    4. device와 node에 분산할 수 있기 때문입니다.
  • 사용 중인 torchmetrics의 버전에 대한 문서는 여기에서 확인할 수 있습니다:
  • BaseLitModel에서는 torchmetrics.Accuracy metric을 사용한다.
# BaseLitModel.py __init__

def __init__(self, model, args: argparse.Namespace = None):
        super().__init__()
        self.model = model
        self.args = vars(args) if args is not None else {}

        self.data_config = self.model.data_config
        self.mapping = self.data_config["mapping"]
        self.input_dims = self.data_config["input_dims"]

        optimizer = self.args.get("optimizer", OPTIMIZER)
        self.optimizer_class = getattr(torch.optim, optimizer)

        self.lr = self.args.get("lr", LR)

        loss = self.args.get("loss", LOSS)
        if loss not in ("transformer",):
            self.loss_fn = getattr(torch.nn.functional, loss)

        self.one_cycle_max_lr = self.args.get("one_cycle_max_lr", None)
        self.one_cycle_total_steps = self.args.get("one_cycle_total_steps", ONE_CYCLE_TOTAL_STEPS)

        self.train_acc = Accuracy()
        self.val_acc = Accuracy()
        self.test_acc = Accuracy()

 

  • LinearRegression Final Code
class LinearRegression(pl.LightningModule):

    def __init__(self):
        super().__init__()  # just like in torch.nn.Module, we need to call the parent class __init__

        # attach torch.nn.Modules as top level attributes during init, just like in a torch.nn.Module
        self.model = torch.nn.Linear(in_features=1, out_features=1)
        # we like to define the entire model as one torch.nn.Module -- typically in a separate class

    # optionally, define a forward method
    def forward(self, xs):
        return self.model(xs)  # we like to just call the model's forward method

		def training_step(self: pl.LightningModule, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx: int) -> torch.Tensor:
		    xs, ys = batch  # unpack the batch
		    outs = self(xs)  # apply the model
		    loss = torch.nn.functional.mse_loss(outs, ys)  # compute the (squared error) loss
		    return loss

		def configure_optimizers(self: LinearRegression) -> torch.optim.Optimizer:
		    optimizer = torch.optim.Adam(self.parameters(), lr=3e-4)  # https://fsdl.me/ol-reliable-img
		    return optimizer
  • CorrelatedDataModule Final Code
import math


class CorrelatedDataModule(pl.LightningDataModule):

    def __init__(self, size=10_000, train_frac=0.8, batch_size=32):
        super().__init__()  # again, mandatory superclass init, as with torch.nn.Modules

        # set some constants, like the train/val split
        self.size = size
        self.train_frac, self.val_frac = train_frac, 1 - train_frac
        self.train_indices = list(range(math.floor(self.size * train_frac)))
        self.val_indices = list(range(self.train_indices[-1], self.size))

        # under the hood, we've still got a torch Dataset
        self.dataset = CorrelatedDataset(N=size)

		def setup(self, stage=None):  # prepares state that needs to be set for each GPU on each node
		    if stage == "fit" or stage is None:  # other stages: "test", "predict"
		        self.train_dataset = torch.utils.data.Subset(self.dataset, self.train_indices)
		        self.val_dataset = torch.utils.data.Subset(self.dataset, self.val_indices)
		
		def prepare_data(self):  # prepares state that needs to be set once per node
		    pass  # but we don't have any "node-level" computations

		def train_dataloader(self: pl.LightningDataModule) -> torch.utils.data.DataLoader:
		    return torch.utils.data.DataLoader(self.train_dataset, batch_size=32)
		
		def val_dataloader(self: pl.LightningDataModule) -> torch.utils.data.DataLoader:
		    return torch.utils.data.DataLoader(self.val_dataset, batch_size=32)

 

[출처] https://fullstackdeeplearning.com/