Introduction To Backtesting Strategy: Historical Data Replay in DolphinDB
Import historical data into a stream table in chronological order as “real-time data” so that the same script can be used both for backtesting and real-time trading.
In DolphinDB, we can import historical data into a stream table in chronological order as "real-time data" so that the same script can be used both for backtesting and real-time trading. Regarding streaming in DolphinDB, please refer to the DolphinDB Streaming Tutorial.
This article introduces the functions replay and replayDS, and then demonstrates the process of replaying data.
1. Functions
replay
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])
Function replay injects data from specified tables or data sources into stream tables.

'inputTables' is a table or a tuple. Each element of the tuple is an unpartitioned table or a data source generated by the function replayDS.

'outputTables' is a table, a tuple of tables, a string, or a string vector. The number of elements of 'outputTables' must be the same as the number of elements of 'inputTables'. If it is a string vector, it is a list of the names of the shared stream tables where the replayed data of the corresponding tables of 'inputTables' are saved. If it is a tuple, each element is a shared stream table where the replayed data of the corresponding table in 'inputTables' are saved. The schema of each table in 'outputTables' must be identical to the schema of the corresponding table in 'inputTables'.

'dateColumn' and 'timeColumn' are strings indicating the date column and time column in 'inputTables'. If neither is specified, the first column of the table is used as 'dateColumn'. If 'dateColumn' is specified, it must be one of the partitioning columns. If only 'timeColumn' is specified, it must also be one of the partitioning columns. If the date and time information comes from the same column (e.g., a DATETIME or TIMESTAMP column), use the same column for both 'dateColumn' and 'timeColumn'. Data are replayed in batches determined by the smallest unit of time in 'timeColumn', or in 'dateColumn' if 'timeColumn' is not specified. For example, if the smallest unit of time in 'timeColumn' is the second, all data within the same second are replayed in the same batch; if 'timeColumn' is not specified, all data within the same day are replayed in the same batch.

'replayRate' is an integer controlling the replay speed. If it is not specified, data are replayed at the maximum speed.

'absoluteRate' is a Boolean value. The default value is true.

Regarding 'replayRate' and 'absoluteRate':
- If 'replayRate' is a positive integer and absoluteRate=true, replay at the speed of 'replayRate' rows per second.
- If 'replayRate' is a positive integer and absoluteRate=false, replay at 'replayRate' times the original speed of the data. For example, if the difference between the maximum and the minimum values of 'dateColumn' or 'timeColumn' is n seconds, the replay takes n/replayRate seconds to finish.
- If 'replayRate' is unspecified or negative, replay at the maximum speed.

'parallelLevel' is a positive integer. When the size of individual partitions in the data sources is too large relative to memory size, we need to use function replayDS to further divide individual partitions into smaller data sources. 'parallelLevel' indicates the number of threads that load data from these smaller data sources into memory simultaneously. The default value is 1. If 'inputTables' is a table or a tuple of tables, the effective 'parallelLevel' is always 1.
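As an illustrative sketch (the table names here are assumptions, not from the original example), the following call replays 'inputTable' at 10 times the original speed of the data by setting absoluteRate=false:

```dolphindb
// Replay at 10x the original data speed rather than at a fixed row rate.
// If the data span n seconds, the replay finishes in about n/10 seconds.
replay(inputTable, outputTable, `date, `time, 10, false)
```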
replayDS
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
Function replayDS generates a group of data sources to be used as the inputs of function replay. It splits a SQL query into multiple subqueries based on 'timeRepartitionSchema' with 'timeColumn' within each 'dateColumn' partition.

'sqlObj' is a table or metacode with a SQL statement (such as <select * from sourceTable>) indicating the data to be replayed. The table object of "select from" must use a DATE type column as one of the partitioning columns.

'dateColumn' and 'timeColumn' are strings indicating the date column and time column. If neither is specified, the first column of the table is used as 'dateColumn'. If 'dateColumn' is specified, it must be one of the partitioning columns. If only 'timeColumn' is specified, it must also be one of the partitioning columns. If the date and time information comes from the same column (e.g., a DATETIME or TIMESTAMP column), use the same column for both 'dateColumn' and 'timeColumn'. Function replayDS and the corresponding call to function replay must use the same 'dateColumn' and 'timeColumn'.

'timeRepartitionSchema' is a TIME or NANOTIME type vector. It delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. For example, if timeRepartitionSchema=[t1, t2, t3], then there are 4 data sources within a day: [00:00:00.000, t1), [t1, t2), [t2, t3), and [t3, 23:59:59.999).
Replay a Single In-Memory Table
replay(inputTable, outputTable, `date, `time, 10)
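For context, a minimal hypothetical setup for the call above could look as follows (the table names and the 'sym' and 'price' columns are assumptions):

```dolphindb
// Hypothetical in-memory source table with date, time, symbol, and price columns
t = table(take(2007.08.17, 100) as date, 09:30:00.000 + (1..100) * 100 as time, rand(`AAPL`IBM`MSFT, 100) as sym, rand(100.0, 100) as price)
share t as inputTable
// Output stream table with an identical schema
share streamTable(100:0, `date`time`sym`price, [DATE, TIME, SYMBOL, DOUBLE]) as outputTable
```

With these tables defined, the replay call above injects 10 rows per second into 'outputTable'.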
Replay a Single Table Using Data Sources
To replay a single table with a large number of rows, we can use function replayDS together with function replay. Function replayDS delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. Parameter 'parallelLevel' of function replay specifies the number of threads that load data from these smaller data sources into memory simultaneously. In this example, 'parallelLevel' is set to 2.
inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, outputTable, `date, `time, 1000, true, 2)
Replay Multiple Tables Simultaneously Using Data Sources
To replay multiple tables simultaneously, assign a tuple of these tables (or data sources) to parameter 'inputTables' of function replay and specify the output tables. Each output table corresponds to an input table and must have the same schema as the corresponding input table. All input tables should have identical 'dateColumn' and 'timeColumn'.
ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, true, 2)
Cancel Replay
If function replay was called with submitJob, we can use getRecentJobs to get the jobId, then cancel the replay with the command cancelJob.
getRecentJobs()
cancelJob(jobid)
If function replay was called directly, we can use getConsoleJobs in another GUI session to get the jobId, then cancel the replay with the command cancelConsoleJob.
getConsoleJobs()
cancelConsoleJob(jobId)
2. How To Use Replayed Data
Replayed data are streaming data. We can subscribe to and process the replayed data in the following 3 ways:
- Subscribe to DolphinDB. Write user-defined functions in DolphinDB to process streaming data.
- Subscribe to DolphinDB. To conduct real-time calculations with streaming data, use DolphinDB’s built-in streaming aggregators such as time-series aggregator, cross-sectional aggregator, and anomaly detection engine. They are very easy to use and have excellent performance. In section 3.2, we use a cross-sectional aggregator to calculate the intrinsic value of an ETF.
- With third-party clients through DolphinDB’s streaming API.
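As a minimal sketch of the first approach (the handler name is an assumption), a user-defined function can be attached to a replayed stream table with subscribeTable:

```dolphindb
// Hypothetical handler that prints the number of rows in each replayed batch
def printBatchSize(msg) {
    print(msg.size())
}
subscribeTable(tableName="outQuotes", actionName="printBatchSize", offset=-1, handler=printBatchSize, msgAsTable=true)
```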
3. Examples
Replay level 1 stock quotes to calculate ETF intrinsic value.
In this example, we replay the level 1 stock quotes in US stock markets on 2007/08/17, and calculate the intrinsic value of an ETF with the built-in cross-sectional aggregator in DolphinDB. The following are the schema of the input table ‘quotes’ and a preview of the data.
quotes = database("dfs://TAQ").loadTable("quotes");
quotes.schema().colDefs;
select top 10 * from quotes where date=2007.08.17
1. To replay a large amount of data, loading all of it into memory first may cause an out-of-memory problem. Instead, we can use function replayDS with parameter 'timeRepartitionSchema' to divide the data into 60 parts based on the column 'time'.
trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
2. Define the output stream table 'outQuotes'.
sch = select name,typeString as type from quotes.schema().colDefs
share streamTable(100:0, sch.name, sch.type) as outQuotes
3. Define a dictionary for the ETF component weights and function etfVal to calculate the ETF intrinsic value. For simplicity, we use an ETF with only 6 component stocks.
defg etfVal(weights,sym, price) {
return wsum(price, weights[sym])
}
weights = dict(STRING, DOUBLE)
weights[`AAPL] = 0.1
weights[`IBM] = 0.1
weights[`MSFT] = 0.1
weights[`NTES] = 0.1
weights[`AMZN] = 0.1
weights[`GOOG] = 0.5
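With these weights, etfVal can be checked directly; the prices below are made up for illustration:

```dolphindb
// 0.1*180.0 + 0.1*140.0 + 0.5*150.0 = 107.0
etfVal(weights, `AAPL`IBM`GOOG, [180.0, 140.0, 150.0])
```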
4. Define a streaming aggregator to subscribe to the output stream table 'outQuotes'. We specify a filtering condition so that only data with stock symbols of AAPL, IBM, MSFT, NTES, AMZN, or GOOG are published to the aggregator. This significantly reduces unnecessary network overhead and data transfer.
setStreamTableFilterColumn(outQuotes, `symbol)
outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch)
subscribeTable(tableName="outQuotes", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true, filter=`AAPL`IBM`MSFT`NTES`AMZN`GOOG)
5. Start replaying data at the specified speed of 100,000 rows per second. The streaming aggregator performs real-time calculations on the replayed data.
submitJob("replay_quotes", "replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time, 100000, true, 4)
6. Check the ETF intrinsic values:
select top 15 * from outputTable
4. Performance Testing
We tested data replaying in DolphinDB on a server with the following configuration:
- Server: DELL PowerEdge R730xd
- CPU: Intel Xeon(R) CPU E5-2650 v4 (24 cores, 48 threads, 2.20 GHz)
- RAM: 512 GB (32GB × 16, 2666 MHz)
- Hard disk: 17 TB HDD (1.7 TB × 10, read speed 222 MB/s, write speed 210 MB/s)
- Network: 10 Gigabit Ethernet
DolphinDB script:
sch = select name,typeString as type from quotes.schema().colDefs
trs = cutPoints(09:30:00.000..16:00:00.001,60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
share streamTable(100:0, sch.name, sch.type) as outQuotes1
jobid = submitJob("replay_quotes","replay_quotes_stream", replay, [rds], [`outQuotes1], `date, `time, , ,4)
When replaying at maximum speed (parameter 'replayRate' is not specified) and the output table is not subscribed, it takes only about 100 seconds to replay 336,305,414 rows of data.
Published at DZone with permission of Davis Chu. See the original article here.