Processing 3D Data Using Python Multiprocessing Library
Large amounts of data reveal problems that require creative approaches. Fortunately, the Python language and its extensive set of libraries can help.
Today we’ll cover the tools that are very handy when working with large amounts of data. I'm not going to give you only general information that can be found in manuals, but share some little tricks that I’ve discovered, such as using tqdm with multiprocessing imap, working with archives in parallel, plotting and processing 3D data, and how to find a mesh similar to a given point cloud among a set of object meshes.
So why should we resort to parallel computing? Nowadays, if you work with any kind of data, you might face problems related to "big data". Whenever the data doesn’t fit into RAM, we need to process it piece by piece. Fortunately, modern programming languages allow us to spawn multiple processes (or even threads) that make full use of multi-core processors. (NB: That doesn’t mean that single-core processors cannot handle multiprocessing. Here’s the Stack Overflow thread on that topic.)
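To make this concrete, here is a minimal sketch of spawning worker processes with the standard multiprocessing Pool; the square function and the input range are purely illustrative and have nothing to do with the 3D task below.
# Minimal illustration (not part of the 3D task below): split work across 4 processes.
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # each worker receives a chunk of the input range and returns its results
        print(pool.map(square, range(10)))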
Today we’ll try our hand at a frequently occurring 3D computer vision task: computing distances between a mesh and a point cloud. You might face this problem, for example, when you need to find, among all available meshes, the one that defines the same 3D object as a given point cloud.
Our data consists of .obj files stored in a .7z archive, which is great in terms of storage efficiency. But when we need to access a specific portion of it, it takes some effort. Here I define a class that wraps the 7-zip archive and provides an interface to the underlying data.
from io import BytesIO
import py7zlib


class MeshesArchive(object):
    def __init__(self, archive_path):
        fp = open(archive_path, 'rb')
        self.archive = py7zlib.Archive7z(fp)
        self.archive_path = archive_path
        self.names_list = self.archive.getnames()
        self.cur_id = 0

    def __len__(self):
        return len(self.names_list)

    def get(self, name):
        # decompress a single member and return it as an in-memory file
        bytes_io = BytesIO(self.archive.getmember(name).read())
        return bytes_io

    def __getitem__(self, idx):
        return self.get(self.names_list[idx])

    def __iter__(self):
        return self

    def __next__(self):
        if self.cur_id >= len(self.names_list):
            raise StopIteration
        name = self.names_list[self.cur_id]
        self.cur_id += 1
        return self.get(name)
This class relies heavily on the py7zlib package, which allows us to decompress data each time we call the get method and gives us the number of files inside an archive. We also define __iter__, which will let us start a multiprocessing map on that object as on an iterable.
As you might know, it is possible to create a Python class from which one can instantiate iterable objects. Such a class should meet the following conditions: override __iter__ to return self and __next__ to return the following element. And we definitely follow this rule here.
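For instance, assuming the archive lives at ./data/meshes.7z (the path we’ll also use later), the class can be consumed like any other iterable; this is just an illustrative sketch.
# Illustrative usage of MeshesArchive; the archive path is an assumption.
archive = MeshesArchive("./data/meshes.7z")
print(f"{len(archive)} files in the archive")
for file_like in archive:
    # each item is a BytesIO object holding one decompressed .obj file
    print(file_like.getbuffer().nbytes, "bytes")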
The above definition makes it possible to iterate over the archive, but does it allow random access to the contents in parallel? It’s an interesting question, to which I haven’t found an answer online, but we can study the source code of py7zlib and try to answer it ourselves.
Here I provide reduced snippets of the code from pylzma:
class Archive7z(Base):
    def __init__(self, file, password=None):
        # ...
        self.files = []
        self.files_map = {}
        # ...
        for info in files.files:
            # create an instance of ArchiveFile that knows its location on disk
            file = ArchiveFile(info, pos, src_pos, folder, self, maxsize=maxsize)
            # ...
            self.files.append(file)
        # ...
        self.files_map.update([(x.filename, x) for x in self.files])

    # method that returns an ArchiveFile from the files_map dictionary
    def getmember(self, name):
        if isinstance(name, (int, long)):
            try:
                return self.files[name]
            except IndexError:
                return None
        return self.files_map.get(name, None)
class ArchiveFile(Base):
    def read(self):
        # ...
        for level, coder in enumerate(self._folder.coders):
            # ...
            # get the decoder and decode the underlying data
            data = getattr(self, decoder)(coder, data, level, num_coders)
        return data
In the code above, you can see the methods that are called when reading the next object from the archive. Since each ArchiveFile knows its own location inside the archive and decodes its data independently, I believe there is no reason for the archive to be blocked when it is read multiple times simultaneously.
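To check this empirically, one could let every worker re-open the archive and read a member on its own; this is only a sketch of such an experiment, not code from the original benchmarks, and the archive path is an assumption.
# Sketch of an experiment: each worker opens its own MeshesArchive instance
# and reads one member by name, so the reads proceed independently.
from multiprocessing import Pool

ARCHIVE_PATH = "./data/meshes.7z"  # assumed path, matching the one used below

def read_member(name):
    archive = MeshesArchive(ARCHIVE_PATH)
    return name, archive.get(name).getbuffer().nbytes

if __name__ == "__main__":
    names = MeshesArchive(ARCHIVE_PATH).names_list
    with Pool(4) as pool:
        for name, size in pool.imap_unordered(read_member, names):
            print(name, size, "bytes")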
Next, let’s quickly introduce what meshes and point clouds are.
Firstly, meshes are sets of vertices, edges, and faces. Vertices are defined by (x, y, z) coordinates in space and assigned unique ids. Edges and faces are groups of point pairs and triplets, respectively, defined by those unique point ids. Commonly, when we talk about a “mesh” we mean a “triangular mesh”, i.e. a surface consisting of triangles. Working with meshes in Python is much easier with the trimesh library. For example, it provides an interface to load .obj files into memory. To display and interact with 3D objects in a jupyter notebook, one can use the k3d library.
So, with the following code snippet I answer the question: “How do you plot a trimesh object in jupyter with k3d?”
import trimesh
import k3d

# load the mesh from an .obj file
with open("./data/meshes/stanford-bunny.obj") as f:
    bunny_mesh = trimesh.load(f, 'obj')

# create a k3d plot and add the mesh to it
plot = k3d.plot()
mesh = k3d.mesh(bunny_mesh.vertices, bunny_mesh.faces)
plot += mesh
plot.display()
Stanford Bunny mesh displayed by k3d
Secondly, point clouds are arrays of 3D points that represent objects in space. Many 3D scanners produce point clouds as a representation of a scanned object. For demonstration purposes, we can read the same mesh and display its vertices as a point cloud.
import trimesh
import k3d

with open("./data/meshes/stanford-bunny.obj") as f:
    bunny_mesh = trimesh.load(f, 'obj')

# display only the mesh vertices, rendered as a point cloud
plot = k3d.plot()
cloud = k3d.points(bunny_mesh.vertices, point_size=0.0001, shader="flat")
plot += cloud
plot.display()
Point cloud drawn by k3d
As mentioned above, a 3D scanner provides us with a point cloud. Let’s assume that we have a database of meshes and want to find a mesh in that database that is aligned with the scanned object, a.k.a. the point cloud. To address this problem we can suggest a naïve approach: we’ll search for the largest distance between the points of the given point cloud and each mesh from our archive, and if such a distance is less than 1e-4 for some mesh, we’ll consider this mesh as aligned with the point cloud.
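In code, the criterion boils down to a thresholded maximum distance; get_max_dist here is just a placeholder for the igl-based helper defined later in the article.
# Sketch of the matching criterion; get_max_dist is implemented below with igl.
THRESHOLD = 1e-4

def is_aligned(mesh, point_cloud):
    # largest (squared) distance from any point of the cloud to the mesh surface
    return get_max_dist(mesh, point_cloud) < THRESHOLD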
Finally, we’ve come to the multiprocessing section. Remember that our archive holds plenty of files that might not fit into memory together, so we prefer to process them in parallel. To achieve that we’ll use a multiprocessing Pool, which handles multiple calls of a user-defined function with the map or imap/imap_unordered methods. The difference between map and imap that affects us is that map converts an iterable to a list before sending it to worker processes. If an archive is too big to fit into RAM, it shouldn’t be unpacked into a Python list. In terms of execution speed, however, both are similar.
[Loading meshes: pool.map w/o manager] Pool of 4 processes elapsed time: 37.213207403818764 sec
[Loading meshes: pool.imap_unordered w/o manager] Pool of 4 processes elapsed time: 37.219303369522095 sec
Above you see the results of simply reading from the archive of meshes that fit in memory.
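To make the difference tangible, here is a minimal sketch (not from the benchmark above) contrasting the two calls on our archive class; with map the whole archive is materialized into a list of decompressed items first, while imap pulls them one by one.
# Illustrative comparison of map vs imap on MeshesArchive; the path is an assumption.
from multiprocessing import Pool

def item_size(file_like):
    return file_like.getbuffer().nbytes

if __name__ == "__main__":
    with Pool(4) as pool:
        # map builds a full list of BytesIO objects before dispatching them
        sizes_eager = pool.map(item_size, MeshesArchive("./data/meshes.7z"))
        # imap pulls items from the iterator lazily, keeping memory usage low
        sizes_lazy = list(pool.imap(item_size, MeshesArchive("./data/meshes.7z")))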
Moving further with imap, let’s discuss how to accomplish our goal of finding a mesh close to the point cloud. Here is the data: there are 5 different meshes from the Stanford models. We’ll simulate a 3D scan by adding noise to the vertices of the Stanford bunny mesh.
import numpy as np
from numpy.random import default_rng


def normalize_pc(points):
    points = points - points.mean(axis=0)[None, :]
    dists = np.linalg.norm(points, axis=1)
    scaled_points = points / dists.max()
    return scaled_points


def load_bunny_pc(bunny_path):
    STD = 1e-3
    with open(bunny_path) as f:
        bunny_mesh = load_mesh(f)
    # normalize point cloud
    scaled_bunny = normalize_pc(bunny_mesh.vertices)
    # add some noise to point cloud
    rng = default_rng()
    noise = rng.normal(0.0, STD, scaled_bunny.shape)
    distorted_bunny = scaled_bunny + noise
    return distorted_bunny
Note that we normalize the point cloud here and will normalize the mesh vertices below in the same way, to bring both to the same scale.
To compute distances between a point cloud and a mesh we’ll use igl. To finalize, we need to write the function that will be called in each process, along with its dependencies. Let’s sum it up with the following snippet.
import itertools
import time

import numpy as np
from numpy.random import default_rng
import trimesh
import igl
from tqdm import tqdm
from multiprocessing import Pool

# normalize_pc and MeshesArchive are defined earlier in the article


def load_mesh(obj_file):
    mesh = trimesh.load(obj_file, 'obj')
    return mesh


def get_max_dist(base_mesh, point_cloud):
    distance_sq, mesh_face_indexes, _ = igl.point_mesh_squared_distance(
        point_cloud,
        base_mesh.vertices,
        base_mesh.faces
    )
    return distance_sq.max()


def load_mesh_get_distance(args):
    obj_file, point_cloud = args[0], args[1]
    mesh = load_mesh(obj_file)
    mesh.vertices = normalize_pc(mesh.vertices)
    max_dist = get_max_dist(mesh, point_cloud)
    return max_dist


def read_meshes_get_distances_pool_imap(archive_path, point_cloud, num_proc, num_iterations):
    # do the meshes processing within a pool
    elapsed_time = []
    for _ in range(num_iterations):
        archive = MeshesArchive(archive_path)
        pool = Pool(num_proc)
        start = time.time()
        result = list(tqdm(pool.imap(
            load_mesh_get_distance,
            zip(archive, itertools.repeat(point_cloud)),
        ), total=len(archive)))
        pool.close()
        pool.join()
        end = time.time()
        elapsed_time.append(end - start)
    print(f'[Process meshes: pool.imap] Pool of {num_proc} processes elapsed time: {np.array(elapsed_time).mean()} sec')
    for name, dist in zip(archive.names_list, result):
        print(f"{name} {dist}")
    return result


if __name__ == "__main__":
    bunny_path = "./data/meshes/stanford-bunny.obj"
    archive_path = "./data/meshes.7z"
    num_proc = 4
    num_iterations = 3
    point_cloud = load_bunny_pc(bunny_path)
    read_meshes_get_distances_pool_imap(archive_path, point_cloud, num_proc, num_iterations)
Here read_meshes_get_distances_pool_imap is the central function, where the following is done:
- MeshesArchive and multiprocessing.Pool are initialized
- tqdm is applied to watch the pool progress, and profiling of the whole pool is done manually
- output of the results is performed
Note how we pass arguments to imap by creating a new iterable from archive and point_cloud using zip(archive, itertools.repeat(point_cloud)). That allows us to stick the point cloud array to each entry of the archive while avoiding converting archive to a list.
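The pairing itself is plain Python and easy to inspect in isolation; here is a tiny illustration with made-up values.
# Tiny illustration of zip + itertools.repeat: the second value is reused
# lazily for every item of the first iterable, nothing is converted to a list.
import itertools

names = iter(["armadillo.obj", "beast.obj", "cow.obj"])
for name, cloud in zip(names, itertools.repeat([0.0, 0.0, 0.0])):
    print(name, cloud)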
The result of execution looks like this:
100%|####################################################################| 5/5 [00:00<00:00, 5.14it/s]
100%|####################################################################| 5/5 [00:00<00:00, 5.08it/s]
100%|####################################################################| 5/5 [00:00<00:00, 5.18it/s]
[Process meshes: pool.imap w/o manager] Pool of 4 processes elapsed time: 1.0080536206563313 sec
armadillo.obj 0.16176825266293382
beast.obj 0.28608649819198073
cow.obj 0.41653845909820164
spot.obj 0.22739556571296735
stanford-bunny.obj 2.3699851136074263e-05
We can see at a glance that the Stanford bunny is the closest mesh to the given point cloud. We don’t use a large amount of data here, but we’ve shown that this solution would work even with an extensive number of meshes inside an archive.
Multiprocessing allows data scientists to achieve great performance not only in 3D computer vision but also in other fields of machine learning. It is very important to understand that parallel execution can be much faster than sequential execution within a loop; the difference becomes significant when the algorithm is written correctly. Large amounts of data reveal problems that can’t be addressed without creative approaches to using limited resources. And fortunately, the Python language and its extensive set of libraries help us data scientists solve such problems.
Published at DZone with permission of Emil Bogomolov.