You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lock.py 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python
  2. import subprocess
  3. import threading
  4. import getpass
  5. import os
  6. import time
  7. import glob
  8. class I3Locker:
  9. def run(self):
  10. print("starting mlock")
  11. self.proc = subprocess.Popen([ "mlock" ])
  12. code = self.proc.wait()
  13. if code == 0 or self.killed:
  14. return 0
  15. else:
  16. print("mlock exited with code "+str(code))
  17. return -1
  18. def kill(self):
  19. self.killed = True
  20. self.proc.terminate()
  21. class FaceLocker:
  22. def run(self):
  23. self.delay = 200
  24. self.dev = 2
  25. self.running = True
  26. self.waitingProc = None
  27. # Import here because it's sloow
  28. import face_recognition
  29. import cv2
  30. import numpy as np
  31. # Read all face files
  32. faceencs = []
  33. path = f"./faces/{getpass.getuser()}/*.npy"
  34. paths = []
  35. for p in glob.glob(path):
  36. print(f"reading {p}")
  37. faceencs.append(np.load(p))
  38. paths.append(p)
  39. # Wait here if we're on battery
  40. battery = "/sys/class/power_supply/BAT0"
  41. keyboard = "AT Translated Set 2 keyboard"
  42. key = 36
  43. bat = False
  44. with open(f"{battery}/status", "r") as f:
  45. s = f.read().strip()
  46. if s == "Discharging" or s == "Unknown":
  47. bat = True
  48. if bat:
  49. self.waitForKey(keyboard, key)
  50. if self.killed:
  51. return 0
  52. # Match faces, blocks until a match is found or we're killed
  53. self.runFaces(faceencs, paths, np, face_recognition, cv2)
  54. if self.matching or self.killed:
  55. return 0
  56. else:
  57. return -1
  58. def runFaces(self, faceencs, paths, np, face_recognition, cv2):
  59. self.matching = False
  60. cap = cv2.VideoCapture(self.dev)
  61. tacc = self.delay
  62. then = 0
  63. avg = 128
  64. while not self.matching and self.running:
  65. ret, frame = cap.read()
  66. mean = cv2.mean(frame)[0]
  67. avg = (avg + mean) / 2
  68. if mean < avg:
  69. continue
  70. # delay
  71. now = time.time() * 1000
  72. if tacc < self.delay:
  73. tacc += now - then
  74. then = now
  75. continue
  76. else:
  77. tacc = 0
  78. then = now
  79. scale = 1
  80. rgb_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
  81. small_rgb_frame = cv2.resize(rgb_frame, (0, 0), fx=scale, fy=scale)
  82. framelocs = face_recognition.face_locations(small_rgb_frame)
  83. frameencs = face_recognition.face_encodings(small_rgb_frame, framelocs)
  84. # Loop through each face in this frame of video
  85. for (top, right, bottom, left), frameenc in zip(framelocs, frameencs):
  86. # See if the face is a match for the known face(s)
  87. dists = face_recognition.face_distance(faceencs, frameenc)
  88. dist = dists[0]
  89. distidx = 0
  90. for i, d in enumerate(dists):
  91. print(i, d)
  92. if d < dist:
  93. dist = d
  94. distidx = i
  95. print(f"Distance: {dist} ({paths[distidx]})")
  96. # If a match was found in known_face_encodings, just use the first one.
  97. if dist <= 0.4:
  98. self.matching = True
  99. def waitForKey(self, keyboard, key):
  100. if self.killed:
  101. return
  102. print("Waiting for enter before starting face recognition")
  103. self.waitingProc = subprocess.Popen(
  104. f"xinput test '{keyboard}' | grep --line-buffered 'key press {key}' | exit",
  105. shell=True)
  106. # Blink IR blasters
  107. import cv2
  108. cap = cv2.VideoCapture(self.dev)
  109. cap.release()
  110. self.waitingProc.wait()
  111. def kill(self):
  112. self.killed = True
  113. self.running = False
  114. if self.waitingProc:
  115. self.waitingProc.terminate()
  116. lockers = [
  117. I3Locker(),
  118. FaceLocker(),
  119. ]
  120. def runLocker(locker):
  121. ret = locker.run()
  122. if ret == 0:
  123. print(locker.__class__.__name__+" unlocked.")
  124. for l in lockers:
  125. if l == locker:
  126. continue
  127. l.kill()
  128. else:
  129. print(locker.__class__.__name__+" failed.")
  130. threads = []
  131. for locker in lockers:
  132. th = threading.Thread(target=runLocker, args=(locker,))
  133. th.start()
  134. threads.append(th)
  135. for th in threads:
  136. th.join()