Pydantic and Elasticsearch: Dynamic Couple for Data Management
By combining Pydantic and Elasticsearch for data management, we can create structured schemas, perform automatic data validation, and enhance data consistency.
Elasticsearch is a highly scalable open-source full-text search and analytics engine. It allows you to store, search, and analyze large volumes of data quickly and in near real-time.
An Elasticsearch index is a collection of documents that are related to each other. Each document is a collection of fields, which are the key-value pairs that contain your data. An index is similar to a 'database' in a relational database, and the 'type' is similar to the 'table.'
Mapping is the process of defining how a document and the fields it contains are stored and indexed. For each index, you can define its mapping — a set of rules that explains how to interpret the data that's being inserted into the index. You may think of mapping as the schema definition, which would be equivalent to the table definition in SQL.
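As a quick illustration, here is what a small hand-written mapping body might look like, expressed as a Python dict you would pass to the create-index API (a minimal sketch; the article_mapping name and the fields are hypothetical and only meant to show the shape):
# A hand-written mapping body (hypothetical fields, for illustration only)
article_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "author": {"type": "keyword"},
            "published_at": {"type": "date"},
            "views": {"type": "long"},
            "comments": {
                "type": "nested",
                "properties": {
                    "user": {"type": "keyword"},
                    "body": {"type": "text"}
                }
            }
        }
    }
}
Writing bodies like this by hand quickly becomes tedious and error-prone for large schemas, which is exactly the pain point this article addresses.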
Each field in a document can have a different data type, and the mapping tells Elasticsearch what kind of data each field holds. The most common types are:
- Numeric (long, double)
- Boolean
- Keyword
- Text
- Date
- Nested
These types tell Elasticsearch how to store the provided documents in the index. And if you have a complex schema with many different types of fields, where some are nested, and you want to ensure that your data is stored in the index the way you want, you need something to validate your input data and to automate the creation of indexes with a specific schema. That's where Pydantic comes into play!
What Is Pydantic and Why Do You Need It?
Pydantic is a Python library that provides data validation and settings management using Python type annotations. Its primary feature is that it validates incoming data, ensuring that the data received matches the developer's expectations and reducing the likelihood of runtime errors due to incorrect data. It can be highly beneficial when building APIs or data parsing modules, where data correctness is paramount.
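As a tiny illustration (a toy sketch, separate from the index schema we build below; the User model is invented purely for this example):
from pydantic import BaseModel, ValidationError

class User(BaseModel):
    name: str
    age: int

User(name="Alice", age=30)  # valid: types match the annotations

try:
    User(name="Bob", age="not a number")  # invalid: age cannot be coerced to int
except ValidationError as err:
    print(err)  # Pydantic reports exactly which field failed and why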
Let's make some assumptions about using Pydantic schemas with Elastic and then try to prove them.
Index Creation Does Not Require Writing the Mapping by Hand; It Can Instead Be Described Explicitly by a Pydantic Schema
Let's say we're creating a Book document for our Elasticsearch index, as we described earlier. We could define a Pydantic model for that Book document and use it to create the index and validate input data with it.
So, here is our script to create the index:
from datetime import datetime
from typing import List

from pydantic import BaseModel, validator
from elasticsearch import Elasticsearch

# Define the index name and its number of replicas and shards at the start of the script
target_index = "books_v1.0"
n_shards = 3
n_replicas = 1


class Title(BaseModel):
    english: str


class Abstract(BaseModel):
    english: str
    french: str
    spanish: str


class Book(BaseModel):
    id: str
    title: List[Title]
    author: List[str]
    publish_date: datetime
    abstract: List[Abstract]

    # pre=True: run on the raw input string before Pydantic's own datetime parsing
    @validator("publish_date", pre=True)
    def validate_date(cls, date: str):
        try:
            date = date[0:10]
            format_dt = "%Y-%m-%d"
            date = datetime.strptime(date, format_dt)
            return date
        except ValueError:
            raise ValueError(f"Date field has wrong format: {date}")


# Map Python types to Elasticsearch field types
type_map = {str: "keyword", datetime: "date", int: "long", list: "keyword", dict: "nested", List[BaseModel]: "nested"}


def create_es_mapping(pydantic_model: BaseModel):
    """Build the 'properties' section of an Elasticsearch mapping from a Pydantic model."""
    mapping = {}
    for field, field_type in pydantic_model.__annotations__.items():
        es_field_type = type_map.get(field_type)
        if es_field_type:
            # Simple types become {"type": "<es type>"}
            es_field_type = {"type": es_field_type}
        else:
            args = getattr(field_type, "__args__", None)
            if args and issubclass(args[0], BaseModel):
                # List[BaseModel] becomes a nested field with its own properties
                es_field_type = {"type": "nested", "properties": create_es_mapping(args[0])}
            elif args:
                # List[str], List[int], ... map to the element's type
                es_field_type = {"type": type_map.get(args[0], "keyword")}
            else:
                # A plain nested Pydantic model becomes an object field
                es_field_type = {"type": "object", "properties": create_es_mapping(field_type)}
        mapping[field] = es_field_type
    return mapping


def main(model):
    es = Elasticsearch()
    # Generate the mapping from the Pydantic schema; it consists of 2 main sections: settings and mappings
    mapping = {
        "settings": {
            "number_of_shards": n_shards,
            "number_of_replicas": n_replicas
        },
        "mappings": {
            "properties": create_es_mapping(model)
        }
    }
    # Create the Elasticsearch index if it does not exist yet
    if not es.indices.exists(index=target_index):
        es.indices.create(index=target_index, body=mapping)


if __name__ == "__main__":
    main(model=Book)
This script is designed to create an Elasticsearch index from a Pydantic model. It sets up the index mapping, validates the data, and creates the index if it doesn't already exist. Here's a step-by-step description of what we do above:
1. Imports necessary libraries: datetime for handling date data, List from typing for type hinting, BaseModel and validator from Pydantic for model definition and validation, and Elasticsearch for interacting with an Elasticsearch cluster.
2. Defines constants: target_index is the name of the index to be created, n_shards is the number of primary shards the index should have, and n_replicas is the number of replicas each primary shard should have.
3. Defines Pydantic models: Title, Abstract, and Book. Each model corresponds to a part of a book document. These models will be used to validate data and to create the Elasticsearch index mapping.
4. Validates data: Inside the Book class there is a validator for the publish_date field. It ensures that the date is in the correct format and raises a ValueError if it isn't.
5. Creates the Elasticsearch field type mapping: type_map is a dictionary that maps Pydantic field types to Elasticsearch field types and is used in the create_es_mapping function. Type mapping is really important, so let's look at it a little closer: {str: "keyword", datetime: "date", int: "long", list: "keyword", dict: "nested", List[BaseModel]: "nested"}. In this type_map we map Python types to Elasticsearch inner data types; the schema generates the index mapping from it, while input data is still validated against the plain Python types. This is convenient for developers and easy to understand in a review.
6. Generates the Elasticsearch mapping: the create_es_mapping function generates an Elasticsearch mapping from a Pydantic model. It uses the type_map to determine the Elasticsearch field type for each Pydantic field (see the sketch of the generated mapping after this list).
7. Defines the main function, which does several things:
   - It creates an Elasticsearch client instance.
   - It generates the Elasticsearch mapping from the provided Pydantic model and builds a mapping body that includes settings for the number of shards and replicas.
   - It checks whether the target index exists in Elasticsearch. If it doesn't, it creates the index using the indices.create method of the Elasticsearch client.
8. Executes the main function: the script ends by running main if the script is being run as the main program, passing the Book model as an argument.
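For reference, here is roughly what the generated "properties" section looks like for the Book model above, given the type_map we defined (a sketch of the expected output rather than something you write by hand):
# Sketch of the "properties" section produced by create_es_mapping(Book)
{
    "id": {"type": "keyword"},
    "title": {"type": "nested", "properties": {"english": {"type": "keyword"}}},
    "author": {"type": "keyword"},
    "publish_date": {"type": "date"},
    "abstract": {
        "type": "nested",
        "properties": {
            "english": {"type": "keyword"},
            "french": {"type": "keyword"},
            "spanish": {"type": "keyword"}
        }
    }
}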
So, what did we just do? Our script automated the process of creating an Elasticsearch index from a Pydantic model, which can be very helpful in ensuring data consistency and reducing manual work. And now you can track on GitHub how your index schema evolves over time, and it's easy to reproduce it whenever and wherever you want. That is just what we wanted to achieve!
Data Validation With a Pydantic Schema Is Easy To Conduct and Guarantees Data Consistency
With the Python script above (let's call it books_schema.py), we wrote the schema for the books_v1.0 index and created it. Now let's upload some data into the index, validating it against this schema before it is uploaded. Here is a Python script for this task:
from elasticsearch import Elasticsearch
from pydantic import BaseModel

from books_schema import Book, target_index


def to_upload(documents: list, index: str, model: BaseModel):
    for document in documents:
        # Validate the document against the Pydantic model before indexing it
        prepared_doc = model.parse_obj(document)
        es.index(index=index, id=document["id"], body=prepared_doc.dict())


if __name__ == "__main__":
    # Initialize Elasticsearch client
    es = Elasticsearch()
    # Example dataset
    books_data = [
        {
            "id": "1",  # required by the Book model and used as the document id
            "title": [{"english": "To Kill a Mockingbird"}],
            "author": ["Harper Lee"],
            "publish_date": "1960-07-11",
            "abstract": [
                {
                    "english": "The unforgettable novel of a childhood in a sleepy Southern town and the crisis of conscience that rocked it.",
                    "french": "Le roman inoubliable d'une enfance dans une petite ville du Sud endormie et la crise de conscience qui l'a ébranlée.",
                    "spanish": "La inolvidable novela de una infancia en una tranquila ciudad sureña y la crisis de conciencia que la sacudió."
                }
            ]
        },
        # more book data here...
    ]
    to_upload(documents=books_data, index=target_index, model=Book)
This script uploads a list of documents (in this case, books) to an Elasticsearch index. It also validates these documents against a Pydantic model before they are uploaded.
Here's a detailed breakdown of what's happening:
- Imports: The script starts by importing the Book schema and target_index from books_schema.py, along with BaseModel from pydantic and the Elasticsearch client.
- Function definition: The to_upload function takes three arguments: documents (a list of documents to be uploaded), index (the name of the Elasticsearch index to which the documents will be uploaded), and model (a Pydantic model against which the documents will be validated). For each document in documents, the function validates the document against model and then indexes it in Elasticsearch.
- Main script: The main part of the script is executed when the script is run directly (not imported as a module).
  - It first creates an instance of the Elasticsearch client.
  - It then creates a list of book data (books_data). Each item in this list is a dictionary representing a book.
  - Finally, it calls the to_upload function, passing books_data, target_index, and Book as the arguments. This validates and uploads each book in books_data to the Elasticsearch index specified by target_index. If the publish_date field does not pass validation, or some data types do not correspond to those in the Book class, an error is raised, preventing the data from being uploaded into the index.
We validate the input data against a pre-defined schema using Python's type annotations. This ensures that the data received for Elasticsearch indexing matches what's expected, reducing the likelihood of invalid data being indexed and causing errors later. If there's a mismatch, Pydantic will raise an error, enabling you to catch issues before they become more significant problems.
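For instance, a minimal way to surface such a mismatch before anything reaches Elasticsearch might look like this (a sketch assuming Book is imported from books_schema as above; the malformed document is invented for illustration):
from pydantic import ValidationError

from books_schema import Book

bad_book = {
    "id": "42",
    "title": [{"english": "Some Title"}],
    "author": ["Unknown"],
    "publish_date": "11 July 1960",  # deliberately not in YYYY-MM-DD format
    "abstract": []
}

try:
    Book.parse_obj(bad_book)
except ValidationError as err:
    # Reports which field failed and why, so the bad document is never indexed
    print(err)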
What Else Can I Do With Pydantic?
Now that we know index creation can be automated and how easy it is to validate data before uploading it, let's see what else we can do to reduce the likelihood of runtime errors due to incorrect data. Pydantic supports several other kinds of validation. Here are some:
Data Conversion
from pydantic import BaseModel, validator

class Product(BaseModel):
    name: str
    price: float

    @validator("price", pre=True)
    def convert_price(cls, price):
        try:
            return float(price)
        except ValueError:
            raise ValueError(f"Could not convert price to float: {price}")
The @validator decorator marks the convert_price function as a validator for the price field. The pre=True argument means that this function will be run before any other validation. Inside the convert_price function, we attempt to convert the provided price to a float, and if this conversion is not possible (for example, if the price is a string that cannot be converted to a float), a ValueError will be raised.
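For example, using the Product model above (a quick sketch of how the conversion behaves):
product = Product(name="Coffee", price="4.50")
print(product.price)  # 4.5 -- the string was converted to a float

Product(name="Tea", price="free")  # raises a validation error: Could not convert price to float: free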
Validating Numbers
from pydantic import BaseModel, conint

class Model(BaseModel):
    age: conint(gt=0, lt=150)  # age should be greater than 0 and less than 150

model = Model(age=25)   # This is valid
model = Model(age=150)  # This will raise a validation error
You can use the conint (constrained integer) and confloat (constrained float) types provided by Pydantic to validate numeric fields.
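The snippet above only shows conint; a corresponding confloat sketch (the Discount model and its bounds are hypothetical) would be:
from pydantic import BaseModel, confloat

class Discount(BaseModel):
    rate: confloat(ge=0.0, le=1.0)  # rate must be between 0 and 1 inclusive

Discount(rate=0.15)  # This is valid
Discount(rate=1.5)   # This will raise a validation error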
Validating Choices
from pydantic import BaseModel
from enum import Enum

class FruitEnum(str, Enum):
    apple = "apple"
    banana = "banana"
    cherry = "cherry"

class Model(BaseModel):
    fruit: FruitEnum

model = Model(fruit="apple")  # This is valid
model = Model(fruit="mango")  # This will raise a validation error
Custom Validation (Most Used)
from pydantic import BaseModel, validator

class Model(BaseModel):
    name: str

    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()  # this will convert the name to title case

model = Model(name="John Hopkins")  # This is valid
model = Model(name="JackDaniels")   # This will raise a validation error
Here, we check whether there is a space in the name field. If there is no space, an error is raised; if a space is found, the name is returned in title case.
Methods We Used
To illustrate the power of combining Pydantic and Elasticsearch, we followed a practical approach of designing a data validation and index generation pipeline. The entire process was divided into several stages to demonstrate a clear and reproducible method.
- Understanding the Basics: We first provided a concise explanation of Elasticsearch and Pydantic, describing their primary functions and why they are widely used tools in the field of data management.
- Defining Elasticsearch Mappings: An explanation was provided about Elasticsearch mapping, which is essentially the process of defining how a document should be mapped to the search engine, including its searchable characteristics. For example, defining fields as date, integer, string, or using more complex types like nested and object fields.
- Creating Pydantic Models: After understanding the Elasticsearch mapping, we demonstrated how to translate it into a Pydantic model. For instance, the Book model, composed of nested Title and Abstract models, was used to represent the structure of book data. This model also includes data validation techniques like checking the format of the publish_date field.
- Generating Elasticsearch Index Mapping from the Pydantic Model: An automated Python script was presented which generates the Elasticsearch index mapping directly from the Pydantic model. This is a crucial step that enhances the accuracy of data mapping and reduces the chances of human error.
- Creating Elasticsearch Index: We utilized the Elasticsearch client and the mapping generated in the previous step to create an Elasticsearch index. The created index replicates the structure defined by our Pydantic model.
- Data Validation and Indexing: Once the index was created, we demonstrated how to validate and index data using our Pydantic model. We made use of Pydantic's parse_obj method, which not only validates the data but also converts it into a model instance. Any errors encountered during validation are raised immediately, thus ensuring data quality and integrity.
This method serves as a guideline for using Pydantic and Elasticsearch together. It streamlines the process of data validation and indexing while minimizing the risk of data discrepancies and errors. This combination is particularly useful in data-intensive applications where the quality and structure of data play a crucial role.
Conclusion
The Pydantic and Elasticsearch couple presents a powerful solution for data management. By leveraging the functionality of Pydantic, we can implement automatic validation, serialization, and documentation for complex data schemas, ensuring data consistency and structure.
Elasticsearch, on the other hand, is a potent, flexible, and scalable search and analytics engine that can handle a considerable volume of data swiftly and efficiently; however, creating an index with the correct mappings can be a complicated process, especially for nested and complex structures.
The method illustrated in this article demonstrates how Pydantic can be used to simplify this process. By defining a Pydantic model that mirrors our desired Elasticsearch index structure, we can automate the generation of Elasticsearch index mappings, thereby reducing human error and enhancing the accuracy of data mapping. Furthermore, by utilizing Pydantic's validation capabilities, we ensure the quality and integrity of the data that we index into Elasticsearch.
Though our focus has been on a specific application involving Book data, the method is widely applicable to any scenario where you are working with structured data and Elasticsearch. With this approach, the benefits of data consistency, efficient search, and automated validation are realized, reinforcing the value of integrating these two powerful tools.