You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lock.py 4.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. #!/usr/bin/env python
  2. import subprocess
  3. import threading
  4. import getpass
  5. import os
  6. import time
  7. import glob
  8. class I3Locker:
  9. def __init__(self):
  10. self.killed = False
  11. self.proc = None
  12. def run(self):
  13. self.proc = subprocess.Popen([ "mlock" ])
  14. code = self.proc.wait()
  15. if code == 0 or self.killed:
  16. return 0
  17. else:
  18. print("mlock exited with code "+str(code))
  19. return -1
  20. def kill(self):
  21. self.killed = True
  22. self.proc.terminate()
  23. class FaceLocker:
  24. def __init__(self):
  25. self.delay = 200
  26. self.dev = 2
  27. self.running = False
  28. self.killed = False
  29. self.waitingProc = None
  30. def run(self):
  31. self.running = True
  32. # Import here because it's sloow
  33. import face_recognition
  34. import cv2
  35. import numpy as np
  36. # Read all face files
  37. faceencs = []
  38. path = f"./faces/{getpass.getuser()}/*.npy"
  39. paths = []
  40. for p in glob.glob(path):
  41. print(f"Reading {p}")
  42. faceencs.append(np.load(p))
  43. paths.append(p)
  44. # Wait here if we're on battery
  45. battery = "/sys/class/power_supply/BAT0"
  46. keyboard = "AT Translated Set 2 keyboard"
  47. key = 36
  48. bat = False
  49. with open(f"{battery}/status", "r") as f:
  50. s = f.read().strip()
  51. if s == "Discharging" or s == "Unknown":
  52. bat = True
  53. if bat:
  54. self.waitForKey(keyboard, key)
  55. if self.killed:
  56. return 0
  57. # Match faces, blocks until a match is found or we're killed
  58. self.runFaces(faceencs, paths, np, face_recognition, cv2)
  59. if self.matching or self.killed:
  60. return 0
  61. else:
  62. return -1
  63. def runFaces(self, faceencs, paths, np, face_recognition, cv2):
  64. self.matching = False
  65. cap = cv2.VideoCapture(self.dev)
  66. tacc = self.delay
  67. then = 0
  68. avg = 128
  69. while not self.matching and self.running:
  70. ret, frame = cap.read()
  71. mean = cv2.mean(frame)[0]
  72. avg = (avg + mean) / 2
  73. if mean < avg:
  74. continue
  75. # delay
  76. now = time.time() * 1000
  77. if tacc < self.delay:
  78. tacc += now - then
  79. then = now
  80. continue
  81. else:
  82. tacc = 0
  83. then = now
  84. scale = 1
  85. rgb_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
  86. small_rgb_frame = cv2.resize(rgb_frame, (0, 0), fx=scale, fy=scale)
  87. framelocs = face_recognition.face_locations(small_rgb_frame)
  88. frameencs = face_recognition.face_encodings(small_rgb_frame, framelocs)
  89. # Loop through each face in this frame of video
  90. for (top, right, bottom, left), frameenc in zip(framelocs, frameencs):
  91. # See if the face is a match for the known face(s)
  92. dists = face_recognition.face_distance(faceencs, frameenc)
  93. dist = dists[0]
  94. distidx = 0
  95. for i, d in enumerate(dists):
  96. print(i, d)
  97. if d < dist:
  98. dist = d
  99. distidx = i
  100. print(f"Distance: {dist} ({paths[distidx]})")
  101. # If a match was found in known_face_encodings, just use the first one.
  102. if dist <= 0.4:
  103. self.matching = True
  104. def waitForKey(self, keyboard, key):
  105. if self.killed:
  106. return
  107. print("Waiting for enter before starting face recognition")
  108. self.waitingProc = subprocess.Popen(
  109. f"xinput test '{keyboard}' | grep --line-buffered 'key press {key}' | exit",
  110. shell=True)
  111. # Blink IR blasters
  112. import cv2
  113. cap = cv2.VideoCapture(self.dev)
  114. time.sleep(0.1)
  115. cap.release()
  116. self.waitingProc.wait()
  117. def kill(self):
  118. self.killed = True
  119. self.running = False
  120. if self.waitingProc:
  121. self.waitingProc.terminate()
  122. lockers = [
  123. I3Locker(),
  124. FaceLocker(),
  125. ]
  126. def runLocker(locker):
  127. print("Starting "+locker.__class__.__name__)
  128. ret = locker.run()
  129. if ret == 0:
  130. print(locker.__class__.__name__+" unlocked.")
  131. for l in lockers:
  132. if l == locker:
  133. continue
  134. l.kill()
  135. else:
  136. print(locker.__class__.__name__+" failed.")
  137. threads = []
  138. for locker in lockers:
  139. th = threading.Thread(target=runLocker, args=(locker,))
  140. th.start()
  141. threads.append(th)
  142. for th in threads:
  143. th.join()