_nixcommon.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # -*- coding: utf-8 -*-
  2. import struct
  3. import os
  4. import atexit
  5. from time import time as now
  6. from threading import Thread
  7. from glob import glob
  8. try:
  9. from queue import Queue
  10. except ImportError:
  11. from Queue import Queue
  12. event_bin_format = 'llHHI'
  13. # Taken from include/linux/input.h
  14. # https://www.kernel.org/doc/Documentation/input/event-codes.txt
  15. EV_SYN = 0x00
  16. EV_KEY = 0x01
  17. EV_REL = 0x02
  18. EV_ABS = 0x03
  19. EV_MSC = 0x04
  20. def make_uinput():
  21. import fcntl, struct
  22. # Requires uinput driver, but it's usually available.
  23. uinput = open("/dev/uinput", 'wb')
  24. UI_SET_EVBIT = 0x40045564
  25. fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)
  26. UI_SET_KEYBIT = 0x40045565
  27. for i in range(256):
  28. fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
  29. BUS_USB = 0x03
  30. uinput_user_dev = "80sHHHHi64i64i64i64i"
  31. axis = [0] * 64 * 4
  32. uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
  33. uinput.flush() # Without this you may get Errno 22: Invalid argument.
  34. UI_DEV_CREATE = 0x5501
  35. fcntl.ioctl(uinput, UI_DEV_CREATE)
  36. UI_DEV_DESTROY = 0x5502
  37. #fcntl.ioctl(uinput, UI_DEV_DESTROY)
  38. return uinput
  39. class EventDevice(object):
  40. def __init__(self, path):
  41. self.path = path
  42. self._input_file = None
  43. self._output_file = None
  44. @property
  45. def input_file(self):
  46. if self._input_file is None:
  47. try:
  48. self._input_file = open(self.path, 'rb')
  49. except IOError as e:
  50. if e.strerror == 'Permission denied':
  51. print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
  52. exit()
  53. def try_close():
  54. try:
  55. self._input_file.close
  56. except:
  57. pass
  58. atexit.register(try_close)
  59. return self._input_file
  60. @property
  61. def output_file(self):
  62. if self._output_file is None:
  63. self._output_file = open(self.path, 'wb')
  64. atexit.register(self._output_file.close)
  65. return self._output_file
  66. def read_event(self):
  67. data = self.input_file.read(struct.calcsize(event_bin_format))
  68. seconds, microseconds, type, code, value = struct.unpack(event_bin_format, data)
  69. return seconds + microseconds / 1e6, type, code, value, self.path
  70. def write_event(self, type, code, value):
  71. integer, fraction = divmod(now(), 1)
  72. seconds = int(integer)
  73. microseconds = int(fraction * 1e6)
  74. data_event = struct.pack(event_bin_format, seconds, microseconds, type, code, value)
  75. # Send a sync event to ensure other programs update.
  76. sync_event = struct.pack(event_bin_format, seconds, microseconds, EV_SYN, 0, 0)
  77. self.output_file.write(data_event + sync_event)
  78. self.output_file.flush()
  79. class AggregatedEventDevice(object):
  80. def __init__(self, devices, output=None):
  81. self.event_queue = Queue()
  82. self.devices = devices
  83. self.output = output or self.devices[0]
  84. def start_reading(device):
  85. while True:
  86. self.event_queue.put(device.read_event())
  87. for device in self.devices:
  88. thread = Thread(target=start_reading, args=[device])
  89. thread.setDaemon(True)
  90. thread.start()
  91. def read_event(self):
  92. return self.event_queue.get(block=True)
  93. def write_event(self, type, code, value):
  94. self.output.write_event(type, code, value)
  95. import re
  96. from collections import namedtuple
  97. DeviceDescription = namedtuple('DeviceDescription', 'event_file is_mouse is_keyboard')
  98. device_pattern = r"""N: Name="([^"]+?)".+?H: Handlers=([^\n]+)"""
  99. def list_devices_from_proc(type_name):
  100. try:
  101. with open('/proc/bus/input/devices') as f:
  102. description = f.read()
  103. except FileNotFoundError:
  104. return
  105. devices = {}
  106. for name, handlers in re.findall(device_pattern, description, re.DOTALL):
  107. path = '/dev/input/event' + re.search(r'event(\d+)', handlers).group(1)
  108. if type_name in handlers:
  109. yield EventDevice(path)
  110. def list_devices_from_by_id(type_name):
  111. for path in glob('/dev/input/by-id/*-event-' + type_name):
  112. yield EventDevice(path)
  113. def aggregate_devices(type_name):
  114. # Some systems have multiple keyboards with different range of allowed keys
  115. # on each one, like a notebook with a "keyboard" device exclusive for the
  116. # power button. Instead of figuring out which keyboard allows which key to
  117. # send events, we create a fake device and send all events through there.
  118. uinput = make_uinput()
  119. fake_device = EventDevice('uinput Fake Device')
  120. fake_device._input_file = uinput
  121. fake_device._output_file = uinput
  122. # We don't aggregate devices from different sources to avoid
  123. # duplicates.
  124. devices_from_proc = list(list_devices_from_proc(type_name))
  125. if devices_from_proc:
  126. return AggregatedEventDevice(devices_from_proc, output=fake_device)
  127. # breaks on mouse for virtualbox
  128. # was getting /dev/input/by-id/usb-VirtualBox_USB_Tablet-event-mouse
  129. devices_from_by_id = list(list_devices_from_by_id(type_name))
  130. if devices_from_by_id:
  131. return AggregatedEventDevice(devices_from_by_id, output=fake_device)
  132. # If no keyboards were found we can only use the fake device to send keys.
  133. return fake_device
  134. def ensure_root():
  135. if os.geteuid() != 0:
  136. raise ImportError('You must be root to use this library on linux.')