`from functools import reduce
from operator import mul
from typing import Tuple
import numpy as np
import torch
import torchvision
import torch.nn as nn
from models.loss_functions.lsaloss import LSALoss
from models.base import BaseModule
from models.blocks_2d import DownsampleBlock
from models.blocks_2d import ResidualBlock
from models.blocks_2d import UpsampleBlock
from models.estimator_1D import Estimator1D
import cv2
class Encoder(BaseModule):
"""
CIFAR10 model encoder.
"""
def init(self, input_shape, code_length):
# type: (Tuple[int, int, int], int) -> None
"""
Class constructor:
:param input_shape: the shape of CIFAR10 samples.
:param code_length: the dimensionality of latent vectors.
"""
super(Encoder, self).__init__()
self.input_shape = input_shape
self.code_length = code_length
c, h, w = input_shape
print (c,h,w)
activation_fn = nn.LeakyReLU()
# Convolutional network
self.conv = nn.Sequential(
nn.Conv2d(in_channels=c, out_channels=32, kernel_size=3, bias=False),
activation_fn,
ResidualBlock(channel_in=32, channel_out=32, activation_fn=activation_fn),
DownsampleBlock(channel_in=32, channel_out=64, activation_fn=activation_fn),
DownsampleBlock(channel_in=64, channel_out=128, activation_fn=activation_fn),
DownsampleBlock(channel_in=128, channel_out=256, activation_fn=activation_fn),
)
self.deepest_shape = (256, h // 8, w // 8)
# FC network
self.fc = nn.Sequential(
nn.Linear(in_features=reduce(mul, self.deepest_shape), out_features=256),
nn.BatchNorm1d(num_features=256),
activation_fn,
nn.Linear(in_features=256, out_features=code_length),
nn.Sigmoid()
)
def forward(self, x):
# types: (torch.Tensor) -> torch.Tensor
"""
Forward propagation.
:param x: the input batch of images.
:return: the batch of latent vectors.
"""
h = x
print (type(h))
h = self.conv(h)
h = h.view(len(h), -1)
o = self.fc(h)
return o
class Decoder(BaseModule):
"""
CIFAR10 model decoder.
"""
def init(self, code_length, deepest_shape, output_shape):
# type: (int, Tuple[int, int, int], Tuple[int, int, int]) -> None
"""
Class constructor.
:param code_length: the dimensionality of latent vectors.
:param deepest_shape: the dimensionality of the encoder's deepest convolutional map.
:param output_shape: the shape of CIFAR10 samples.
"""
super(Decoder, self).__init__()
self.code_length = code_length
self.deepest_shape = deepest_shape
self.output_shape = output_shape
print (self.output_shape,"--")
activation_fn = nn.LeakyReLU()
# FC network
self.fc = nn.Sequential(
nn.Linear(in_features=code_length, out_features=256),
nn.BatchNorm1d(num_features=256),
activation_fn,
nn.Linear(in_features=256, out_features=reduce(mul, deepest_shape)),
nn.BatchNorm1d(num_features=reduce(mul, deepest_shape)),
activation_fn
)
# Convolutional network
self.conv = nn.Sequential(
UpsampleBlock(channel_in=256, channel_out=128, activation_fn=activation_fn),
UpsampleBlock(channel_in=128, channel_out=64, activation_fn=activation_fn),
UpsampleBlock(channel_in=64, channel_out=32, activation_fn=activation_fn),
ResidualBlock(channel_in=32, channel_out=32, activation_fn=activation_fn),
nn.Conv2d(in_channels=32, out_channels=3, kernel_size=1, bias=False)
)
def forward(self, x):
# types: (torch.Tensor) -> torch.Tensor
"""
Forward propagation.
:param x: the batch of latent vectors.
:return: the batch of reconstructions.
"""
h = x
h = self.fc(h)
h = h.view(len(h), *self.deepest_shape)
h = self.conv(h)
o = h
return o
class LSACIFAR10(BaseModule):
"""
LSA model for CIFAR10 one-class classification.
"""
def init(self, input_shape, code_length, cpd_channels):
# type: (Tuple[int, int, int], int, int) -> None
"""
Class constructor.
:param input_shape: the shape of CIFAR10 samples.
:param code_length: the dimensionality of latent vectors.
:param cpd_channels: number of bins in which the multinomial works.
"""
super(LSACIFAR10, self).__init__()
self.input_shape = input_shape
self.code_length = code_length
# Build encoder
self.encoder = Encoder(
input_shape=input_shape,
code_length=code_length
)
# Build decoder
self.decoder = Decoder(
code_length=code_length,
deepest_shape=self.encoder.deepest_shape,
output_shape=input_shape
)
# Build estimator
self.estimator = Estimator1D(
code_length=code_length,
fm_list=[32, 32, 32, 32],
cpd_channels=cpd_channels
)
def forward(self, x):
# type: (torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]
"""
Forward propagation.
:param x: the input batch of images.
:return: a tuple of torch.Tensors holding reconstructions, latent vectors and CPD estimates.
"""
h = x
# Produce representations
z = self.encoder(h)
# Estimate CPDs with autoregression
z_dist = self.estimator(z)
# Reconstruct x
x_r = self.decoder(z)
# print (x_r.shape)
x_r = x_r.view(-1, *self.input_shape)
return x_r, z, z_dist
def load_dataset(data_path="/home/jbmai/Downloads/Defect Images-20190705T133320Z-001"):
# data_path = 'data/train/'
torchvision.transforms.Grayscale(num_output_channels=1)
trainTransform = torchvision.transforms.Compose([
torchvision.transforms.Resize(size=(128,128), interpolation=2),
torchvision.transforms.ToTensor(),
])
train_dataset = torchvision.datasets.ImageFolder(
root=data_path,
transform=trainTransform)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=64,
num_workers=0,
shuffle=True
)
return train_loader
net = LSACIFAR10(input_shape=[3,128,128],code_length = 32,cpd_channels =100)
lossFunction = LSALoss(cpd_channels=100)
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
try:
checkpoint = torch.load("savedWeights/enc.pth")
net.encoder.load_state_dict(checkpoint)
checkpoint = torch.load("savedWeights/est.pth")
net.estimator.load_state_dict(checkpoint)
checkpoint = torch.load("savedWeights/dec.pth")
net.decoder.load_state_dict(checkpoint)
except Exception as e:
print (e)
for epoch in range(1000): # loop over the dataset multiple times
running_loss = 0.0
d = load_dataset()
for i, (data,l) in enumerate(d):
# get the inputs; data is a list of [inputs, labels]
# print (data.shape)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
# print (data)
x_r,z,z_dist = net.forward(data)
# print (x_r.shape)
# print(data.shape)
loss = lossFunction(data,x_r,z,z_dist)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
if i % 5 == 0: # print every 2000 mini-batches
print('[%d, %5d] loss: %.3f' %
(epoch + 1, i + 1, running_loss / 5))
running_loss = 0.0
if (epoch % 5)== 0 :
# print ("--------------------{} epoch-----------".format(epoch))
# net.encoder.eval()
# net.estimator.eval()
# net.decoder.eval()
# z = net.encoder(data)
# z_dist = net.estimator(z)
# x_r = net.decoder(z).permute(0,2,3,1).detach().numpy()
# out =x_r
# print (type(out))
# for i in range(out.shape[0]):
# # print (out.shape)
# # # out.permute(0,2,3,1)
# # print (out.shape)
# cv2.imwrite("constructedImages/outDec{}_{}.jpg".format(epoch,i),out[i,:,:,:]*255)
# # cv2.waitKey(0)
# # cv2.
# net.encoder.train()
# net.estimator.train()
# net.decoder.eval()
torch.save(net.encoder.state_dict(),("savedWeights/enc.pth"))
torch.save(net.estimator.state_dict(),("savedWeights/est.pth"))
torch.save(net.decoder.state_dict(),("savedWeights/dec.pth"))
print('Finished Training')`
Output :
<class 'torch.Tensor'>
[1, 1] loss: 727109273.600
<class 'torch.Tensor'>
[2, 1] loss: 2495627954514337382531072.000
Hi can you help me rectify the issue.