Marker Tracking via WebSockets With a Raspberry Pi
Learn more about marker tracking with a Raspberry Pi.
Imagine you intend to automatically move a couple of robots within a room. You need the accurate position and orientation (the direction the front side is pointing) of each robot. Outdoor systems like GPS obviously don't work indoors, and you have a small budget. What do you do?
After some research into easy-to-apply solutions, my students and I decided to visually track our robots. We mounted a camera at the ceiling that continuously streams a video of our robots below. What remained to be done was capturing the frames of the video stream, searching for the objects of interest inside them, and serving the findings.
While the solution I describe below works with any USB camera of reasonable quality and any computer, we used a Raspberry Pi 3B+ with a Raspberry Pi Camera to meet our budget limitations.
Tracking Objects With OpenCV
We first intended to track our robots with an object recognition algorithm but soon decided to save the time of training an appropriate system and rather stick markers to our robots. The open-source image processing library OpenCV comes with an easy-to-use marker detection framework called Aruco. OpenCV also includes Python bindings in case you don't feel comfortable programming in C/C++.
OpenCV itself is available as a pip package, even as a headless bundle called opencv-python-headless. In addition to OpenCV, you will need to install the third-party contributions to OpenCV, opencv-contrib-python-headless, which include the Aruco library.
Adrian Rosebrock wrote a good post on installing OpenCV with pip and some quirks.
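Once the packages are installed, a quick sanity check from Python tells you whether the contrib modules are available; this is just a minimal sketch, and the exact output depends on your OpenCV version:
import cv2

print(cv2.__version__)
# This line fails with an AttributeError if the contrib (aruco) package is missing.
print(cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250))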
Detecting Markers in a Video Stream
We have to capture the video frame by frame and search for the markers in each frame. While the Pi Camera could be used like a USB camera, the optimized Python library picamera lets us grab images, stream video, and capture frames.
camera = PiCamera()
camera.resolution = RESOLUTION  # (1088, 1088)
camera.framerate = FRAMERATE    # 30
rawCapture = PiRGBArray(camera, size=RESOLUTION)
try:
    for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
        frame = capture.array
        markers = find_markers(frame)
        # ... do something with the found markers here
        rawCapture.truncate(0)
except Exception:
    logging.error("Capturing stopped with an error.")
finally:
    camera.close()
The above code continuously captures frames from the Pi Camera in a loop and passes each frame to our find_markers function. We wrapped some logging and error handling around it.
Marker Position
For our robot navigation, we need to know where a robot (its marker) is, relative to the viewpoint of the camera, and in which direction it is heading. So, we want find_markers to return a set of tuples containing the id, position (x, y), and heading (0° equals north) of each marker.
Aruco supports marker sets with different resolutions, such as 4x4 or 6x6 grids. There is a quite good OpenCV tutorial explaining Aruco markers, which also shows how to create them. Alternatively, you can create markers conveniently online, for example, with Oleg Kalachev's marker generator.
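If you prefer generating markers locally, a minimal sketch with the older aruco API used throughout this article could look like this (marker ID 23 and the 400-pixel size are arbitrary examples):
# Generate one 4x4 marker (ID 23) from the same dictionary as a 400x400 pixel image.
import cv2

dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)
marker_image = cv2.aruco.drawMarker(dictionary, 23, 400)
cv2.imwrite("marker_23.png", marker_image)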
Aruco provides the function detectMarkers to search for markers in an image. The function returns a multi-dimensional array of corners for each detected marker and another array with the marker IDs. Below, we calculate the center of each marker from two diagonally opposite corners. The corners array has an extra dimension, so we first reduce it with NumPy's squeeze.
def find_markers(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
    rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)
    result = set()
    if ids is None:
        return result
    for i in range(0, len(ids)):
        try:
            id = str(ids[i][0])
            marker = np.squeeze(corners[i])
            x1, y1 = marker[0]
            x2, y2 = marker[2]
            x = int((x1 + x2) / 2)
            y = int((y1 + y2) / 2)
            bearing = calc_heading(rvecs[i][0])
            result.add((id, x, y, bearing))
        except Exception:
            traceback.print_exc()
    return result
You may have noted some constants, which we set as follows:
DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250) # our marker dictionary
PARAMETERS = cv2.aruco.DetectorParameters_create() # default params
MARKER_EDGE = 0.05 # marker edge length in meters (our markers are 5cm x 5cm); the pose scale is not really relevant for our use case
Marker Heading
Detecting the heading of a marker is more challenging. There might be a better way to find the orientation of a marker based on the corners alone; however, we are using the Aruco function estimatePoseSingleMarkers, which respects the distortion of the camera lens. For this reason, the function expects a camera matrix and a vector of distortion coefficients. Even though there is a good tutorial on how to do this, finding the camera matrix and distortion coefficients of our camera based on OpenCV's camera calibration turned out to be the hardest part of the work. (You may find Mark Hedley Jones' collection of chessboards useful.)
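For reference, the calibration roughly follows OpenCV's standard chessboard procedure. The sketch below is only a simplified outline, not our exact script, and assumes a chessboard with 9x6 inner corners and calibration photos in a calibration/ folder:
# Simplified outline of OpenCV's chessboard calibration.
# Assumption: a 9x6 inner-corner chessboard photographed from many angles.
import glob

import cv2
import numpy as np

PATTERN = (9, 6)  # inner corners per chessboard row and column

# Object points: the chessboard corners in their own coordinate system (z = 0 plane).
objp = np.zeros((PATTERN[0] * PATTERN[1], 3), np.float32)
objp[:, :2] = np.mgrid[0:PATTERN[0], 0:PATTERN[1]].T.reshape(-1, 2)

obj_points, img_points = [], []
for path in glob.glob("calibration/*.jpg"):
    gray = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2GRAY)
    found, corners = cv2.findChessboardCorners(gray, PATTERN)
    if found:
        obj_points.append(objp)
        img_points.append(corners)

_, camera_matrix, dist_coeffs, _, _ = cv2.calibrateCamera(
    obj_points, img_points, gray.shape[::-1], None, None)
print("CAMERA_MATRIX =", repr(camera_matrix))
print("DIST_COEFFS =", repr(dist_coeffs))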
These are the values we obtained after many tries for our Pi Camera (lens 1/4", aperture f/1.8, focal length 3.6mm, diagonal view angle 75.7°):
CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],[0.00000000e+00, 2.00413989e+03, 8.60071427e+02],[0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859, 0.2963752, 0.03228459, 0.0028887, 0.82956323])
# IMPORTANT: this will only work for you if your camera has the same technical specifications
The pose detection returns arrays of rotation and translation vectors for all markers. We use the function calc_heading to obtain the actual heading of each marker in degrees from its rotation vector. A rotation vector describes the rotation of a marker in three dimensions. Since we are looking at the markers from above, we are only interested in the rotation around the z-axis. In the listing below, angles_from_rvec transforms the rotation vector into a rotation matrix and calculates the angles around all three axes in radians; calc_heading then converts the z-axis angle to degrees, normalizes it to the range 0° to 360°, and returns it.
def angles_from_rvec(rvec):
    r_mat, _jacobian = cv2.Rodrigues(rvec)
    a = math.atan2(r_mat[2][1], r_mat[2][2])
    b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1], 2) + math.pow(r_mat[2][2], 2)))
    c = math.atan2(r_mat[1][0], r_mat[0][0])
    return [a, b, c]

def calc_heading(rvec):
    angles = angles_from_rvec(rvec)
    degree_angle = math.degrees(angles[2])
    if degree_angle < 0:
        degree_angle = 360 + degree_angle
    return degree_angle
Full Tracking Service
Now, we have all the pieces to assemble the marker tracking. We intend to run the tracking concurrently with a WebSocket server later on. The capture loop is blocking, so we have to put everything into a separate thread. The Pi Camera cannot be used by two processes concurrently, so we also limit the number of threads accessing the camera with a ThreadPoolExecutor that has a single worker. Using event loops from the asyncio module, we wire everything together. Please note that there is one event loop per thread: the base loop is always present, but we have to create a new one for our capture thread (see PositioningSystem.start below). Our service notifies about found markers by invoking the asynchronous callback function on_track.
import asyncio
import concurrent.futures
import logging
import math
import traceback

import cv2
import numpy as np
from picamera.array import PiRGBArray
from picamera import PiCamera

RESOLUTION = (1088, 1088)
FRAMERATE = 30
DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)
# Pi Camera calibration values (see above)
CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],
                          [0.00000000e+00, 2.00413989e+03, 8.60071427e+02],
                          [0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859, 0.2963752, 0.03228459, 0.0028887, 0.82956323])
PARAMETERS = cv2.aruco.DetectorParameters_create()
MARKER_EDGE = 0.05

def angles_from_rvec(rvec):
    r_mat, _jacobian = cv2.Rodrigues(rvec)
    a = math.atan2(r_mat[2][1], r_mat[2][2])
    b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1], 2) + math.pow(r_mat[2][2], 2)))
    c = math.atan2(r_mat[1][0], r_mat[0][0])
    return [a, b, c]

def calc_heading(rvec):
    angles = angles_from_rvec(rvec)
    degree_angle = math.degrees(angles[2])
    if degree_angle < 0:
        degree_angle = 360 + degree_angle
    return degree_angle

def find_markers(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
    rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)
    result = set()
    if ids is None:
        return result
    for i in range(0, len(ids)):
        try:
            id = str(ids[i][0])
            marker = np.squeeze(corners[i])
            x0, y0 = marker[0]
            x2, y2 = marker[2]
            x = int((x0 + x2) / 2)
            y = int((y0 + y2) / 2)
            heading = calc_heading(rvecs[i][0])
            result.add((id, x, y, heading))
        except Exception:
            traceback.print_exc()
    return result

class PositioningSystem:
    def __init__(self, on_update):
        self._on_update = on_update

    def start(self):
        # Runs in its own thread, so it needs its own event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._capture())
        loop.run_forever()

    async def _capture(self):
        camera = PiCamera()
        camera.resolution = RESOLUTION
        camera.framerate = FRAMERATE
        rawCapture = PiRGBArray(camera, size=RESOLUTION)
        await asyncio.sleep(0.1)  # give the camera a moment to warm up
        logging.info("Start capturing from pi camera.")
        try:
            for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
                frame = capture.array
                markers = find_markers(frame)
                rawCapture.truncate(0)
                for marker in markers:
                    await self._on_update(marker)
        except Exception:
            logging.exception("Capturing stopped with an error.")
        finally:
            camera.close()

# Create a limited thread pool with a single worker for the camera.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def track(on_track):
    ps = PositioningSystem(on_track)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(executor, ps.start)
    return ps
positioning.py
Connect to a WebSocket
The connection to the WebSocket server is straightforward; we adapted an existing example. The WebSocket server waits for clients to connect and invokes the on_connect callback for each established connection. The WebSocket is only used to push position updates to clients; we receive incoming messages, but in this example, we don't react to them. Instead, we add each socket to a set so we can access it later on.
import asyncio
import json
import logging

import websockets

import positioning

PORT = 5678
LOG_LEVEL = "INFO"

logging.basicConfig(level=LOG_LEVEL)

sockets = set()
loop = asyncio.get_event_loop()

async def on_connect(socket, path):
    logging.info("Socket connected...")
    sockets.add(socket)
    try:
        while True:
            message = await socket.recv()
            logging.warning("Ignoring received message: %s", message)
    except Exception:
        sockets.remove(socket)
        logging.info("Socket disconnected (maybe in response to closing handshake)...")

async def on_track(position):
    id, x, y, bearing = position
    message = {
        "id": id,
        "x": x,
        "y": y,
        "bearing": bearing
    }
    for socket in sockets:
        await socket.send(json.dumps(message))

if __name__ == "__main__":
    logging.debug("Starting capture loop...")
    start_positioning = positioning.track(on_track)
    logging.debug("Starting websocket...")
    start_server = websockets.serve(on_connect, port=PORT)
    loop.run_until_complete(start_positioning)
    loop.run_until_complete(start_server)
    logging.info("Started ... ")
    loop.run_forever()
tracking.py
As soon as a new marker position is available, the tracking system invokes the on_track function. Here, we simply iterate over all open sockets and send the position updates as JSON-formatted strings. That's it.
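To give an idea of how the updates look on the receiving end, here is a minimal sketch of a hypothetical test client, assuming the tracking service runs locally on port 5678:
# Hypothetical test client: connects to the tracking service and prints every update.
import asyncio
import json

import websockets

async def listen():
    async with websockets.connect("ws://localhost:5678") as socket:
        async for message in socket:
            position = json.loads(message)
            print(position["id"], position["x"], position["y"], position["bearing"])

asyncio.get_event_loop().run_until_complete(listen())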
Learnings and Final Remarks
Running OpenCV image processing from Python on a Raspberry Pi (3B+) is not a good idea if performance is critical. The situation might change with the Raspberry Pi 4. So far, processing ten frames of 1000 x 1000 pixels per second was our peak sampling rate. Surprisingly, the Pi Camera and a USB camera did not show a big difference. The marker detection also turned out to be error-prone: a change in lighting or vibrations of the robot made the detection unreliable. So, we implemented some countermeasures, such as averaging out errors at the cost of the sampling rate. In the end, the tracking results were reasonably good, but the update interval of the robot positions was around 500 ms, which requires our robots to move really, really slowly.
For this reason, we complemented our setup with a gyroscope and an accelerometer on each robot. While a robot is moving, we use these sensors; when a robot is at rest, we detect its absolute position with the camera. This does the trick for our application.
I'm open to all suggestions on how to improve the code and my solution. If you are a student and would like to work on more projects like this, come to Berlin and meet us at the Designakademie Berlin.