1  from typing import Dict, Set, FrozenSet


2 


3  from tudelft.utilities.immutablelist.FixedList import FixedList


4  from tudelft.utilities.immutablelist.PowerSet import PowerSet


5 


6  from geniusweb.PriorityQueue import PriorityQueue


7  from geniusweb.actions.PartyId import PartyId


8  from geniusweb.actions.Vote import Vote


9  from geniusweb.actions.Votes import Votes


10  from geniusweb.issuevalue.Bid import Bid


11  from geniusweb.utils import toTuple, toStr


12 


13 


14  # who removed sign function from python?


15  sign = lambda a: (a>0)  (a<0)


16 


17  class CollectedVotes:


18  '''


19  Utility class. Typically contains all the votes collected from 1 round. Can


20  find the best combinations of votes, eg to get the maximum consensus group


21  '''


22 


23 


24  def __init__(self, votesMap:Dict[PartyId, Votes] , powers:Dict[PartyId, int] ) :


25  ''''


26  @param votesMap the {@link Votes} done for each party


27  @param powers the power of each party. The power is an integer number.


28  In the voting process, the party powers add up to form


29  the total voting power. This set may contain parties that


30  are not in newmap.


31  '''


32  self._allvotes = dict(votesMap)


33  self._powers = dict(powers)


34  self._allBids = None


35 


36  if not set(votesMap.keys()).issubset(set(powers.keys())):


37  raise ValueError("All voting parties must have a power")


38 


39  def With(self, votes:Votes , power:int)>"CollectedVotes":


40  '''


41  @param votes additional set of votes for a new party that did not yet


42  vote.


43  @param power powers for the new party, can be null if power already set


44  previously.


45  @return new CollectedVotes with the new votes added.


46  '''


47  actor = votes.getActor()


48  if actor in self._allvotes:


49  raise ValueError("Party " + str(actor) + " already voted")


50 


51  newpowers:Dict[PartyId, int] = dict(self._powers)


52  if not actor in self._powers:


53  if power == None:


54  raise ValueError("Power must be set for new actor " + str(actor))


55  newpowers[actor]= power


56 


57  newmap = dict(self._allvotes)


58  newmap[actor]= votes


59  return CollectedVotes(newmap, newpowers)


60 


61  def getVotes(self)>Dict[PartyId, Votes] :


62  '''


63  @return all the votes placed so far


64  '''


65  return dict(self._allvotes)


66 


67  def getPowers(self) >Dict[PartyId, int] :


68  '''


69  @return the powers of all parties


70  '''


71  return dict(self._powers)


72 


73  def getMaxAgreements(self)>Dict[Bid, FrozenSet[PartyId]]:


74  '''


75  @return a map containing for each {@link Bid}s a subset of


76  {@link PartyId}s that placed a satisfied vote for that bid, such


77  that the subset has maximum power. Maximum power means that there


78  is no other subset of satisfied votes for that bid with more


79  group power. There may be other subsets of equal power for the


80  bid. If there are no concensus possibilities at all for a bid,


81  the bid is not placed in the map.


82  '''


83  agreements: Dict[Bid, FrozenSet[PartyId]] = {}


84  allbidsmap = self.getAllBids()


85  for bid in allbidsmap.keys():


86  mx = self._getMaxPowerGroup(allbidsmap[bid])


87  if len(mx)>0:


88  agreements[bid]=mx


89  return agreements


90 


91  def getAllBids(self) > Dict[Bid, Set[Vote]]:


92  '''


93  @return all bids that were voted on and the votes for that bid. This


94  basically reverses keys and values in {@link #allvotes}


95  '''


96  if self._allBids == None: #type:ignore


97  self._allBids = self._getAllBids1() #type:ignore


98  return self._allBids #type:ignore


99 


100  def Without(self, parties:Set[PartyId] )>"CollectedVotes":


101  '''


102  @param parties the parties to be removed


103  @return new CollectedVotes without given parties


104  '''


105  newvotes = dict(self._allvotes)


106  newpower = dict(self._powers)


107  for party in parties:


108  newvotes.pop(party)


109  newpower.pop(party)


110 


111  return CollectedVotes(newvotes, newpower)


112 


113  def getTotalPower(self, parties:FrozenSet[PartyId] ) > int:


114  '''


115  @param parties a list of parties


116  @return total voting power of the parties


117  '''


118  return sum([self._powers[p] for p in parties])


119 


120  def _getMaxPowerGroupBreathFirst(self, allvotes:Set[Vote] ) > FrozenSet[PartyId] :


121  '''


122  Breathfirst optimized search, similar as {@link #getMaxPowerGroup(Set)}


123  but more efficient on average (remains to be proven). Notice that this


124  algorithm may be more expensive particularly if there is no consensus at


125  all, because of the added overhead of the breathfirst search


126 


127  @param allvotes a list of all votes for a particular bid.


128  @return a consensusgroup with maximum power, so there is no other


129  consensus group that has more power (but there may be other


130  groups with same power). returns empty set if no consensus found


131  '''


132 


133  # CHECK how about scoping. What is 'this' referring to?


134  this=self


135  def comparator(vs1:Set[Vote], vs2:Set[Vote]):


136  parties1 = frozenset([vote.getActor() for vote in vs1])


