from django.conf import settings from django.contrib.auth.models import User from django.db import models from django.utils.translation import get_language from django.utils.translation import ugettext_lazy as _ from shutil import copyfile import os import uuid from fsutils import md5sum class Profile(models.Model): """example Documentation: https://simpleisbetterthancomplex.com/tutorial/2016/07/22/how-to-extend-django-user-model.html#onetoone """ class Meta: ordering = ['user__last_name', 'user__first_name'] accountname = models.CharField(max_length=150, null=True, blank=True) user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile') labs = models.ManyToManyField('Lab', related_name='members') def __str__(self): return '{0}, {1} <{2}>'.format(self.user.last_name, self.user.first_name, self.user.email) @property def fullname(self): return self.user.get_full_name() class Institution(models.Model): class Meta: unique_together = (('abbr', 'name')) ordering = ['name'] abbr = models.CharField(max_length=64, null=True, blank=True, unique=True, verbose_name=_('Abbreviation')) name = models.CharField(max_length=256, unique=True, verbose_name=_('Name')) def __str__(self): return self.name def get_identifier(self): if self.abbr: return self.abbr else: return self.name class Lab(models.Model): class Meta: ordering = ['name'] name = models.CharField(max_length=64, verbose_name=_('Name')) ldap = models.CharField(max_length=32, null=True, blank=True, verbose_name='LDAP') pi = models.OneToOneField( Profile, on_delete=models.SET_NULL, null=True, blank=True, limit_choices_to={'user__groups__name': settings.AUTH_LDAP_PI_GROUP}, related_name='pi_of') institution = models.ForeignKey(Institution, on_delete=models.PROTECT, related_name='labs') def __str__(self): return self.name # Base File Objects ######## class FSFile(models.Model): """ Base Model of a filesystem object Child classes must override path attribute to add missing values """ # Allow passing an NFS path at init? # def __init__(self, src): # self.upload_from_nfs(src) # def get_hash() path = models.FilePathField(path=settings.DATA_ROOT, max_length=256, allow_files=True, allow_folders=False) def upload_from_nfs(self, src): """ Copy to db storage partition from a local network path Return new path after copy """ # TODO: Code in this function will be moved to the Queuing system tmp_dst = settings.DATA_ROOT + "/" + str(uuid.uuid4()) copyfile(src, tmp_dst) # copy + hash at the same time? md5 = md5sum(tmp_dst).hexdigest() path = "{}/{}___{}".format(settings.DATA_ROOT, os.path.basename(src), md5) os.rename(tmp_dst, path) self.path = path self.save() # NGS Domain Objects ######## class FastqFSFile(FSFile): sequencer_company = models.CharField(max_length=128, verbose_name=_('Sequencer Company'), blank=True, null=True) class Dataset(models.Model): name = models.CharField(max_length=128, verbose_name=_('Name')) files = models.ManyToManyField(FSFile, related_name='datasets', verbose_name=_('Project Datasets')) class Project(models.Model): name = models.CharField(max_length=128, verbose_name=_('Name')) source_lab = models.ForeignKey(Lab, on_delete=models.PROTECT, related_name='lab_projects') access_granted_to = models.ManyToManyField(Profile, related_name='projects', verbose_name=_('Grant Access to')) open_to_labs = models.ManyToManyField(Lab, related_name='projects', verbose_name=_('Open to these labs')) datasets = models.ManyToManyField(Dataset, related_name='projects', verbose_name=_('Project Datasets'), blank=True) class Alert(models.Model): message_fr = models.TextField(null=True) message_en = models.TextField(null=True) start = models.DateTimeField() end = models.DateTimeField() @property def localized(self): if get_language() == 'en': return self.message_en else: return self.message_fr class AppSettings(models.Model): home_institution = models.ForeignKey(Institution, null=True, blank=True, on_delete=models.SET_NULL) # Base Object for all Processable types ####### # BETA # class Node(models.Model): # projects = models.ManyToManyField(Project) # profiles = models.ManyToManyField(Profile) # @property # def processes(self): # return (self.input2process, self.output2process) # @property # def _get_node_type(self): # return self.__class__.__name__ # def is_input(self): # return len(self.input2process) != 0 # def is_output(self): # return len(self.output2process) != 0 # def __repr__(self): # return '<%s(%s)>' % (self._get_node_type, self.id) # Chemical Domain objects ######## # BETA # class Substance(models.Model): # umid = models.CharField(max_length=16, null=False, unique=True) # smiles = models.CharField(max_length=512, null=False) # def __repr__(self): # return '<Substance : %s %s>' % (self.umid, self.smiles) # @classmethod # def _search(cls, sql, session): # res = session.execute(sql) # rows = res.fetchall() # # fields = res.keys() # lres = [] # for r in rows: # sub = session.query(Substance).filter(Substance.umid == r[1]).all() # lres.append(sub) # lres = sum(lres, []) # return lres # @classmethod # def exactSearch(cls, smiles, session): # # rdkit cartridge table is call mols #### # # not mapped through sql alchemy #### # sql = "select * from mols where m@='%s' ;" % smiles # return cls._search(sql, session) # @classmethod # def substrSearch(cls, smiles, session, dummyAtom=False): # # rdkit cartridge table is call mols #### # # not mapped through sql alchemy #### # # dummy : #### # sql = "select * from mols where m@>" # if dummyAtom: # sql += "mol_adjust_query_properties('%s');" % smiles # else: # sql += "'%s' ;" % smiles # return cls._search(sql, session) # @classmethod # def similaritySearch(cls, smiles, session): # # similarity search using fingerprint # sql = "select * from get_mfp2_neighbors('%s')" % smiles # rows = session.execute(sql).fetchall() # lres = cls._search(sql, session) # lres = dict([(r.umid, r) for r in lres]) # res = [] # for r in rows: # r = list(r) # r.append(lres[r[1]]) # res.append(r) # return res # class Compound(Node): # batch = models.CharField(max_length=8, null=False) # full_formula = models.CharField(max_length=256, null=False) # full_mass = models.DecimalField(max_digits=5, decimal_places=2, null=False) # parent_formula = models.CharField(max_length=128, null=False) # parent_mass = models.DecimalField(max_digits=5, decimal_places=2, null=False) # status = models.DecimalField(max_digits=2) # deleted_status = models.DecimalField(max_digits=2) # purpose = models.CharField(max_length=32) # library = models.CharField(max_length=32) # notebook = models.CharField(max_length=32) # originator = models.CharField(max_length=32) # supplier = models.CharField(max_length=128) # supplier_batch = models.CharField(max_length=64) # study = models.CharField(max_length=64) # name = models.CharField(max_length=128) # objtype = models.CharField(max_length=16) # purchase_no = models.CharField(max_length=64) # registered_by = models.CharField(max_length=16) # registered_on = models.CharField(max_length=16) # salt = models.CharField(max_length=16) # example : HCL # salt_equivs = models.DecimalField(max_digits=5, decimal_places=2) # 2.0 # solvate = models.CharField(max_length=16) # solvate_equivs = models.DecimalField(max_digits=5, decimal_places=2) # solvent_name = models.CharField(max_length=16) # liquid_amount = models.CharField(max_length=64) # dry_amount = models.CharField(max_length=256) # appearance = models.CharField(max_length=256) # purity = models.CharField(max_length=256) # comment = models.CharField(max_length=256) # substance = models.ForeignKey('Substance', on_delete=models.CASCADE, related_name='compounds') # def __repr__(self): # return "<Compound : %s %s_%s>" % (self.full_formula, self.substance.umid, self.batch) # @property # def _type(self): # return self.__class__.__name__ # @property # def _id(self): # return '%s_%s' % (self.substance.umid, self.batch) # @classmethod # def searchByUM_b(cls, umid, session): # umid, batch = umid.split('_') # c = session.query(Compound).filter(Compound.substance_id == Substance.id) # c = c.filter(Substance.umid == umid) # pre = c.all() # res = c.filter(Compound.batch == batch).all() # if len(res) == 0 and len(pre) > 0: # print('Found %s compounds with %s id but none matching batch %s : ' % (len(pre), umid, batch)) # print(', '.join([e._id for e in pre])) # return res # @classmethod # def searchByUM(cls, umid, session): # res = session.query(Compound).filter(Compound.substance_id == Substance.id) # res = res.filter(Substance.umid == umid).all() # return res # def getResults(self): # res = [] # for i in range(len(self.processes[0])): # p = self.processes[0][i].process # if len(p.inputs) == 1 and p.inputs[0] == self: # res += p.outputs # return res