Best Practices for Market Data Replay
Based on DolphinDB’s streaming replay, distributed database architecture, and APIs, you can create a tool for historical insight and model back-testing.
This tutorial introduces the practical implementation of DolphinDB's replay feature. Based on DolphinDB's streaming replay, distributed database architecture, and APIs, you can create a powerful tool for historical insight and model back-testing, which allows you to review situations of interest and improve future performance. By following the instructions in this tutorial, you will learn:
- The workflow of constructing a market replay solution
- The optimal data storage plan for data sources
- The methods to process and analyze replayed data via APIs
- The performance of the replay functionality
Note: For detailed instructions on how to replay market data, refer to Tutorial: Market Data Replay.
1. Construct a Market Data Replay Solution
This chapter guides you through the process of building a comprehensive market data replay solution. Our implementation focuses on three types of market data: quote tick data, trade tick data, and snapshots of Level-2 data. The service offers the following capabilities:
- Multi-client support: Allows submission of replay requests via various clients (including C++ and Python API)
- Concurrent user requests: Supports simultaneous replay requests from multiple users
- Multi-source synchronization: Enables orderly replay of multiple data sources concurrently
- Advanced ordering options: Provides time-based ordering, plus further sequential ordering for records with the same timestamp (e.g., by record number of tick data)
- Completion signal: Signals the end of a replay
- Flexible data consumption: Offers subscription-based access to process replayed results
1.1 Architecture
The solution we build has the following architecture:
- Market data ingestion: Both real-time and historical tick data can be imported to the DolphinDB distributed database via the DolphinDB API or a plugin.
- Function encapsulation: Functions used during the replay process can be encapsulated as function views. Key parameters such as stock list, replay date, replay rate, and data source can still be specified by users.
- User request: Users can replay market data by calling the function view through DolphinDB clients (such as DolphinDB GUI, VS Code Extension, and APIs). Users can also subscribe to and consume real-time replay results on the client side. Additionally, multi-user concurrent replay is supported.
1.2 Steps
This section introduces the steps for building a market data replay solution, as shown in the following figure:
- Step 1 (Chapter 2): Design and implement a suitable distributed database structure in DolphinDB. Create the necessary tables and import historical market data to serve as the replay data source.
- Step 2 (Chapter 3): Develop a DolphinDB function view that encapsulates the entire replay process. This abstraction allows users to initiate replay requests by simply specifying key parameters, without needing to understand the details of DolphinDB’s replay functionality.
- Step 3 (Chapters 4 and 5): Utilize the DolphinDB API to call the previously created function view from external applications. This enables the execution of replay operations outside the DolphinDB environment. Additionally, users can set up subscriptions to consume real-time replay results on the client side.
2. Storage Plan for Market Data
The example in this tutorial replays three types of market data: quote tick data, trade tick data, and snapshots of Level-2 data.
The following chart describes the data used in this example, which is stored in databases using the TSDB storage engine:
- Note: You can download the sample data from the appendices and execute the scripts to import the corresponding data (order, transaction, snapshot).
The replay mechanism operates on a fundamental principle: it retrieves the necessary tick data from the database, sorts it chronologically, and then writes it to the stream table. This process is heavily influenced by the efficiency of database reading and sorting operations, which directly impacts the overall replay speed.
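This merge-then-emit principle can be pictured with a small, self-contained Python sketch (a toy model for intuition only, not DolphinDB code): pre-sorted record streams are merged in timestamp order, just as the replay engine orders ticks before writing them to the output stream table.

```python
import heapq

def toy_replay(*sources):
    """Merge pre-sorted record streams by timestamp (element 0),
    mimicking how a replay engine orders ticks before emitting them."""
    return list(heapq.merge(*sources, key=lambda rec: rec[0]))

# Hypothetical tick streams: (timestamp, source, payload)
orders = [(1, "order", "A"), (3, "order", "B")]
trades = [(2, "transaction", "X"), (3, "transaction", "Y")]
merged = toy_replay(orders, trades)  # records come out in global timestamp order
```

In the real system the sources are DFS table segments and the sink is a persisted stream table, but the ordering contract is the same, which is why read and sort efficiency dominates replay speed.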
To optimize performance, a well-designed partitioning scheme is crucial. The partitioning scheme of tables outlined above is tailored to common replay requests, which are typically submitted based on specific dates and stocks.
In addition, we use a DolphinDB cluster for storing historical data. The cluster consists of three data nodes and maintains two chunk replicas. The distributed nature of the cluster allows for parallel data access across multiple nodes, significantly boosting read speeds. By maintaining multiple copies of the data, the system achieves higher resilience against potential node failures.
3. User-Defined Functions for Replay
This chapter delves into the core functionalities of the replay process and their implementation details. These functions are then encapsulated into function views, which can be called through APIs.
The chart below outlines the functions in the example.
- Note: We used the DolphinDB GUI for development. The complete script is available in the appendices: Functions for Market Data Replay.
3.1 stkReplay: Replay
The stkReplay function is used to perform the replay operation. The function definition is as follows:
def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName)
{
    maxCnt = 50
    returnBody = dict(STRING, STRING)
    startDate = datetimeParse(startDate, "yyyyMMdd")
    endDate = datetimeParse(endDate, "yyyyMMdd") + 1
    sortColumn = "ApplSeqNum"
    if(stkList.size() > maxCnt)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Exceeds the limits for a single replay. The maximum is: " + string(maxCnt)
        return returnBody
    }
    if(size(replayName) != 0)
    {
        for(name in replayName)
        {
            if(not name in ["snapshot", "order", "transaction"])
            {
                returnBody["errorCode"] = "0"
                returnBody["errorMsg"] = "Input the correct name of the data source. Cannot recognize: " + name
                return returnBody
            }
        }
    }
    else
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Missing data source. Input the correct name of the data source."
        return returnBody
    }
    try
    {
        if(size(replayName) == 1 && replayName[0] == "snapshot")
        {
            colName = ["timestamp", "biz_type", "biz_data"]
            colType = [TIMESTAMP, SYMBOL, BLOB]
            sortColumn = "NULL"
        }
        else
        {
            colName = ["timestamp", "biz_type", "biz_data", sortColumn]
            colType = [TIMESTAMP, SYMBOL, BLOB, LONG]
        }
        msgTmp = streamTable(10000000:0, colName, colType)
        tabName = "replay_" + replayUuid
        enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000)
        timeRS = cutPoints(09:30:00.000..15:00:00.000, 23)
        inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName))
        dateDict = dict(replayName, take(`MDDate, replayName.size()))
        timeDict = dict(replayName, take(`MDTime, replayName.size()))
        jobId = "replay_" + replayUuid
        jobDesc = "replay stock data"
        submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn})
        returnBody["errorCode"] = "1"
        returnBody["errorMsg"] = "Replay successfully"
        return returnBody
    }
    catch(ex)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "Exception occurred when replaying: " + ex
        return returnBody
    }
}
Note: The function definition is flexible and can be customized to suit specific requirements.
The parameters passed in are first validated and formatted:
- stkList is limited to a maximum of 50 stocks (defined by maxCnt) for a single replay operation.
- startDate and endDate are parsed from "yyyyMMdd" strings with the datetimeParse function.
- replayName is the list of data sources to be replayed. In this example, each name must be one of "snapshot", "order", or "transaction".
If any parameter fails validation, the function returns an error (as defined in returnBody). Upon successful validation, the function proceeds to initialize the output table.
The output table "msgTmp" is defined as a heterogeneous stream table whose BLOB column stores the serialized result of each replayed record. To optimize memory usage, it is shared and persisted with the enableTableShareAndPersistence function.
When the data source includes both “transaction” and “order”, an extra sort column can be specified for records with the same timestamp. In this example, records in “transaction” and “order” with the same timestamp are sorted by the record number “ApplSeqNum” (See section 3.3 replayJob function for details). In such a case, the output table must contain the sort column. If the data source contains only “snapshots”, the sort column is not required.
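As a rough analogy for this heterogeneous layout (plain Python, with pickle standing in for DolphinDB's serialization; the field values are invented), each replayed record can be pictured as a row of (timestamp, source name, serialized payload), with the sort column appended for "order" and "transaction" sources:

```python
import pickle

def to_hetero_row(timestamp, biz_type, record, seq_num=None):
    """Pack a source record into a heterogeneous output row:
    (timestamp, biz_type, BLOB-like payload[, sort column])."""
    blob = pickle.dumps(record)   # stand-in for the BLOB encoding
    if seq_num is None:           # snapshot-only replay: no sort column needed
        return (timestamp, biz_type, blob)
    return (timestamp, biz_type, blob, seq_num)

row = to_hetero_row(1638316800000, "order", {"price": 10.5, "qty": 100}, seq_num=42)
```

A consumer that knows each source's schema can recover the original record from the payload, which is exactly the role the deserializers in Chapter 5 play.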
Then, the replay job is submitted with the submitJob function. You will be informed of the status of the replay job, whether it has executed successfully or encountered an exception (as defined in returnBody).
3.2 dsTb: Construct Data Sources
The dsTb function is used to prepare the data sources to be replayed. The function definition is as follows:
def dsTb(timeRS, startDate, endDate, stkList, replayName)
{
    if(replayName == "snapshot") {
        tab = loadTable("dfs://Test_snapshot", "snapshot")
    }
    else if(replayName == "order") {
        tab = loadTable("dfs://Test_order", "order")
    }
    else if(replayName == "transaction") {
        tab = loadTable("dfs://Test_transaction", "transaction")
    }
    else {
        return NULL
    }
    ds = replayDS(sqlObj=<select * from tab where MDDate>=startDate and MDDate<endDate and HTSCSecurityID in stkList>, dateColumn='MDDate', timeColumn='MDTime', timeRepartitionSchema=timeRS)
    return ds
}
The dsTb function serves as a higher-level wrapper around the built-in replayDS function:
- It first evaluates the provided replayName to determine which dataset to use.
- Based on replayName, it loads the corresponding table object from the database.
- Using replayDS, it segments the data of stkList in the range [startDate, endDate) into multiple data sources based on timeRS, and returns the list of data sources.
The timeRS parameter, which corresponds to the timeRepartitionSchema parameter of the replayDS function, is a vector of temporal values that creates finer-grained data sources, ensuring efficient querying of the DFS table and optimal memory usage.
The timeRS in this example is defined as a variable in the stkReplay function, timeRS = cutPoints(09:30:00.000..15:00:00.000, 23), which divides the trading hours (09:30:00.000 to 15:00:00.000) into 23 equal parts.
To better understand how dsTb works, execute the following script. For the "order" data source, the records with security ID "000616.SZ" in the trading hours of 2021.12.01 are divided into 3 parts.
timeRS = cutPoints(09:30:00.000..15:00:00.000, 3)
startDate = 2021.12.01
endDate = 2021.12.02
stkList = ['000616.SZ']
replayName = "order"
ds = dsTb(timeRS, startDate, endDate, stkList, replayName)
This returns a list of data sources, each wrapping a metacode SQL statement for one segment. Note that besides the 3 parts within trading hours, extra segments cover the time before 09:30:00.000 and after 15:00:00.000.
DataSource< select [4] * from tab where time(MDTime) < 09:30:00.000,nanotime(MDTime) >= 00:00:00.000000000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 11:20:00.001,time(MDTime) >= 09:30:00.000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 13:10:00.001,time(MDTime) >= 11:20:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 15:00:00.001,time(MDTime) >= 13:10:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where nanotime(MDTime) <= 23:59:59.999999999,time(MDTime) >= 15:00:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
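The boundary arithmetic behind cutPoints can be approximated in Python (an illustration only, not the DolphinDB implementation): divide the span from 09:30:00.000 to 15:00:00.000, expressed in milliseconds since midnight, into n near-equal intervals and return the n+1 boundaries.

```python
def cut_points(start_ms, end_ms, n):
    """Return n+1 boundaries splitting [start_ms, end_ms] into n near-equal intervals."""
    step = (end_ms - start_ms) / n
    return [round(start_ms + i * step) for i in range(n)] + [end_ms]

start = (9 * 60 + 30) * 60_000      # 09:30:00.000 as ms since midnight
end = 15 * 60 * 60_000              # 15:00:00.000
bounds = cut_points(start, end, 3)  # 4 boundaries -> 3 intervals
```

For n = 3, the interior boundaries land at 11:20:00.000 and 13:10:00.000, matching the segment edges visible in the DataSource output above.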
3.3 replayJob: Define Replay Job
The replayJob function is used to define the replay job. The function definition is as follows:
def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn)
{
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23)
    }
    else
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn)
    }
    createEnd(tabName, sortColumn)
}
The replayJob function encapsulates the built-in replay function to perform an N-to-1 heterogeneous replay. Once all required data has been replayed, the createEnd function is called to construct and write an end signal into the replay result, marking the conclusion of the replay process with a special record.
The inputDict, dateDict, and timeDict arguments are defined as variables in the stkReplay function:
- inputDict is a dictionary indicating the data sources to be replayed. The higher-order function each is used to define the data source for each replayName.
- dateDict and timeDict are dictionaries indicating the date and time columns of the data sources, used for sorting.
The replayJob function includes a sortColumn parameter for additional data ordering. If the data source consists only of "snapshot" (where ordering is not a concern), sortColumn is set to the string "NULL" and the built-in replay function is called without its sortColumns parameter. For the other data sources ("order" and "transaction"), sortColumn specifies an extra column for sorting records with the same timestamp.
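The tie-breaking that sortColumns provides can be illustrated with a composite sort in plain Python (an illustration of the ordering contract only; the real sorting happens inside the replay engine):

```python
def order_ticks(records):
    """Order ticks by timestamp, breaking ties with the record number
    (the role "ApplSeqNum" plays in this example)."""
    return sorted(records, key=lambda r: (r["MDTime"], r["ApplSeqNum"]))

ticks = [
    {"MDTime": 100, "ApplSeqNum": 7},
    {"MDTime": 100, "ApplSeqNum": 3},
    {"MDTime": 99,  "ApplSeqNum": 9},
]
ordered = order_ticks(ticks)  # 99/9 first, then 100/3, then 100/7
```

Without the second sort key, the relative order of the two records at timestamp 100 would be arbitrary, which is exactly the ambiguity sortColumns removes for "order" and "transaction" data.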
3.4 createEnd: Signal the End
The createEnd function is used to define the end signal; defining it is optional. The function definition is as follows:
def createEnd(tabName, sortColumn)
{
    dbName = "dfs://End"
    tbName = "endline"
    if(not existsDatabase(dbName))
    {
        db = database(directory=dbName, partitionType=VALUE, partitionScheme=2023.04.03..2023.04.04)
        endTb = table(2200.01.01T23:59:59.000 as DateTime, `END as point, long(0) as ApplSeqNum)
        endLine = db.createPartitionedTable(table=endTb, tableName=tbName, partitionColumns=`DateTime)
        endLine.append!(endTb)
    }
    ds = replayDS(sqlObj=<select * from loadTable(dbName, tbName)>, dateColumn='DateTime', timeColumn='DateTime')
    inputEnd = dict(["end"], [ds])
    dateEnd = dict(["end"], [`DateTime])
    timeEnd = dict(["end"], [`DateTime])
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1)
    }
    else
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn)
    }
}
The createEnd function signals the end of the replay process by writing a record labeled "end" to the output table (specified by tabName). inputEnd, dateEnd, and timeEnd are dictionaries keyed by the string "end", which corresponds to the second column (biz_type) of the output table.
To streamline the parsing and consumption of the output table, a separate database is created specifically for the end signal. Within this database, a partitioned table is established. This table must include a time column; other fields are optional. A simulated record is written to this table; the record is not strictly defined and can be customized as needed. For instructions on how to create a database and table, refer to DolphinDB Tutorial: Distributed Database.
The createEnd function also includes a sortColumn parameter that determines whether the built-in replay function is called with or without its sortColumns parameter. The following figure demonstrates an end signal in the output table when sortColumn is set to "NULL".
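From the consumer's point of view, the end signal is simply a record whose biz_type is "end": a consumer loop watches for it and stops, which is what the handlers in Chapter 5 do. A minimal Python sketch of that contract (the row layout is assumed from the output table defined earlier):

```python
def consume(rows):
    """Process replayed rows until the end signal appears.
    Each row is (timestamp, biz_type, payload)."""
    processed = []
    for ts, biz_type, payload in rows:
        if biz_type == "end":   # the special record written by createEnd
            break
        processed.append((biz_type, payload))
    return processed

rows = [(1, "order", "o1"), (2, "transaction", "t1"), (3, "end", None)]
result = consume(rows)
```

Because the end record flows through the same ordered stream as the data, the consumer is guaranteed to have seen every replayed record before it observes the signal.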
3.5 Encapsulate Functions
The functions described above are encapsulated into function views using the addFunctionView function, as shown in the following script.
addFunctionView(dsTb)
addFunctionView(createEnd)
addFunctionView(replayJob)
addFunctionView(stkReplay)
4. Replay Data With APIs
By creating function views, we've simplified the interface for API clients: invoking the function stkReplay is all it takes to initiate the replay process. The replay workflows detailed in this chapter are all built on these function views.
4.1 With C++ API
This example runs on a Linux OS, with a DolphinDB C++ API environment set up. With executable files compiled, you can run the code from the command line interface. The full script is available in the Appendices: C++ code.
The C++ application interacts with the DolphinDB server through a DBConnection object (conn). Through DBConnection, you can execute scripts and functions on the DolphinDB server and transfer data in both directions.
The application invokes the stkReplay function on the DolphinDB server via DBConnection::run. The variable args holds all the arguments required by stkReplay. The function's result is captured in a dictionary named "result", and the application checks its "errorCode" key using the get method. If errorCode is not "1", an execution error occurred; the application prints an error message and terminates.
The following code demonstrates how to call the stkReplay function.
DictionarySP result = conn.run("stkReplay", args);
string errorCode = result->get(Util::createString("errorCode"))->getString();
if (errorCode != "1")
{
std::cout << result->getString() << endl;
return -1;
}
When multiple users initiate replay operations, it is crucial to assign unique names to the replayed stream tables to prevent conflicts caused by duplicate table names. Executing the following code generates a unique identifier for each user, a string such as "Eq8Jk8Dd0Tw5Ej8D".
string uuid(int len)
{
    char* str = (char*)malloc(len + 1);
    srand(getpid());  // seed with the process ID so concurrent processes differ
    for (int i = 0; i < len; ++i)
    {
        switch (i % 3)  // cycle through upper-case, lower-case, and digit characters
        {
        case 0:
            str[i] = 'A' + std::rand() % 26;
            break;
        case 1:
            str[i] = 'a' + std::rand() % 26;
            break;
        default:
            str[i] = '0' + std::rand() % 10;
            break;
        }
    }
    str[len] = '\0';
    std::string rst = str;
    free(str);
    return rst;
}
4.2 With Python API
This example runs on a Windows OS, with a DolphinDB Python API environment set up. The full script is available in the Appendices: Python Code.
The Python application interacts with the DolphinDB server through a session object (s). Through sessions, you can execute scripts and functions on the DolphinDB server and transfer data in both directions.
Once the necessary variables (stk_list, start_date, end_date, replay_rate, replay_uuid, and replay_name) have been defined, the upload method is used to upload these objects to the DolphinDB server, and the run method is invoked to execute the stkReplay function.
The following code demonstrates how to call the stkReplay function.
stk_list = ['000616.SZ','000681.SZ']
start_date = '20211201'
end_date = '20211201'
replay_rate = -1
replay_name = ['snapshot']
s.upload({'stk_list':stk_list, 'start_date':start_date, 'end_date':end_date, 'replay_rate':replay_rate, 'replay_uuid':uuidStr, 'replay_name':replay_name})
s.run("stkReplay(stk_list, start_date, end_date, replay_rate, replay_uuid, replay_name)")
When multiple users initiate replay operations, executing the following code generates a unique identifier for each user, preventing conflicts caused by duplicate table names. This identifier takes the form of a string, such as "Eq8Jk8Dd0Tw5Ej8D".
import os
import random

def uuid(length):
    # Cycle through upper-case, lower-case, and digit characters
    result = ""
    for i in range(length):
        if i % 3 == 0:
            result += chr(ord('A') + random.randint(0, os.getpid() + 1) % 26)
        elif i % 3 == 1:
            result += chr(ord('a') + random.randint(0, os.getpid() + 1) % 26)
        else:
            result += chr(ord('0') + random.randint(0, os.getpid() + 1) % 10)
    return result

uuidStr = uuid(16)
5. Consume Replayed Data With APIs
This chapter introduces how to consume replayed data with C++ API and Python API.
5.1 With C++ API
5.1.1 Construct a Deserializer
To deserialize the replayed data (which is stored in a heterogeneous table), a deserializer has to be constructed first. Use the following code to construct a deserializer:
DictionarySP snap_full_schema = conn.run("loadTable(\"dfs://Test_snapshot\", \"snapshot\").schema()");
DictionarySP order_full_schema = conn.run("loadTable(\"dfs://Test_order\", \"order\").schema()");
DictionarySP transac_full_schema = conn.run("loadTable(\"dfs://Test_transaction\", \"transaction\").schema()");
DictionarySP end_full_schema = conn.run("loadTable(\"dfs://End\", \"endline\").schema()");
unordered_map<string, DictionarySP> sym2schema;
sym2schema["snapshot"] = snap_full_schema;
sym2schema["order"] = order_full_schema;
sym2schema["transaction"] = transac_full_schema;
sym2schema["end"] = end_full_schema;
StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
The initial four lines fetch the table schemata of "snapshot," "order," "transaction," and the end signal table. During replay, the appropriate schema is chosen as needed, with the end signal schema being required.
To handle heterogeneous stream tables, a StreamDeserializer object is created from the sym2schema map, which maps symbols (keys for the source tables) to their respective table schemata. Both the data sources and the end signal are then deserialized based on these schemata.
For detailed instructions, see C++ API Reference Guide: Constructing a Deserializer.
5.1.2 Subscribe To Replayed Data
Execute the following code to subscribe to the output table:
int listenport = 10260;
ThreadedClient threadedClient(listenport);
string tableNameUuid = "replay_" + uuidStr;
auto thread = threadedClient.subscribe(hostName, port, myHandler, tableNameUuid, "stkReplay", 0, true, nullptr, true, 500000, 0.001, false, "admin", "123456", sdsp);
std::cout << "Successfully subscribe to " + tableNameUuid << endl;
thread->join();
Where:
- listenport is the subscription port of the single-threaded client.
- tableNameUuid is the name of the stream table to be consumed.
We subscribe to the stream table through threadedClient.subscribe. The returned thread points to the thread that continuously invokes myHandler; it stops calling myHandler when unsubscribe is called on the same topic.
For detailed instructions, see the C++ API Reference Guide: Subscription.
5.1.3 Define the Handler for Processing Data
The threadedClient.subscribe method is used to subscribe to the heterogeneous stream table. During this process, the StreamDeserializerSP instance deserializes incoming data and routes it to a user-defined handler. Users can customize the data processing logic by implementing their own myHandler function.
In this particular example, a basic output operation is performed. When a msg is flagged as "end", the subscription is terminated with threadedClient.unsubscribe. The implementation of myHandler for this example is as follows:
long sumcount = 0;
long long starttime = Util::getNanoEpochTime();
auto myHandler = [&](vector<Message> msgs)
{
    for (auto& msg : msgs)
    {
        std::cout << msg.getSymbol() << " : " << msg->getString() << endl;
        if (msg.getSymbol() == "end")
        {
            threadedClient.unsubscribe(hostName, port, tableNameUuid, "stkReplay");
        }
        sumcount += msg->get(0)->size();
    }
    long long speed = (Util::getNanoEpochTime() - starttime) / sumcount;
    std::cout << "callback speed: " << speed << "ns" << endl;
};
5.1.4 Execute the Application
After compiling the main file with the aforementioned code within the DolphinDB C++ API environment, you can initiate the replay-subscription-consumption process with the following commands.
- To replay data of a single stock from one table (“order”) for one day at maximum speed:
$ ./main 000616.SZ 20211201 20211201 -1 order
- To replay data of two (or more) stocks from three tables (“snapshot”, “order”, “transaction”) for one day at maximum speed:
$ ./main 000616.SZ,000681.SZ 20211201 20211201 -1 snapshot,order,transaction
5.1.5 Output the Result
The following figure displays the result for replaying two stocks (“000616.SZ” & “000681.SZ”) from three tables at maximum speed:
5.2 With Python API
5.2.1 Construct a Deserializer
To deserialize the replayed data (which is stored in a heterogeneous table), a deserializer has to be constructed first. Use the following code to construct a deserializer:
sd = ddb.streamDeserializer({
'snapshot': ["dfs://Test_snapshot", "snapshot"],
'order': ["dfs://Test_order", "order"],
'transaction': ["dfs://Test_transaction", "transaction"],
'end': ["dfs://End", "endline"],
}, s)
To handle heterogeneous stream tables, a streamDeserializer object is created. Its sym2table parameter is a dictionary whose keys are the source table symbols (i.e., "snapshot," "order," "transaction," and "end") and whose values locate the table whose schema is used for deserialization. Note that during replay, the appropriate schema is chosen as needed, with the end signal schema being required.
For detailed instructions, see Python API Reference Guide: streamDeserializer.
5.2.2 Subscribe To Replayed Data
Execute the following code to subscribe to the output table:
s.enableStreaming(0)
s.subscribe(host=hostname, port=portname, handler=myHandler, tableName="replay_"+uuidStr, actionName="replay_stock_data", offset=0, resub=False, msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")
event.wait()
The enableStreaming method is used to enable streaming data subscription. Then call s.subscribe to create the subscription, and call event.wait() to block the current thread and keep receiving data in the background.
For detailed instructions, see the Python API Reference Guide: Subscription.
5.2.3 Define the Handler for Processing Data
The s.subscribe method is used to subscribe to the heterogeneous stream table. During this process, the streamDeserializer instance deserializes incoming data and routes it to a user-defined handler. Users can customize the data processing logic by implementing their own myHandler function.
In this particular example, a basic output operation is performed. The implementation of myHandler for this example is as follows:
def myHandler(lst):
    # The deserializer appends the source symbol as the last element
    if lst[-1] == "snapshot":
        print("SNAPSHOT: ", lst)
    elif lst[-1] == 'order':
        print("ORDER: ", lst)
    elif lst[-1] == 'transaction':
        print("TRANSACTION: ", lst)
    else:  # end signal
        print("END: ", lst)
        event.set()  # release the blocked main thread
5.2.4 Output the Result
The following figure displays the result for replaying two stocks (“000616.SZ” & “000681.SZ”) from three tables at maximum speed:
6. Performance Testing
We selected 50 stocks ranging from 2021.12.01 to 2021.12.09 for our performance testing. The test scripts can be found in the Appendices: C++ Test Code.
6.1 Test Environment
- Processor family: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
- CPU(s): 64
- Memory: 503 GB
- Disk: SSD
- OS: CentOS Linux release 7.9.2009 (Core)
- DolphinDB Server: version 2.00.9.3 (released on 2023.03.29)
6.2 Concurrent Replays of 50 Stocks on One Trading Day
We selected 50 stocks traded on 2021.12.01. Multiple instances of the C++ API replay program (described in Chapter 4) were simultaneously initiated in the Linux background to submit concurrent replay tasks.
- Elapsed time: the latest finish time minus the earliest receive time across all replay tasks
- Replay rate: the sum of all users' replayed data volume divided by the total elapsed time
- Average single-user elapsed time: (the sum of end times minus the sum of start times) divided by the number of concurrent tasks
- Average single-user replay rate: the amount of data replayed by a single user divided by the average single-user elapsed time
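Given per-task receive/finish timestamps and row counts, the metrics above can be computed as follows (a hedged sketch; the field names are illustrative and not taken from the test scripts):

```python
def replay_metrics(tasks):
    """Compute the concurrency metrics defined above.
    Each task: {'start': receive time, 'end': finish time, 'rows': row count},
    with times in seconds."""
    elapsed = max(t["end"] for t in tasks) - min(t["start"] for t in tasks)
    total_rows = sum(t["rows"] for t in tasks)
    avg_user_elapsed = sum(t["end"] - t["start"] for t in tasks) / len(tasks)
    per_user_rows = total_rows / len(tasks)
    return {
        "elapsed": elapsed,                      # latest finish minus earliest receive
        "rate": total_rows / elapsed,            # overall rows per second
        "avg_user_elapsed": avg_user_elapsed,
        "avg_user_rate": per_user_rows / avg_user_elapsed,
    }

m = replay_metrics([{"start": 0, "end": 10, "rows": 1000},
                    {"start": 2, "end": 12, "rows": 1000}])
```

Note that the overall rate uses wall-clock elapsed time across all tasks, so with overlapping tasks it is lower than the sum of per-user rates.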
6.3 Replays of 50 Stocks Across Multiple Trading Days
We selected 50 stocks ranging from 2021.12.01 to 2021.12.09. The instance of the C++ API replay program (described in Chapter 4) was initiated in the Linux background to submit a task for replaying data across multiple trading days.
- Elapsed time = the latest finish time minus the earliest receive time
- Replay rate = data volume divided by the total elapsed time
7. Development Environment
7.1 DolphinDB Server
- Server Version: 2.00.9.3 (released on 2023.03.29)
- Deployment: standalone mode (see standalone deployment)
- Configuration: cluster.cfg
maxMemSize=128
maxConnections=5000
workerNum=24
webWorkerNum=2
chunkCacheEngineMemSize=16
newValuePartitionPolicy=add
logLevel=INFO
maxLogSize=512
node1.volumes=/ssd/ssd3/pocTSDB/volumes/node1,/ssd/ssd4/pocTSDB/volumes/node1
node2.volumes=/ssd/ssd5/pocTSDB/volumes/node2,/ssd/ssd6/pocTSDB/volumes/node2
node3.volumes=/ssd/ssd7/pocTSDB/volumes/node3,/ssd/ssd8/pocTSDB/volumes/node3
diskIOConcurrencyLevel=0
node1.redoLogDir=/ssd/ssd3/pocTSDB/redoLog/node1
node2.redoLogDir=/ssd/ssd4/pocTSDB/redoLog/node2
node3.redoLogDir=/ssd/ssd5/pocTSDB/redoLog/node3
node1.chunkMetaDir=/ssd/ssd3/pocTSDB/metaDir/chunkMeta/node1
node2.chunkMetaDir=/ssd/ssd4/pocTSDB/metaDir/chunkMeta/node2
node3.chunkMetaDir=/ssd/ssd5/pocTSDB/metaDir/chunkMeta/node3
node1.persistenceDir=/ssd/ssd6/pocTSDB/persistenceDir/node1
node2.persistenceDir=/ssd/ssd7/pocTSDB/persistenceDir/node2
node3.persistenceDir=/ssd/ssd8/pocTSDB/persistenceDir/node3
maxPubConnections=128
subExecutors=24
subThrottle=1
persistenceWorkerNum=1
node1.subPort=8825
node2.subPort=8826
node3.subPort=8827
maxPartitionNumPerQuery=200000
streamingHAMode=raft
streamingRaftGroups=2:node1:node2:node3
node1.streamingHADir=/ssd/ssd6/pocTSDB/streamingHADir/node1
node2.streamingHADir=/ssd/ssd7/pocTSDB/streamingHADir/node2
node3.streamingHADir=/ssd/ssd8/pocTSDB/streamingHADir/node3
TSDBCacheEngineSize=16
TSDBCacheEngineCompression=false
node1.TSDBRedoLogDir=/ssd/ssd3/pocTSDB/TSDBRedoLogDir/node1
node2.TSDBRedoLogDir=/ssd/ssd4/pocTSDB/TSDBRedoLogDir/node2
node3.TSDBRedoLogDir=/ssd/ssd5/pocTSDB/TSDBRedoLogDir/node3
TSDBLevelFileIndexCacheSize=20
TSDBLevelFileIndexCacheInvalidPercent=0.6
lanCluster=0
enableChunkGranularityConfig=true
Note: Modify the configuration parameter persistenceDir with your own path.
7.2 DolphinDB Client
- Processor family: Intel(R) Core(TM) i5–11500 @ 2.70GHz 2.71 GHz
- CPU(s): 12
- Memory: 16 GB
- OS: Windows 11 Pro
- DolphinDB GUI Version: 1.30.20.1
See GUI to install DolphinDB GUI.
7.3 DolphinDB C++ API
- C++ API Version: release200.9
Note: It is recommended to install the C++ API version corresponding to your DolphinDB server. For example, install the API of release200 for DolphinDB server V2.00.9.
- For instructions on installing and using C++ API, see C++ API Reference Guide.
7.4 DolphinDB Python API
- Python API Version: 1.30.21.1
- For instructions on installing and using Python API, see the Python API Reference Guide.
8. Appendices
- Script for creating databases and tables:
Published at DZone with permission of Davis Chu. See the original article here.