123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- from threading import Lock, Thread
- from timeit import default_timer as timer
- from ._keyboard_event import normalize_name
- import re
- class KeyTable(object):
- _keys = {}
- _write = Lock() # Required to edit keys
- _table = {}
- _time = -1
- _elapsed = 0 # Maximum time that has elapsed so far in the sequence
- _read = Lock() # Required to edit table
- _in_sequence = False
- _keys_suppressed = [] # List of keys that have been suppressed so far in the sequence
- _disable = False # Disables key suppression during replay to avoid infinite loop
- SEQUENCE_END = 2 # Delimeter that signifies the end of the sequence
- def __init__(self, press_key, release_key):
- self.press_key = press_key
- self.release_key = release_key
- def is_allowed(self, key, is_up, advance=True):
- """
- The goal of this function is to be very fast. This is accomplished
- through the table structure, which ensures that we only need to
- check whether `key is in self._table` and change what variable
- is referenced by `self._table`.
- Unfortunately, handling timeouts properly has added significantly to
- the logic required, but the function should still be well within required
- time limits.
- """
- if self._disable:
- return True
- if key != self.SEQUENCE_END:
- key = re.sub('(left|right) ', '', key)
- time = timer()
- if self._time == -1:
- elapsed = 0
- else:
- elapsed = time - self._time
- if self._elapsed > elapsed:
- elapsed = self._elapsed
- if is_up:
- if self._in_sequence:
- if key != self.SEQUENCE_END:
- self._keys_suppressed.append((key, is_up))
- return False
- else:
- advance = False
- in_sequence = key in self._table and elapsed < self._table[key][0]
- in_keys = key in self._keys
- suppress = in_sequence or in_keys
- if advance:
- self._read.acquire()
- if in_sequence and self._table[key][2]:
- del self._keys_suppressed[:]
- if in_sequence and self._table[key][1]:
- self._table = self._table[key][1]
- if self._time != -1:
- self._elapsed = elapsed
- self._time = -1
- elif in_keys and self._keys[key][1]:
- self._table = self._keys[key][1]
- if self._time != -1:
- self._elapsed = elapsed
- self._time = -1
- self._replay_keys()
- del self._keys_suppressed[:]
- else:
- self._table = self._keys
- self._time = -1
- self._elapsed = -1
- self._replay_keys()
- del self._keys_suppressed[:]
- self._in_sequence = in_sequence
- self._read.release()
- if key != self.SEQUENCE_END and suppress:
- self._keys_suppressed.append((key, is_up))
- return not suppress
- def complete_sequence(self):
- if self.SEQUENCE_END in self._table:
- self.is_allowed(self.SEQUENCE_END, False)
- self._read.acquire()
- self._time = timer()
- self._read.release()
- else:
- self._read.acquire()
- self._time = -1
- self._elapsed = 0
- self._table = self._keys
- self._replay_keys()
- del self._keys_suppressed[:]
- self._read.release()
- def _replay_keys(self):
- self._disable = True
- for key, is_up in self._keys_suppressed:
- if is_up:
- self.release_key(key)
- else:
- self.press_key(key)
- self._disable = False
- def _refresh(self):
- self._read.acquire()
- self._disable = False
- self._table = self._keys
- self._read.release()
- def _acquire_table(self, sequence, table, timeout):
- """
- Returns a flat (single level) dictionary
- :param sequence:
- :param table:
- :return:
- """
- el = sequence.pop(0)
- if el not in table:
- table[el] = (timeout, {}, False)
- if table[el][0] < timeout:
- table[el][0] = timeout
- if sequence:
- return self._acquire_table(sequence, table[el][1], timeout)
- else:
- return table
- def suppress_sequence(self, sequence, timeout):
- """
- Adds keys to the suppress_keys table
- :param sequence: List of scan codes
- :param timeout: Time allowed to elapse before resetting
- """
- # the suppress_keys table is organized
- # as a dict of dicts so that the critical
- # path is only checking whether the
- # scan code is 'in current_dict'
- flat = []
- for subsequence in sequence:
- flat.extend(subsequence)
- flat.append(self.SEQUENCE_END)
- last_index = flat[-1]
- self._write.acquire()
- table = self._acquire_table(flat, self._keys, timeout)
- table[last_index] = (table[last_index][0], table[last_index][1], True)
- self._refresh()
- self._write.release()
- def suppress_none(self):
- """
- Clears the suppress_keys table and disables
- key suppression
- :return:
- """
- self._write.acquire()
- self._keys = {}
- self._refresh()
- self._write.release()
- self._read.acquire()
- self._disable = True
- self._read.release()
|