Source code for datasets.seq_cub200

import os
from typing import Tuple

import numpy as np
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data.dataset import Dataset


from backbone.ResNet50 import resnet50
from datasets.transforms.denormalization import DeNormalize
from datasets.utils.continual_dataset import (ContinualDataset,
                                              store_masked_loaders)
from utils import smart_joint
from utils.conf import base_path


[docs] class MyCUB200(Dataset): """ Overrides dataset to change the getitem function. """ IMG_SIZE = 224 N_CLASSES = 200 MEAN, STD = (0.4856, 0.4994, 0.4324), (0.2272, 0.2226, 0.2613) TEST_TRANSFORM = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor(), transforms.Normalize(MEAN, STD)]) def __init__(self, root, train=True, transform=None, target_transform=None, download=True) -> None: self.not_aug_transform = transforms.Compose([transforms.ToTensor()]) self.root = root self.train = train self.transform = transform self.target_transform = target_transform self.download = download if download: if os.path.isdir(root) and len(os.listdir(root)) > 0: print('Download not needed, files already on disk.') else: from onedrivedownloader import download ln = '<iframe src="https://onedrive.live.com/embed?cid=D3924A2D106E0039&resid=D3924A2D106E0039%21110&authkey=AIEfi5nlRyY1yaE" width="98" height="120" frameborder="0" scrolling="no"></iframe>' print('Downloading dataset') download(ln, filename=smart_joint(root, 'cub_200_2011.zip'), unzip=True, unzip_path=root, clean=True) data_file = np.load(smart_joint(root, 'train_data.npz' if train else 'test_data.npz'), allow_pickle=True) self.data = data_file['data'] self.targets = torch.from_numpy(data_file['targets']).long() self.classes = data_file['classes'] self.segs = data_file['segs'] self._return_segmask = False def __getitem__(self, index: int) -> Tuple[type(Image), int, type(Image)]: """ Gets the requested element from the dataset. Args: index: index of the element to be returned Returns: tuple: (image, target) where target is index of the target class. """ img, target = self.data[index], self.targets[index] # to return a PIL Image img = Image.fromarray(img, mode='RGB') original_img = img.copy() not_aug_img = self.not_aug_transform(original_img) if self.transform is not None: img = self.transform(img) if self.target_transform is not None: target = self.target_transform(target) ret_tuple = [img, target, not_aug_img, self.logits[index]] if hasattr(self, 'logits') else [ img, target, not_aug_img] if self._return_segmask: raise "Unsupported segmentation output in training set!" return ret_tuple def __len__(self) -> int: return len(self.data)
[docs] class CUB200(MyCUB200): """Base CUB200 dataset.""" def __init__(self, root, train=True, transform=None, target_transform=None, download=False) -> None: super().__init__(root, train=train, transform=transform, target_transform=target_transform, download=download) def __getitem__(self, index: int, ret_segmask=False) -> Tuple[type(Image), int, type(Image)]: """ Gets the requested element from the dataset. Args: index: index of the element to be returned Returns: tuple: (image, target) where target is index of the target class. """ img, target = self.data[index], self.targets[index] # to return a PIL Image img = Image.fromarray(img, mode='RGB') if self.transform is not None: img = self.transform(img) if self.target_transform is not None: target = self.target_transform(target) ret_tuple = [img, target, self.logits[index]] if hasattr(self, 'logits') else [img, target] if ret_segmask or self._return_segmask: seg = self.segs[index] seg = Image.fromarray(seg, mode='L') seg = transforms.ToTensor()(transforms.CenterCrop((MyCUB200.IMG_SIZE, MyCUB200.IMG_SIZE))(seg))[0] ret_tuple.append((seg > 0).int()) return ret_tuple
[docs] class SequentialCUB200(ContinualDataset): """Sequential CUB200 Dataset. Args: NAME (str): name of the dataset. SETTING (str): setting of the dataset. N_CLASSES_PER_TASK (int): number of classes per task. N_TASKS (int): number of tasks. SIZE (tuple): size of the images. MEAN (tuple): mean of the dataset. STD (tuple): standard deviation of the dataset. TRANSFORM (torchvision.transforms): transformation to apply to the data. TEST_TRANSFORM (torchvision.transforms): transformation to apply to the test data. """ NAME = 'seq-cub200' SETTING = 'class-il' N_CLASSES_PER_TASK = 20 N_TASKS = 10 SIZE = (MyCUB200.IMG_SIZE, MyCUB200.IMG_SIZE) MEAN, STD = (0.4856, 0.4994, 0.4324), (0.2272, 0.2226, 0.2613) TRANSFORM = transforms.Compose([ transforms.Resize(MyCUB200.IMG_SIZE), transforms.RandomCrop(MyCUB200.IMG_SIZE, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize(MEAN, STD)]) TEST_TRANSFORM = MyCUB200.TEST_TRANSFORM
[docs] def get_data_loaders(self) -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader]: transform = self.TRANSFORM test_transform = transforms.Compose( [transforms.Resize((MyCUB200.IMG_SIZE, MyCUB200.IMG_SIZE)), transforms.ToTensor(), self.get_normalization_transform()]) train_dataset = MyCUB200(base_path() + 'CUB200', train=True, download=True, transform=transform) test_dataset = CUB200(base_path() + 'CUB200', train=False, download=True, transform=test_transform) train, test = store_masked_loaders( train_dataset, test_dataset, self) return train, test
[docs] @staticmethod def get_transform(): transform = transforms.Compose( [transforms.ToPILImage(), SequentialCUB200.TRANSFORM]) return transform
[docs] @staticmethod def get_backbone(hookme=False): num_classes = SequentialCUB200.N_CLASSES_PER_TASK * SequentialCUB200.N_TASKS return resnet50(num_classes, pretrained=True)
[docs] @staticmethod def get_loss(): return F.cross_entropy
[docs] @staticmethod def get_normalization_transform(): transform = transforms.Normalize( SequentialCUB200.MEAN, SequentialCUB200.STD) return transform
[docs] @staticmethod def get_denormalization_transform(): transform = DeNormalize(SequentialCUB200.MEAN, SequentialCUB200.STD) return transform
[docs] @staticmethod def get_batch_size(): return 16
[docs] @staticmethod def get_epochs(): return 30