137  parties2 = frozenset([vote.getActor() for vote in vs2])


138  return sign(this.getTotalPower(parties1)


139  this.getTotalPower(parties2))


140 


141  '''


142  queue that keeps the options for the breathfirst search sorted from


143  highest to lowest power of the voters.


144  '''


145  queue = PriorityQueue[Set[Vote]] (comparator)


146  queue.put(allvotes)


147 


148  while not queue.empty():


149  # try one with the highest power


150  vs = queue.get()


151  if self._isViable(vs):


152  return self.getParties(vs)


153  if len(vs) > 1:


154  # vote can be splitted. push all new subsets


155  for vote in vs:


156  subvotes = set(vs)


157  subvotes.remove(vote)


158  # avoid duplicate work. Since this has a removed party,


159  # and assuming power of each party >=1,


160  # we can not already have checked subvotes


161  if not subvotes in queue:


162  queue.put(subvotes)


163 


164  return frozenset()


165 


166  def _getMaxPowerGroup(self, votes:Set[Vote] )> FrozenSet[PartyId] :


167  '''


168  @param votes a list of all votes for a particular bid. The parties that


169  placed the votes are called 'voters'


170  @return a consensusgroup with maximum power, so there is no other


171  consensus group that has more power (but there may be other


172  groups with same power). returns empty set if no consensus found


173  '''


174  maxgroup:FrozenSet[PartyId] = frozenset()


175  maxpower = 0


176 


177  for parties in self._getConsensusGroups(votes):


178  power = self.getTotalPower(parties)


179  if power > maxpower:


180  maxgroup = parties


181  maxpower = power


182  return frozenset(maxgroup)


183 


184  def _getConsensusGroups(self, votes: Set[Vote] )> FrozenSet[FrozenSet[PartyId]] :


185  '''


186  @param votes a set of {@link Votes} for one single {@link Bid}. It is


187  assumed all votes are by different parties.


188  @return all subsets sets of parties that create a viable consensus vote.


189  A viable consensus voteset VS is a subset of the votes, such that


190  <ol>


191  <li>VS ≥ 2


192  <li>TP = The sum of the powers of the parties in VS


193  <li>forall v in VS: v_minpower ≤ TP ≤ v_maxpower


194  </ol>


195  Notice that this algorithm works in exponential space


196  (=expensive) and not suited for large numbers of parties. Should


197  work fine for up to 10 parties.


198  '''


199  groups:Set[FrozenSet[PartyId]] = set()


200 


201  allVotePermutations = PowerSet[Vote](FixedList[Vote](votes))


202  # check each possible subset.


203  for voteList in allVotePermutations:


204  if self._isViable(voteList):


205  groups.add(self.getParties(voteList))


206  return frozenset(groups)


207 


208  def _isViable(self, voteList) > bool:


209  '''


210  @param voteList a :list_iterator of votes for a single bid.


211  @return true if this votelist satisfies the requirements of all


212  participants and there are ≥ 2 parties


213  '''


214  parties = self.getParties(voteList)


215  totalpower = self.getTotalPower(parties);


216  return len(parties) >= 2 and totalpower >= self._getMinPower(voteList)\


217  and totalpower <= self._getMaxPower(voteList)


218 


219  @staticmethod


220  def getParties(voteList) > FrozenSet[PartyId] :


221  '''


222  @param voteList iterable of Vote, list of all {@link Vote}s


223  @return frozen set with all parties that did a vote


224  '''


225  parties:Set[PartyId] = set()


226  for vote in voteList:


227  parties.add(vote.getActor())


228  return frozenset(parties)


229 


230  def _getMinPower(self, votes) >int:


231  '''


232  @param votes a iterable of Vote, list of {@link Vote}s on one Bid. All parties must be


233  known


234  @return the minimum voting power needed for this group, so that all can


235  accept. This is equal to the max of the vote.getMinPower


236  '''


237  mx = 0


238  for vote in votes:


239  mx = max(mx, vote.getMinPower())


240  return mx


241 


242  def _getMaxPower(self, votes) > int:


243  '''


244  @param votes a iterable Vote list of {@link Vote}s on one Bid. All parties must be


245  known


246  @return the maximum voting power needed for this group, so that all can


247  accept. This is equal to the min of the vote.getMaxPower


248  '''


249  mn = 9999999999999999 #int.maxvalue?


250  for vote in votes:


251  mn = min(mn, vote.getMaxPower())


252  return mn


253 


254  def _getAllBids1(self) > Dict[Bid, Set[Vote]]:


255  bids:Dict[Bid, Set[Vote]] = {}


256  for votes in self._allvotes.values():


257  for vote in votes.getVotes():


258  bid = vote.getBid()


259  if not bid in bids:


260  bids[bid]=set()


261  bids[bid].add(vote)


262  return bids


263 


264  def __hash__(self):


265  return hash((toTuple(self._allvotes), toTuple(self._powers)))


266 


267  def __eq__(self, other):


268  return isinstance(other, self.__class__) \


269  and self._allvotes == other._allvotes\


270  and self._powers==other._powers


271 


272  def __repr__(self)>str:


273  return "CollectedVotes[" + toStr(self._allvotes) \


274  + "," + toStr(self._powers) + "]"

