Parquet Data Filtering With Pandas
Exploring data filtering techniques when using pandas to read Parquet files: how they work, and which of them gives the best results.
When it comes to filtering data from Parquet files using pandas, several strategies can be employed. While it’s widely recognized that partitioning data can significantly enhance the efficiency of filtering operations, there are additional methods to optimize the performance of querying data stored in Parquet files. Partitioning is just one of the options.
Filtering by Partitioned Fields
As previously mentioned, this approach is not only the most familiar but also typically the most impactful in terms of performance optimization. The rationale is straightforward: when partitions are employed, the reader can skip entire files or even entire directories of files (a form of predicate pushdown), resulting in a substantial improvement in performance.
import pandas as pd
import time
from faker import Faker
fake = Faker()
MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"
print(f"Creating fake data")
data = {
'id': range(NUM_OF_RECORDS), # sequential IDs from 0 to NUM_OF_RECORDS - 1
'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}
df = pd.DataFrame(data)
# writing without partitions
df.to_parquet(path=NON_PARTITIONED_PATH)
# writing partitioned data
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])
# reading the non-partitioned file and filtering in memory after the full load
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time # 37 sec
# reading partitioned data, letting the filter skip non-matching partitions
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec
The time improvement (along with reduced memory and CPU usage) is substantial, decreasing from 37 seconds to just 0.20 seconds.
Filtering by Non-Partitioned Fields
In the example above, we observed how filtering based on a partitioned field can enhance data retrieval. However, there are scenarios where data can’t be effectively partitioned by the specific field we wish to filter. Moreover, in some cases, filtering is required based on multiple fields. This means all input files will be opened, which can be harmful to performance.
Thankfully, Parquet offers a clever solution to mitigate this issue. Parquet files are split into row groups. Within each row group, Parquet stores metadata. This metadata includes the minimum and maximum values for each field.
When writing Parquet files with Pandas, you can control the number of records in each row group.
When using Pandas to read Parquet files with filters, the library leverages this metadata to efficiently filter the data loaded into memory. If the filter value falls outside the min/max range of a row group, that entire row group is skipped.
df = pd.DataFrame(data)
# writing non-partitioned data, specifying the size of each row group
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)
# reading non-partitioned data, filtering by row-group statistics only
df = pd.read_parquet(path=PATH_TO_PARQUET_FILE, filters=[('state','==','California')])
Viewing the metadata inside Parquet files can be done using PyArrow.
>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 6
num_rows: 1000000
num_row_groups: 10
format_version: 2.6
serialized_size: 9325
>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
file_offset: 1675616
file_path:
physical_type: BYTE_ARRAY
num_values: 100000
path_in_schema: state
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x115283590>
has_min_max: True
min: Alabama
max: Wyoming
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY
encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
has_dictionary_page: True
dictionary_page_offset: 1599792
data_page_offset: 1600354
total_compressed_size: 75824
total_uncompressed_size: 75891
Notice that the number of row groups appears in the file-level metadata, while the minimum and maximum values appear in the statistics section of each column within each row group.
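To see how a reader can exploit these statistics, here is a minimal sketch (not from the original article) that iterates over the row groups of the non-partitioned file written above and reports which of them could be skipped for a given filter value, mimicking the min/max check performed during filtered reads:
import pyarrow.parquet as pq
parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
# locate the 'state' column by name instead of hard-coding its index
state_idx = parquet_file.schema_arrow.get_field_index('state')
for i in range(parquet_file.metadata.num_row_groups):
    stats = parquet_file.metadata.row_group(i).column(state_idx).statistics
    # a row group can be skipped when the filter value lies outside its [min, max] range
    skippable = not (stats.min <= 'California' <= stats.max)
    print(f"row group {i}: min={stats.min}, max={stats.max}, skippable={skippable}")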
However, there is a method to further harness this Parquet feature for even more optimized results: sorting.
Filtering by Sorted Fields
As mentioned in the previous section, part of the metadata stored by Parquet includes the minimum and maximum values for each field within every row group. When the data is sorted based on the field we intend to filter by, Pandas has a greater likelihood of skipping more row groups.
For example, let’s consider a dataset that includes a list of records, with one of the fields representing ‘state.’ If the records are unsorted, there’s a good chance that each state appears in most of the row groups. Look at the metadata in the previous section: the first row group alone spans all the states from ‘Alabama’ to ‘Wyoming.’
However, if we sort the data based on the ‘state’ field, there’s a significant probability of skipping many row groups.
df = pd.DataFrame(data)
# sorting the data based on 'state'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)
Now, let’s look again at the metadata and see how it changed.
>>> parquet_file = pq.ParquetFile(NON_PARTITIONED_SORTED_PATH)
>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'
As you can see, after sorting by state, the min-max values change accordingly; each row group holds only a subset of the states instead of all of them. This means reading with filters should be a lot quicker now.
Now, let’s see how it affects the performance of reading the data. The code for reading the data hasn’t changed.
# reading non-partitioned data and filtering by row groups; the input is sorted by state
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds
Astonishingly, the performance here is almost as good as using partitions.
This principle applies to both partitioned and non-partitioned data, and we can use both methods at the same time. If we sometimes want to filter the data based on field A and other times based on field B, then partitioning by field A and sorting by field B could be a good option.
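As a rough illustration of this combination (the path name and the choice of fields are my own, not from the original article), you could partition by ‘state’ while sorting by ‘city’ before writing:
# illustrative path, not used elsewhere in the article
COMBINED_PATH = f"{FOLDER}partitioned_state_sorted_city_{NUM_OF_RECORDS}/"
# partition by 'state' (field A) and sort by 'city' (field B);
# within each state partition, row groups then tend to cover narrow city ranges
df.sort_values("city").to_parquet(path=COMBINED_PATH, partition_cols=['state'])
# filtering by 'state' skips whole directories...
df_by_state = pd.read_parquet(path=COMBINED_PATH, filters=[('state','==','California')])
# ...while filtering by 'city' can still skip row groups inside each file
df_by_city = pd.read_parquet(path=COMBINED_PATH, filters=[('city','==','Springfield')])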
In other cases, for instance, when the field we want to filter by has high cardinality, we could partition by a hash of the value (bucketing) and sort the data inside each bucket by the actual value of the field. This way, we enjoy the advantages of both methods: partitioning and row groups.
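Here is a minimal sketch of that bucketing idea (the bucket count, path, and helper column name are illustrative assumptions, not part of the original article), using a stable hash of the ‘name’ field as the partition key and sorting inside each bucket by the value itself:
NUM_BUCKETS = 16                                       # illustrative choice
BUCKETED_PATH = f"{FOLDER}bucketed_{NUM_OF_RECORDS}/"  # illustrative path
# derive a stable bucket id from the high-cardinality 'name' field
df['name_bucket'] = (pd.util.hash_pandas_object(df['name'], index=False) % NUM_BUCKETS).astype('int64')
# partition by the bucket and sort inside it by the actual value
df.sort_values('name').to_parquet(path=BUCKETED_PATH, partition_cols=['name_bucket'])
# at read time, compute the same bucket for the lookup value and filter on both fields
lookup = 'John Smith'
lookup_bucket = int(pd.util.hash_pandas_object(pd.Series([lookup]), index=False)[0] % NUM_BUCKETS)
result = pd.read_parquet(
    path=BUCKETED_PATH,
    filters=[('name_bucket', '==', lookup_bucket), ('name', '==', lookup)],
)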
Reading a Subset of the Columns
Although less commonly used, another method for achieving better results during data retrieval involves selecting only the specific fields that are essential for your task. This strategy can occasionally yield improvements in performance. This is due to the nature of the Parquet format. Parquet is implemented in a columnar format, which means it stores the data column by column inside each row group. Reading only some of the columns means the other columns will be skipped.
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds
Unsurprisingly, the improvement in performance is great.
Conclusion
While partitioning data is typically the optimal approach, it is not always possible. Sorting the data can also lead to significant improvements, because more row groups can be skipped that way. Additionally, if feasible, selecting only the necessary columns is always a good choice.
I hope this post helped you understand how to harness the power of Parquet and pandas for better performance.
Here is a script containing all the previously mentioned examples, complete with time comparisons.