You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lock.py 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #!/usr/bin/env python
  2. import subprocess
  3. import threading
  4. import getpass
  5. import os
  6. import time
  class I3Locker:
      """Locker backed by the external ``mlock`` program.

      ``run`` blocks until ``mlock`` exits. ``kill`` may be called from another
      thread to stop it.
      """

      def __init__(self):
          # Created up front so run() and kill() can be called in either order
          # (and from different threads) without hitting a missing attribute.
          self.killed = False
          self.proc = None

      def run(self):
          """Start ``mlock`` and wait for it to exit.

          Returns:
              0 if ``mlock`` exited with status 0 or this locker was killed,
              -1 otherwise.
          """
          if self.killed:
              # kill() arrived before we started: do not spawn a process that
              # nobody would terminate afterwards.
              return 0
          self.proc = subprocess.Popen(["mlock"])
          if self.killed:
              # kill() ran between the check above and the Popen assignment and
              # therefore saw no process to stop; stop it on its behalf.
              self.proc.terminate()
          code = self.proc.wait()
          if code == 0 or self.killed:
              return 0
          print("mlock exited with code " + str(code))
          return -1

      def kill(self):
          """Mark this locker as killed and terminate ``mlock`` if it is running."""
          self.killed = True
          if self.proc is not None:
              self.proc.terminate()
  class FaceLocker:
      """Locker that succeeds once a camera frame matches an enrolled face.

      Enrolled images are read from ``./faces/<current user>``. ``run`` blocks
      until a face matches or ``kill`` is called from another thread.
      """

      def __init__(self):
          # All state lives here so kill() is safe before, during and after
          # run(), and so a kill that arrives early is not overwritten by run().
          self.delay = 200  # minimum milliseconds between analysed frames
          self.dev = 2  # device index handed to cv2.VideoCapture
          self.running = True
          self.killed = False
          self.matching = False
          self.waitingProc = None

      def run(self):
          """Load the enrolled faces and watch the camera for a match.

          Returns:
              0 if a face matched or this locker was killed, -1 otherwise.
          """
          # Import here because it's sloow
          import numpy as np
          import face_recognition
          import cv2

          faceencs, paths = self._loadFaces(face_recognition)

          # Wait here if we're on battery
          keyboard = "AT Translated Set 2 keyboard"
          key = 36  # presumably the X keycode of Enter (see the message below)
          if faceencs and self.running and self._onBattery():
              print("Waiting for enter before starting face recognition")
              # Blink IR blasters
              cap = cv2.VideoCapture(self.dev)
              cap.release()
              self.waitForKey(keyboard, key)

          # Match faces, blocks until a match is found or we're killed
          self.runFaces(faceencs, paths, np, face_recognition, cv2)

          if self.matching or self.killed:
              return 0
          return -1

      def _loadFaces(self, face_recognition):
          """Return ``(encodings, paths)`` for every enrolled image with a face."""
          path = f"./faces/{getpass.getuser()}"
          faceencs = []
          paths = []
          try:
              names = os.listdir(path)
          except FileNotFoundError:
              print(f"Warning: {path} does not exist, no faces to match")
              return faceencs, paths

          for f in names:
              p = f"{path}/{f}"
              print(f"reading {p}")
              face = face_recognition.load_image_file(p)
              encs = face_recognition.face_encodings(face)
              if len(encs) == 0:
                  # Report the offending file, not the directory.
                  print("Warning: " + p + " has no face!")
                  continue
              # Only the first face of each image is used.
              faceencs.append(encs[0])
              paths.append(p)
          return faceencs, paths

      def _onBattery(self):
          """Return True if BAT0 reports ``Discharging`` or ``Unknown``."""
          battery = "/sys/class/power_supply/BAT0"
          try:
              with open(f"{battery}/status", "r") as f:
                  status = f.read().strip()
          except OSError:
              # No readable battery status: treat as not on battery.
              return False
          return status in ("Discharging", "Unknown")

      def runFaces(self, faceencs, paths, np, face_recognition, cv2):
          """Read camera frames until one matches a known face or we are killed.

          Sets ``self.matching`` to True when the closest known encoding is
          within a distance of 0.4 of a face found in the frame.

          Args:
              faceencs: Known face encodings.
              paths: File path of each entry in ``faceencs`` (same order).
              np, face_recognition, cv2: The already-imported modules.
          """
          self.matching = False
          if len(faceencs) == 0:
              print("Warning: no known face encodings, skipping face recognition")
              return

          cap = cv2.VideoCapture(self.dev)
          if not cap.isOpened():
              cap.release()
              print(f"Warning: could not open video device {self.dev}")
              return

          last = None  # monotonic timestamp (ms) of the last analysed frame
          avg = 128
          scale = 1
          try:
              while not self.matching and self.running:
                  ret, frame = cap.read()
                  if not ret:
                      # No frame delivered; back off instead of spinning.
                      time.sleep(self.delay / 1000)
                      continue

                  # Skip frames darker than the running brightness average.
                  mean = cv2.mean(frame)[0]
                  avg = (avg + mean) / 2
                  if mean < avg:
                      continue

                  # delay: analyse at most one frame per self.delay ms.
                  # monotonic() is immune to wall-clock adjustments.
                  now = time.monotonic() * 1000
                  if last is not None and now - last < self.delay:
                      continue
                  last = now

                  # NOTE(review): GRAY2RGB assumes the device delivers
                  # single-channel frames - confirm for other cameras.
                  rgb_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
                  small_rgb_frame = cv2.resize(rgb_frame, (0, 0), fx=scale, fy=scale)

                  framelocs = face_recognition.face_locations(small_rgb_frame)
                  frameencs = face_recognition.face_encodings(small_rgb_frame, framelocs)

                  # Loop through each face in this frame of video
                  for frameenc in frameencs:
                      # Find the closest known face
                      dists = face_recognition.face_distance(faceencs, frameenc)
                      distidx = int(np.argmin(dists))
                      dist = dists[distidx]
                      print(f"Distance: {dist} ({paths[distidx]})")

                      if dist <= 0.4:
                          self.matching = True
                          break
          finally:
              # Always hand the camera back, even on error or kill.
              cap.release()

      def waitForKey(self, keyboard, key):
          """Block until the ``xinput test | grep`` shell pipeline exits.

          Args:
              keyboard: xinput device name to watch.
              key: Key code grepped for in ``key press <key>`` lines.
          """
          self.waitingProc = subprocess.Popen(
              f"xinput test '{keyboard}' | grep --line-buffered 'key press {key}' | exit",
              shell=True)
          if not self.running:
              # kill() ran before waitingProc was assigned and could not stop
              # it; do so here so we never block after being killed.
              self.waitingProc.terminate()
          self.waitingProc.wait()

      def kill(self):
          """Stop the recognition loop and any pending key wait."""
          self.killed = True
          self.running = False
          if self.waitingProc:
              self.waitingProc.terminate()
  # One instance of each locker; runLocker() iterates this list to kill the
  # remaining lockers once any one of them reports success.
  lockers = [I3Locker(), FaceLocker()]
  def runLocker(locker):
      """Run ``locker`` to completion and report the outcome.

      When ``locker.run()`` returns 0, every other entry of the module-level
      ``lockers`` list is killed; any other return value is reported as a
      failure.
      """
      status = locker.run()
      name = locker.__class__.__name__

      if status != 0:
          print(name+" failed.")
          return

      print(name+" unlocked.")
      for other in lockers:
          if other == locker:
              continue
          other.kill()
  # Run every locker on its own thread, then wait for all of them to finish.
  threads = [
      threading.Thread(target=runLocker, args=(locker,))
      for locker in lockers
  ]
  for th in threads:
      th.start()
  for th in threads:
      th.join()