Source code for AKScheduling.models

# This file mainly contains signal receivers, which follow a very strong interface, having e.g., a sender attribute
# that is hardly used by us. Nevertheless, to follow the django receiver coding style and since changes might
# cause issues when loading fixtures or model dumps, it is not wise to replace that attribute with "_".
# Therefore, the check that finds unused arguments is disabled for this whole file:
# pylint: disable=unused-argument

from django.db.models.signals import post_save, m2m_changed, pre_delete
from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _

from AKModel.availability.models import Availability
from AKModel.models import AK, AKSlot, Room, Event, ConstraintViolation


[docs] def update_constraint_violations(new_violations, existing_violations_to_check): """ Update existing constraint violations (subset for which new violations were computed) based on these new violations. This will add all new violations without a match, preserve the matching ones and delete the obsolete ones (those without a match from the newly calculated violations). :param new_violations: list of new (not yet saved) violations that exist after the last change :type new_violations: list[ConstraintViolation] :param existing_violations_to_check: list of related violations currently in the db :type existing_violations_to_check: list[ConstraintViolation] """ for new_violation in new_violations: found_match = False for existing_violation in existing_violations_to_check: if existing_violation.matches(new_violation): # Remove from existing violations set since it should stay in db existing_violations_to_check.remove(existing_violation) found_match = True break # Only save new violation if no match was found if not found_match: new_violation.save() # Cleanup obsolete violations (ones without matches computed under current conditions) for outdated_violation in existing_violations_to_check: outdated_violation.delete()
[docs] def update_cv_reso_deadline_for_slot(slot): """ Update constraint violation AK_AFTER_RESODEADLINE for given slot :param slot: slot to check/update :type slot: AKSlot """ event = slot.event # Update only if reso_deadline exists # if event was changed and reso_deadline is removed, CVs will be deleted by event changed handler # Update only has to be done for already scheduled slots with reso intention if slot.ak.reso and slot.event.reso_deadline and slot.start: violation_type = ConstraintViolation.ViolationType.AK_AFTER_RESODEADLINE new_violations = [] # Violation? if slot.end > event.reso_deadline: c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ) c.aks_tmp.add(slot.ak) c.ak_slots_tmp.add(slot) new_violations.append(c) update_constraint_violations(new_violations, list(slot.constraintviolation_set.filter(type=violation_type)))
[docs] def check_capacity_for_slot(slot: AKSlot): """ Check whether this slot violates the capacity requirement :param slot: slot to check :type slot: AKSlot :return: Violation (if any) or None :rtype: ConstraintViolation or None """ # If slot is scheduled in a room and interest was specified if slot.room and slot.room.capacity >= 0 and slot.ak.interest >= 0: # Create a violation if interest exceeds room capacity if slot.room.capacity < slot.ak.interest: c = ConstraintViolation( type=ConstraintViolation.ViolationType.ROOM_CAPACITY_EXCEEDED, level=ConstraintViolation.ViolationLevel.VIOLATION, event=slot.event, room=slot.room, comment=_("Not enough space for AK interest (Interest: %(interest)d, Capacity: %(capacity)d)") % {'interest': slot.ak.interest, 'capacity': slot.room.capacity}, ) c.ak_slots_tmp.add(slot) c.aks_tmp.add(slot.ak) return c # Create a warning if interest is close to room capacity if slot.room.capacity < slot.ak.interest + 5 or slot.room.capacity < slot.ak.interest * 1.25: c = ConstraintViolation( type=ConstraintViolation.ViolationType.ROOM_CAPACITY_EXCEEDED, level=ConstraintViolation.ViolationLevel.WARNING, event=slot.event, room=slot.room, comment=_("Space is too close to AK interest (Interest: %(interest)d, Capacity: %(capacity)d)") % {'interest': slot.ak.interest, 'capacity': slot.room.capacity} ) c.ak_slots_tmp.add(slot) c.aks_tmp.add(slot.ak) return c return None
[docs] @receiver(post_save, sender=AK) def ak_changed_handler(sender, instance: AK, **kwargs): """ Signal receiver: Check for violations after AK changed Changes might affect: Reso intention, Category, Interest """ # TODO Reso intention changes # Check room capacities violation_type = ConstraintViolation.ViolationType.ROOM_CAPACITY_EXCEEDED new_violations = [] for slot in instance.akslot_set.all(): cv = check_capacity_for_slot(slot) if cv is not None: new_violations.append(cv) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(m2m_changed, sender=AK.owners.through) def ak_owners_changed_handler(sender, instance: AK, action: str, **kwargs): """ Signal receiver: Owners of AK changed """ # Only signal after change (post_add, post_delete, post_clear) are relevant if not action.startswith("post"): return event = instance.event # Owner(s) changed: Might affect multiple AKs by the same owner(s) at the same time violation_type = ConstraintViolation.ViolationType.OWNER_TWO_SLOTS new_violations = [] slots_of_this_ak: [AKSlot] = instance.akslot_set.filter(start__isnull=False) # For all owners (after recent change)... for owner in instance.owners.all(): # ...find other slots that might be overlapping... for ak in owner.ak_set.all(): # ...find overlapping slots... if ak != instance: for slot in slots_of_this_ak: for other_slot in ak.akslot_set.filter(start__isnull=False): if slot.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ak_owner=owner ) c.aks_tmp.add(instance) c.aks_tmp.add(other_slot.ak) c.ak_slots_tmp.add(slot) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(m2m_changed, sender=AK.conflicts.through) def ak_conflicts_changed_handler(sender, instance: AK, action: str, **kwargs): """ Signal receiver: Conflicts of AK changed """ # Only signal after change (post_add, post_delete, post_clear) are relevant if not action.startswith("post"): return event = instance.event # Conflict(s) changed: Might affect multiple AKs that are conflicts of each other violation_type = ConstraintViolation.ViolationType.AK_CONFLICT_COLLISION new_violations = [] slots_of_this_ak: [AKSlot] = instance.akslot_set.filter(start__isnull=False) conflicts_of_this_ak: [AK] = instance.conflicts.all() # Loop over all existing conflicts for ak in conflicts_of_this_ak: if ak != instance: for other_slot in ak.akslot_set.filter(start__isnull=False): for slot in slots_of_this_ak: # ...find overlapping slots... if slot.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ) c.aks_tmp.add(instance) c.ak_slots_tmp.add(slot) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(m2m_changed, sender=AK.prerequisites.through) def ak_prerequisites_changed_handler(sender, instance: AK, action: str, **kwargs): """ Signal receiver: Prerequisites of AK changed """ # Only signal after change (post_add, post_delete, post_clear) are relevant if not action.startswith("post"): return event = instance.event # Prerequisite(s) changed: Might affect multiple AKs that should have a certain order violation_type = ConstraintViolation.ViolationType.AK_BEFORE_PREREQUISITE new_violations = [] slots_of_this_ak: [AKSlot] = instance.akslot_set.filter(start__isnull=False) prerequisites_of_this_ak: [AK] = instance.prerequisites.all() # Loop over all prerequisites for ak in prerequisites_of_this_ak: if ak != instance: for other_slot in ak.akslot_set.filter(start__isnull=False): for slot in slots_of_this_ak: # ...find overlapping slots... if other_slot.end > slot.start: # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ) c.aks_tmp.add(instance) c.ak_slots_tmp.add(slot) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(m2m_changed, sender=AK.requirements.through) def ak_requirements_changed_handler(sender, instance: AK, action: str, **kwargs): """ Signal receiver: Requirements of AK changed """ # Only signal after change (post_add, post_delete, post_clear) are relevant if not action.startswith("post"): return event = instance.event # Requirement(s) changed: Might affect slots and rooms violation_type = ConstraintViolation.ViolationType.REQUIRE_NOT_GIVEN new_violations = [] slots_of_this_ak: [AKSlot] = instance.akslot_set.filter(start__isnull=False) # For all requirements (after recent change)... for slot in slots_of_this_ak: room = slot.room room_requirements = room.properties.all() for requirement in instance.requirements.all(): if not requirement in room_requirements: # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, requirement=requirement, room=room, ) c.aks_tmp.add(instance) c.ak_slots_tmp.add(slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(post_save, sender=AKSlot) def akslot_changed_handler(sender, instance: AKSlot, **kwargs): """ Signal receiver: AKSlot changed Changes might affect: Duplicate parallel, Two in room, Resodeadline """ # TODO Consider rewriting this very long and complex method to resolve several (style) issues: # pylint: disable=too-many-nested-blocks,too-many-locals,too-many-branches,too-many-statements event = instance.event # == Check for two parallel slots by one of the owners == violation_type = ConstraintViolation.ViolationType.OWNER_TWO_SLOTS new_violations = [] if instance.start: # For all owners (after recent change)... for owner in instance.ak.owners.all(): # ...find other slots that might be overlapping... for ak in owner.ak_set.all(): # ...find overlapping slots... if ak != instance.ak: for other_slot in ak.akslot_set.filter(start__isnull=False): if instance.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ak_owner=owner ) c.aks_tmp.add(instance.ak) c.aks_tmp.add(other_slot.ak) c.ak_slots_tmp.add(instance) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == Check for two aks in the same room at the same time == violation_type = ConstraintViolation.ViolationType.ROOM_TWO_SLOTS new_violations = [] # For all slots in this room... if instance.room: for other_slot in instance.room.akslot_set.all(): if other_slot != instance: # ... find overlapping slots... if instance.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.WARNING, event=event, room=instance.room ) c.aks_tmp.add(instance.ak) c.aks_tmp.add(other_slot.ak) c.ak_slots_tmp.add(instance) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the slot that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == Check for reso ak after reso deadline == update_cv_reso_deadline_for_slot(instance) # == Check for two slots of the same AK at the same time (warning) == violation_type = ConstraintViolation.ViolationType.AK_SLOT_COLLISION new_violations = [] if instance.start: # For all other slots of this ak... for other_slot in instance.ak.akslot_set.filter(start__isnull=False): if other_slot != instance: # ... find overlapping slots... if instance.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.WARNING, event=event, ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(instance) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the slot that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) update_constraint_violations(new_violations, existing_violations_to_check) # == Check for slot outside availability == # An AK's availability changed: Might affect AK slots scheduled outside the permitted time violation_type = ConstraintViolation.ViolationType.SLOT_OUTSIDE_AVAIL new_violations = [] if instance.start: availabilities_of_this_ak: [Availability] = instance.ak.availabilities.all() covered = False for availability in availabilities_of_this_ak: covered = availability.start <= instance.start and availability.end >= instance.end if covered: break if not covered: c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(instance) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == Check for requirement not fulfilled by room == # Room(s) changed: Might affect slots and rooms violation_type = ConstraintViolation.ViolationType.REQUIRE_NOT_GIVEN new_violations = [] if instance.room: room_requirements = instance.room.properties.all() for requirement in instance.ak.requirements.all(): if requirement not in room_requirements: # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, requirement=requirement, room=instance.room, ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(instance) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == check for simultaneous slots of conflicting AKs == violation_type = ConstraintViolation.ViolationType.AK_CONFLICT_COLLISION new_violations = [] if instance.start: conflicts_of_this_ak: [AK] = instance.ak.conflicts.all() for ak in conflicts_of_this_ak: if ak != instance.ak: for other_slot in ak.akslot_set.filter(start__isnull=False): # ...find overlapping slots... if instance.overlaps(other_slot): # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(instance) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.ak.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == check for missing prerequisites == violation_type = ConstraintViolation.ViolationType.AK_BEFORE_PREREQUISITE new_violations = [] if instance.start: prerequisites_of_this_ak: [AK] = instance.ak.prerequisites.all() for ak in prerequisites_of_this_ak: if ak != instance.ak: for other_slot in ak.akslot_set.filter(start__isnull=False): # ...find slots in the wrong order... if other_slot.end > instance.start: # ...and create a temporary violation if necessary... c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event, ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(instance) c.ak_slots_tmp.add(other_slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.ak.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check) # == Check for room capacity == cv = check_capacity_for_slot(instance) new_violations = [cv] if cv is not None else [] # Compare to/update list of existing violations of this type for this slot existing_violations_to_check = list( instance.constraintviolation_set.filter(type=ConstraintViolation.ViolationType.ROOM_CAPACITY_EXCEEDED) ) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(pre_delete, sender=AKSlot) def akslot_deleted_handler(sender, instance: AKSlot, **kwargs): """ Signal receiver: AKSlot deleted Manually clean up or remove constraint violations that belong to this slot since there is no cascade deletion for many2many relationships. Explicitly listening for AK deletion signals is not necessary since they will transitively trigger this signal and we always set both AK and AKSlot references in a constraint violation """ # print(f"{instance} deleted") for cv in instance.constraintviolation_set.all(): # Make sure not delete CVs that e.g., show three parallel slots in a single room if cv.ak_slots.count() <= 2: cv.delete()
[docs] @receiver(post_save, sender=Room) def room_changed_handler(sender, instance: Room, **kwargs): """ Signal receiver: Room changed Changes might affect: Room size """ # Check room capacities violation_type = ConstraintViolation.ViolationType.ROOM_CAPACITY_EXCEEDED new_violations = [] for slot in instance.akslot_set.all(): cv = check_capacity_for_slot(slot) if cv is not None: new_violations.append(cv) existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(m2m_changed, sender=Room.properties.through) def room_requirements_changed_handler(sender, instance: Room, action: str, **kwargs): """ Signal Receiver: Requirements of room changed """ # Only signal after change (post_add, post_delete, post_clear) are relevant if not action.startswith("post"): return
# event = instance.event # TODO React to changes
[docs] @receiver(post_save, sender=Availability) def availability_changed_handler(sender, instance: Availability, **kwargs): """ Signal receiver: Availalability changed Changes might affect: category availability, AK availability, Room availability """ event = instance.event # An AK's availability changed: Might affect AK slots scheduled outside the permitted time if instance.ak: violation_type = ConstraintViolation.ViolationType.SLOT_OUTSIDE_AVAIL new_violations = [] availabilities_of_this_ak: [Availability] = instance.ak.availabilities.all() slots_of_this_ak: [AKSlot] = instance.ak.akslot_set.filter(start__isnull=False) for slot in slots_of_this_ak: covered = False for availability in availabilities_of_this_ak: covered = availability.start <= slot.start and availability.end >= slot.end if covered: break if not covered: c = ConstraintViolation( type=violation_type, level=ConstraintViolation.ViolationLevel.VIOLATION, event=event ) c.aks_tmp.add(instance.ak) c.ak_slots_tmp.add(slot) new_violations.append(c) # ... and compare to/update list of existing violations of this type # belonging to the AK that was recently changed (important!) existing_violations_to_check = list(instance.ak.constraintviolation_set.filter(type=violation_type)) # print(existing_violations_to_check) update_constraint_violations(new_violations, existing_violations_to_check)
[docs] @receiver(post_save, sender=Event) def event_changed_handler(sender, instance: Event, **kwargs): """ Signal receiver: Event changed Changes might affect: Reso deadline """ # Check for reso ak after reso deadline (which might have changed) if instance.reso_deadline: for slot in instance.akslot_set.filter(start__isnull=False, ak__reso=True): update_cv_reso_deadline_for_slot(slot) else: # No reso deadline, delete all violations violation_type = ConstraintViolation.ViolationType.AK_AFTER_RESODEADLINE existing_violations_to_check = list(instance.constraintviolation_set.filter(type=violation_type)) update_constraint_violations([], existing_violations_to_check)