안녕하세요
이번에 소개드릴 내용은 Pytorch의 TPU 학습을 하는 방법과 함께, 실험 관리 및 아티펙트 버전관리 기능을 제공하는 Weights and Biases (a.k.a wandb)의 로깅 방법에 대해 소개드리려 합니다.
Index
- TPU
- Weights and Biases
- Pytorch + XLA with Google Colaboratory
TPU
TPU는 구글에서 개발된 “Tensor Processing Units”을 의미합니다.
딥러닝 모델을 학습시키는 방법으로는, CPU를 사용한 방법과, 하드웨어 가속을 사용하는 GPU연산을 보편적으로 많이 사용합니다. 또한 거대한 모델을 학습시킬때에는 다수의 GPU를 사용하여 학습을 진행합니다. 그러나 프로토타입, POC, 또는 Kaggle과 같은 경진대회에서는 1장의 GPU를 가지고 있는 인스턴스의 제공이 일반적입니다. 반면에 TPU 인스턴스를 할당 받았을때에는 8-Cores의 TPU를 할당 받을수 있습니다.
TPU를 사용하는 방법은 클라우드 TPU를 사용하는게 일반적이지만, 이번 블로그는 Google Colab의 TPU 인스턴스를 기준으로 소개드리려 합니다.
Weights and Biases
wandb는 머신러닝 실험관리, 데이터 및 아티펙트 버전관리, 그리고 리포팅을 도와주는 플랫폼입니다.
아래 예시는 저희 연구팀에서의 실험관리 대시보드입니다.
다양한 실험관리 플랫폼이 있지만, 저희 연구팀에서 wandb를 사용하는 이유는 저장된 모델의 버전관리와 데이터 버전관리가 용이하여 사용하고 있습니다
Pytorch + XLA with Google Colaboratory
딥러닝 모델의 프로토타입을 개발 할 때, 고비용의 Cloud TPU는 비용적인 측면에서 부담이 될 수 있습니다. 가설 검증을 위한 POC단계에서 빠르게 프로토타이핑을 하고 가능 여부를 확인하는데에는 딥러닝 모델의 학습 시간 또한 중요합니다. 저희 연구팀은 간단한 프로토타이핑은 GCS와 Google Colaboratory (a.k.a colab)을 사용하여 딥러닝 모델의 학습을 진행합니다. 물론, GPU 인스턴스도 함께 사용합니다.
이번에 소개드릴 내용은 “ Google colab의 TPU인스턴스에서 모든 TPU코어를 사용하여 Pytorch 모델의 학습을 진행하면서 wandb에 로깅 ” 을 하는 방법을 공유드리려 합니다.
Colab의 런타임을 TPU로 설정해주셔야 학습까지 진행이 가능합니다.
Tensorflow 프레임워크를 사용할 때에는 별도의 추가적인 작업 없이 TPU학습이 가능합니다. 그러나 Pytorch를 사용하실 때에는 추가적인 사전 작업이 필요합니다.
바로 Pytorch/xla 설치가 필요합니다.
설치가 완료되면 소스코드를 작성합니다.
중요한점은 8-Core를 모두 사용하는 학습은 분산학습 방법중, DDP Strategy를 사용함으로 로직이 실행되는 함수안에 데이터 로더의 생성과, 모델 생성하는 로직이 함께 포함 되어야 합니다.
import torch import torchvision import torch_xla import torch_xla.core.xla_model as xm import torch_xla.distributed.xla_multiprocessing as xmp import torch_xla.distributed.parallel_loader as pl def _run(index, args): …
_run 이라는 이름의 함수를 정의하고, 학습 관련 코드를 추가하도록 하겠습니다.
함수의 아규먼트로 index와 args가 있는데 여기서 index는 TPU코어의 인덱스를 의미합니다. args는 추후 함수를 실행 할 시 주어질 파라미터입니다.
예시를 위해 데이터셋은 Torchvision에서 제공하는 데이터셋을 사용해보겠습니다.
train_dataset = torchvision.datasets.FashionMNIST( … ) test_dataset = torchvision.datasets.FashionMNIST( … )
데이터 다운로드가 완료되면 _run 함수에 pytorch학습을 위한 device설정과 함께 데이터 로더를 정의합니다.
중요한점은 데이터로더를 생성할때, Sampler를 정의하여 주어야 합니다.
이 Sampler는 TPU코어에 데이터를 샘플링하는 역활을 합니다.
def _run(index, args): device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = 128, drop_last = True ) …
동일한 방법으로 validation데이터 로더도 함께 만들어 주시면 됩니다.
학습을 위한 모델과 loss object, optimizer를 정의합니다.
mx = torchvision.models.alexnet(num_classes=10) def _run(index, args): device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = 128, drop_last = True ) model = mx.to(device) loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters()) …
모델 학습을 위한 Train 로직을 정의합니다.
데이터로더를 직접 주는것이 아닌 xla의 데이터 로더를 같이 정의합니다.
Pytorch로 학습할때와 다르게 optimizer의 스텝은 xla를 통하여 진행합니다.
mx = torchvision.models.alexnet(num_classes=10) def _run(index, args): device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = 128, drop_last = True ) model = mx.to(device) loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters()) for epoch in range(epochs): para_train_loader = pl.ParallelLoader( train_loader, [device] ).per_device_loader(device) for batch in para_train_loader: model.train() data, label = batch data = data.to(device) label = label.to(device) optimizer.zero_grad() pred = model(data) loss = loss_fn(pred, label) loss.backward() xm.optimizer_step(optimizer) …
비슷하게 Validation 스텝을 추가해 줍니다.
mx= torchvision.models.alexnet(num_classes=10) def _run(index, args): device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = args[“batch_size”], drop_last = True ) model =mx.to(device) loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters()) for epoch in range(epochs): para_train_loader = pl.ParallelLoader( train_loader, [device] ).per_device_loader(device) para_valid_loader = … for batch in para_train_loader: model.train() data, label = batch data = data.to(device) label = label.to(device) optimizer.zero_grad() pred = model(data) loss = loss_fn(pred, label) loss.backward() xm.optimizer_step(optimizer) for val_batch in para_valid_loader: model.eval() with torch.no_grad(): val_pred = … val_loss = … … …
이렇게 학습을 위한 _run 함수를 정의가 되었습니다.
pytorch/xla를 통해 학습을 하기 위해서는 spawn을 통해 학습을 진행함으로 코드를 py 파일화와 함께 main구문을 작성해 줍니다.
colab에서는 cell의 내용을 py파일로 작성하기위해 magic command를 사용해서 쉽게 작성할수 있습니다
%%writefile pytorch_tpu_train.py import … def _run(index, args): … if __name __ == “__main__”: FLAGS = {“batch_size” : 128} xmp.spawn(_run, args =(FLAGS, ), nprocs=8, start_method = ‘fork’)
이렇게 main 구문으로 학습을 실행하는 코드를 정의합니다.
pytorch/xla를 사용하여 TPU의 8 core를 사용하는 로직이 완성되었습니다.
이제 wandb를 통한 실험관리를 위해 정의된 _run함수에 wandb와 관련된 로깅 코드를 추가해 보도록 하겠습니다.
import torch import torchvision import torch_xla import torch_xla.core.xla_model as xm import torch_xla.distributed.xla_multiprocessing as xmp import torch_xla.distributed.parallel_loader as pl train_transform = torchvision.transforms.Compose( [ … ] ) valid_transform = torchvision.transforms.Compose( [ … ] ) train_dataset = torchvision.datasets.FashionMNIST( … ) test_dataset = torchvision.datasets.FashionMNIST( … ) mx = torchvision.models.alexnet(num_classes=10) def _run(index, args): device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = args[“batch_size”], drop_last = True ) model = mx.to(device) loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters()) for epoch in range(epochs): para_train_loader = pl.ParallelLoader( train_loader, [device] ).per_device_loader(device) para_valid_loader = … for batch in para_train_loader: model.train() data, label = batch data = data.to(device) label = label.to(device) optimizer.zero_grad() pred = model(data) loss = loss_fn(pred, label) loss.backward() xm.optimizer_step(optimizer) for val_batch in para_valid_loader: model.eval() with torch.no_grad(): val_pred = … val_loss = … … if __name __ == “__main__”: FLAGS = {“batch_size” : 128} xmp.spawn(_run, args =(FLAGS, ), nprocs=8, start_method = ‘fork’)
Wandb Logging을 추가해 보도록 하겠습니다.
import torch import torchvision import torch_xla import torch_xla.core.xla_model as xm import torch_xla.distributed.xla_multiprocessing as xmp import torch_xla.distributed.parallel_loader as pl import torch_xla.utils.serialization as xser import wandb train_transform = torchvision.transforms.Compose( [ … ] ) valid_transform = torchvision.transforms.Compose( [ … ] ) train_dataset = torchvision.datasets.FashionMNIST( … ) test_dataset = torchvision.datasets.FashionMNIST( … ) mx = torchvision.models.alexnet(num_classes=10) def _run(index, args): wandb.login(key=args["wandb_key"]) run = wandb.init( project = "pytorch-xla-fashionmnist", group = "xla-colab-tpu") device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = args["batch_size"], drop_last = True ) model = mx.to(device) loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters()) for epoch in range(epochs): para_train_loader = pl.ParallelLoader( train_loader, [device] ).per_device_loader(device) para_valid_loader = … train_cum_loss = 0 valid_cum_los s= 0 for n, batch in enumerate(para_train_loader): model.train() data, label = batch data = data.to(device) label = label.to(device) optimizer.zero_grad() pred = model(data) loss = loss_fn(pred, label) loss.backward() xm.optimizer_step(optimizer) train_cum_loss += loss.item() if n % 100 == 0: run.log( {"epoch": epoch, "train_loss": loss} for val_batch in para_valid_loader: model.eval() with torch.no_grad(): val_pred = … val_loss = … valid_cum_loss += val_loss … run.log( {"epoch": epoch, "valid_loss": valid_cum_loss / len(valid_dataloader) } xser.save(model.state_dict(), f"{index}_model.pt") run.log_artifact(f"{index}_model.pt", name = f"{epoch}_model", type="model") if __name __ == "__main__": FLAGS = {"batch_size" : 128, "wandb_key": <PERSONAL-KEY>} xmp.spawn(_run, args =(FLAGS, ), nprocs=8, start_method = ‘fork’) wandb.finish()
로깅을 할 수치에 대해서 wandb.log를 이용하여 로깅을 진행합니다.
TPU의 8 core를 사용하기때문에 wandb또한 8개의 run이 실행됩니다. 여러개의 wandb run을 관리하기 위해 동일한 group으로 묶어서 표시하도록 wandb.init에 group을 지정해줍니다
Full Source Code
Pytorch / XLA와 wandb를 설치합니다.
!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py !python pytorch-xla-env-setup.py --apt-packages libomp5 libopenblas-dev !pip install -qqq wandb
학습 로직이 정의된 파이썬 스크립트를 생성합니다.
%%writefile pytorch_tpu_train.py import torch import torchvision import torch_xla import torch_xla.core.xla_model as xm import torch_xla.distributed.xla_multiprocessing as xmp import torch_xla.distributed.parallel_loader as pl import torchmetrics import wandb train_transform = torchvision.transforms.Compose([ torchvision.transforms.Resize((128, 128)), torchvision.transforms.Lambda(lambda x: x.convert('RGB')), torchvision.transforms.ToTensor(), torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) valid_transform = torchvision.transforms.Compose([ torchvision.transforms.Resize((128, 128)), torchvision.transforms.Lambda(lambda x: x.convert('RGB')), torchvision.transforms.ToTensor(), torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) train_dataset = torchvision.datasets.FashionMNIST('.', train=True, download=True, transform=train_transform) valid_dataset = torchvision.datasets.FashionMNIST('.', train=False, download=True, transform=valid_transform) mx = torchvision.models.resnet50(num_classes=10) def _run(index, args): wandb.login(key=args['wandb_key']) run = wandb.init(project='pytorch-xla-fashionmnist', group='xla-colab-tpu') device = xm.xla_device() train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) train_dataloader = torch.utils.data.DataLoader( dataset = train_dataset, sampler = train_sampler, batch_size = 128, drop_last = True ) valid_sampler = torch.utils.data.distributed.DistributedSampler( valid_dataset, num_replicas = xm.xrt_world_size(), rank = xm.get_ordinal(), shuffle = True ) valid_dataloader = torch.utils.data.DataLoader( dataset = valid_dataset, sampler = valid_sampler, batch_size = 128, drop_last = True ) model = mx.to(device) learning_rate = 0.001 * xm.xrt_world_size() loss_fn = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) for epoch in range(100): para_train_loader = pl.ParallelLoader(train_dataloader, [device]).per_device_loader(device) para_valid_loader = pl.ParallelLoader(valid_dataloader, [device]).per_device_loader(device) train_cum_loss = 0 valid_cum_loss = 0 train_cum_acc = 0. valid_cum_acc = 0. for n, batch in enumerate(para_train_loader): model.train() data, label = batch data = data.to(device) label = label.to(device) optimizer.zero_grad() pred = model(data) loss = loss_fn(pred, label) loss.backward() xm.optimizer_step(optimizer) train_cum_loss += loss.item() pred_argmax = torch.argmax(pred, dim=-1) batch_acc = torch.sum(pred_argmax == label).float() / float(label.size(0)) train_cum_acc += batch_acc if n % 10 == 0: print(f"XLA: [{index}]: Train Loss: {loss.item()} Train Accuracy: {batch_acc.cpu()}") run.log({'epoch': epoch, 'train_loss': loss.item(), 'train_accuracy': batch_acc.cpu()}) xm.master_print(f"{epoch} |Training Done") for n, val_batch in enumerate(para_valid_loader): model.eval() val_data, val_label = val_batch val_data = val_data.to(device) val_label = val_label.to(device) with torch.no_grad(): val_pred = model(val_data) val_loss = loss_fn(val_pred, val_label) valid_cum_loss += val_loss.item() val_pred_argmax = torch.argmax(val_pred, dim=-1) val_batch_acc = torch.sum(val_pred_argmax == val_label).float() / float(val_label.size(0)) valid_cum_acc += val_batch_acc run.log({"epoch":epoch, "valid_loss": valid_cum_loss / len(valid_dataloader), 'valid_accuracy': valid_cum_acc / n }) print(f"XLA: [{index}]: Valid Loss: {val_loss.item()} Valid Accuracy: {valid_cum_acc.cpu()}") xm.master_print(f"{epoch} |Validation Done") if __name__ == "__main__": FLAGS = {'wandb_key': "<YOUR-WANDB-KEY>"} xmp.spawn(_run, args=(FLAGS, ), nprocs=8, start_method='fork') wandb.finish()
학습 시작을 위해 파이썬 코드를 실행합니다
!python pytorch_tpu_train.py
그룹으로 묶여있는 wandb런을 보여줍니다.
왼쪽의 보라색으로 표시된 group보기를 제거 하면 각 wandb run에 대한 그래프를 확인 할 수 있습니다.
학습이 진행됨에 따라 colab의 표시 화면 입니다.
XLA: index는 각 core에 할당된 모델의 학습 과정입니다.
소스코드에서의 xm.master_print를 통해 각 core에서 출력되는것을 방지 할수 있습니다.
Conclude
TPU를 통한 학습은 GPU를 사용할때보다 빠른 속도로 학습을 가능하도록 도와줍니다. 실험 관리를 위한 wandb를 사용함으로 학습 과정과 데이터, 모델을 아티펙트화 하고 저장하면서 이전 MLOPs 시리즈에서 소개드렸던 아티팩트 버저닝을 손쉽게 할수 있는 장점이 있습니다. Google Colab 을 사용하면서 GPU와 TPU를 함께 사용하면 다양한 실험을 시도해 볼 수 있습니다. 추가적으로 현재 Colab은 TPU-V2를 제공하고 있으며, Kaggle 노트북은 TPU-V3를 제공합니다.
오늘 소개드린 내용과 관련된 추가자료는 아래 링크에서 확인하실수 있습니다.