diff --git a/enhancer/data/dataset.py b/enhancer/data/dataset.py new file mode 100644 index 0000000..4bc5b3b --- /dev/null +++ b/enhancer/data/dataset.py @@ -0,0 +1,95 @@ + +import glob +import math +import os +import pytorch_lightning as pl +from torch.utils.data import IterableDataset, DataLoader +import torch.nn.functional as F +from typing import Optional + +from enhancer.utils.random import create_unique_rng +from enhancer.utils.io import Audio +from enhancer.utils import Fileprocessor +from enhancer.utils.config import Files + + + +class EnhancerDataset(IterableDataset): + """Dataset object for creating clean-noisy speech enhancement datasets""" + + def __init__(self,name:str,clean_dir,noisy_dir,duration=1.0,sampling_rate=48000, matching_function=None): + + if not os.path.isdir(clean_dir): + raise ValueError(f"{clean_dir} is not a valid directory") + + if not os.path.isdir(noisy_dir): + raise ValueError(f"{clean_dir} is not a valid directory") + + self.sampling_rate = sampling_rate + self.clean_dir = clean_dir + self.noisy_dir = noisy_dir + self.duration = max(1.0,duration) + self.audio = Audio(self.sampling_rate,mono=True,return_tensor=True) + + fp = Fileprocessor.from_name(name,clean_dir,noisy_dir,matching_function) + self.valid_files = fp.prepare_matching_dict() + + def __iter__(self): + + rng = create_unique_rng(12) ##pass epoch number here + + while True: + + file_dict,*_ = rng.choices(self.valid_files,k=1, + weights=[self.valid_files[file]['duration'] for file in self.valid_files]) + file_duration = file_dict['duration'] + start_time = round(rng.uniform(0,file_duration- self.duration),2) + data = self.prepare_segment(file_dict,start_time) + yield data + + def prepare_segment(self,file_dict:dict, start_time:float): + + clean_segment = self.audio(file_dict.keys()[0], + offset=start_time,duration=self.duration) + noisy_segment = self.audio(file_dict['noisy'], + offset=start_time,duration=self.duration) + clean_segment = F.pad(clean_segment,(0,int(self.duration*self.sampling_rate-clean_segment.shape[-1]))) + noisy_segment = F.pad(noisy_segment,(0,int(self.duration*self.sampling_rate-noisy_segment.shape[-1]))) + return {"clean": clean_segment,"noisy":noisy_segment} + + def __len__(self): + + return math.ceil(sum([file["duration"] for file in self.valid_files])/self.duration) + + + +class Dataset(pl.LightningDataModule): + + def __init__(self,name:str, files:Files, + duration:float=1.0, sampling_rate:int=48000, batch_size=32): + super().__init__() + + self.train_clean = files.train_clean + self.train_noisy = files.train_noisy + self.valid_clean = files.test_clean + self.valid_noisy = files.test_noisy + self.name = name + self.duration = duration + self.sampling_rate = sampling_rate + self.batch_size = batch_size + + def setup(self, stage: Optional[str] = None): + + if stage in (None,"fit"): + self.train_dataset = EnhancerDataset(self.name, self.train_clean, + self.train_noisy, self.duration, self.sampling_rate) + + self.valid_dataset = EnhancerDataset(self.name, self.valid_clean, + self.valid_noisy, self.duration, self.sampling_rate) + + def train_loader(self): + return DataLoader(self.train_dataset, batch_size = self.batch_size) + + + def valid_loader(self): + return DataLoader(self.valid_dataset, batch_size = self.batch_size) \ No newline at end of file