Thursday 25 July 2013

Managing Big Data with MongoDB


Update: Since writing this post I have changed my mind about MongoDB being an effective tick data storage solution. In my experience selection speeds are far too slow. MongoDB is document-oriented and inherently unsuited to storing time series. A more fitting solution may be a column-oriented database. I know that KDB is a very fast column-oriented database widely used for tick data storage, but unfortunately it is only available under an expensive commercial license. I'm currently trying to find a good open source alternative to KDB.




Liquid exchange-traded instruments typically generate millions of ticks per trading session in the form of trades and quote changes. Storing such vast quantities of data in CSV files is not practical. There is a range of alternatives, including binary file formats such as HDF5, traditional relational databases such as PostgreSQL and MySQL, and NoSQL databases, with MongoDB and Cassandra being two of the most popular. I decided on MongoDB for my data storage needs, and in this post I catalogue my experience setting up a database. Please note that the database schema I present is for illustrative purposes only; there is plenty of material online advising on how best to design a MongoDB schema.

The MongoDB installation includes a shell that allows the user to interact with the database from the command line. The shell is started with the mongo command.
-bash$ mongo
MongoDB shell version: 2.4.5
connecting to: test
>

A database can be selected with the use command. In this example I select a new database called mydb. Please note that the database is not actually created until a document is inserted. For those users more familiar with traditional SQL databases, a collection is analogous to a table, and a document is equivalent to a row.
> use mydb
switched to db mydb

I then add 4 documents to mydb within a collection named trades, using the insert command. Each document represents a trade, with four fields: date, time, price, and volume. I use short field names (d, t, p, and v) because each document stores a copy of the field names and this can increase disk usage substantially when there are hundreds of millions of documents in a database.
> db.trades.insert( {d: "20130122", t: "08:01:22.298",  p: 96.23, v: 1 } )
> db.trades.insert( {d: "20130122", t: "08:01:22.698",  p: 96.24, v: 2 } )
> db.trades.insert( {d: "20130122", t: "08:01:23.256",  p: 96.23, v: 1 } )
> db.trades.insert( {d: "20130122", t: "08:01:23.557",  p: 96.24, v: 3 } )

The contents of the trades collection can be displayed using the find function with no parameters; chaining pretty() formats the output for readability.
> db.trades.find().pretty()

{
 "_id" : ObjectId("51f2a8df6047fc37731a8f0a"),
 "d" : "20130122",
 "t" : "08:01:22.298",
 "p" : 96.23,
 "v" : 1
}
{
 "_id" : ObjectId("51f2a91c6047fc37731a8f0b"),
 "d" : "20130122",
 "t" : "08:01:22.698",
 "p" : 96.24,
 "v" : 2
}
{
 "_id" : ObjectId("51f2a9336047fc37731a8f0c"),
 "d" : "20130122",
 "t" : "08:01:23.256",
 "p" : 96.23,
 "v" : 1
}
{
 "_id" : ObjectId("51f2a9446047fc37731a8f0d"),
 "d" : "20130122",
 "t" : "08:01:23.557",
 "p" : 96.24,
 "v" : 3
}

The count function also correctly indicates that there are 4 documents in the trades collection.
> db.trades.count()

4

I then query the trades collection for all documents where field v is equal to 1, and two records are returned as expected.
> db.trades.find({v: 1})

{ "_id" : ObjectId("51f2a8df6047fc37731a8f0a"), "d" : "20130122", "t" : "08:01:22.298", "p" : 96.23, "v" : 1 }
{ "_id" : ObjectId("51f2a9336047fc37731a8f0c"), "d" : "20130122", "t" : "08:01:23.256", "p" : 96.23, "v" : 1 } 

However, the explain function shows that all four documents were scanned during the search (the BasicCursor indicates a full collection scan). This isn't very efficient.
> db.trades.find({v: 1}).explain()

{
 "cursor" : "BasicCursor",
 "isMultiKey" : false,
 "n" : 2,
 "nscannedObjects" : 4,
 "nscanned" : 4,
 "nscannedObjectsAllPlans" : 4,
 "nscannedAllPlans" : 4,
 "scanAndOrder" : false,
 "indexOnly" : false,
 "nYields" : 0,
 "nChunkSkips" : 0,
 "millis" : 0,
 "indexBounds" : {
 },
 "server" : "yeats:27017"
}

The reason for this is that the trades collection has only one index by default, on the auto-generated unique identifier _id.
> db.trades.getIndexes()
[
 {
  "v" : 1,
  "key" : {
   "_id" : 1
  },
  "ns" : "mydb.trades",
  "name" : "_id_"
 }
]

I then create an index for the trades collection based on the volume field by calling the ensureIndex function. A subsequent call to getIndexes confirms that a second index has been created.
> db.trades.ensureIndex({v: 1})

> db.trades.getIndexes()

[
 {
  "v" : 1,
  "key" : {
   "_id" : 1
  },
  "ns" : "mydb.trades",
  "name" : "_id_"
 },
 {
  "v" : 1,
  "key" : {
   "v" : 1
  },
  "ns" : "mydb.trades",
  "name" : "v_1"
 }
]

I run the same query again, and this time only two documents were scanned to find the two matches. This may not seem like a big deal with four documents, but without indexes queries become cripplingly slow as collections grow to millions of documents. You can create multiple indexes on each collection depending on which fields you expect to include in your queries.
> db.trades.find({v: 1}).explain()

{
 "cursor" : "BtreeCursor v_1",
 "isMultiKey" : false,
 "n" : 2,
 "nscannedObjects" : 2,
 "nscanned" : 2,
 "nscannedObjectsAllPlans" : 2,
 "nscannedAllPlans" : 2,
 "scanAndOrder" : false,
 "indexOnly" : false,
 "nYields" : 0,
 "nChunkSkips" : 0,
 "millis" : 38,
 "indexBounds" : {
  "v" : [
   [
    1,
    1
   ]
  ]
 },
 "server" : "myserver:27017"
}

Rather than manually creating indexes for each collection, I wrote a Python script that automatically iterates through all collections, calling the create_index function on each. Interoperability between Python and MongoDB is enabled by the PyMongo package. The following script creates three indexes for each collection, one based on date, a second on time, and a third compound index based on date and time.
from pymongo import MongoClient, ASCENDING

# Connect to the local mongod instance on the default port
client = MongoClient('localhost', 27017)
db = client.mydb
collections = sorted(db.collection_names())

for s in collections:
    if s != "system.indexes":
        db[s].create_index([("d", ASCENDING)])
        db[s].create_index([("t", ASCENDING)])
        db[s].create_index([("d", ASCENDING),
                            ("t", ASCENDING)])
        print("Indexes created for collection " + s)

After running this script on my database, queries based on date and time ranges complete in a timely fashion. The following command returns all trades between 8am and 4pm on 4 January 2012, sorted in ascending order by date and time.
> db.trades.find({d: "20120104", t: {$gte: "08:00:00.000", $lt: "16:00:00.000"}}).sort({d: 1, t: 1})

Of course, populating the database one document at a time from the shell is not feasible for millions of ticks. There are a number of options for bulk loading. One option is to use the mongoimport command from bash to import a CSV file, where each row is interpreted as a document.
-bash$ mongoimport -d taq -c trades --type csv --file trades.csv --headerline 

A second option is to write a batch insert function in C++ and connect to MongoDB using the C++ driver. I developed a C++ function called insertTrades which parses the contents of a CSV file, creates BSONObj objects, and inserts them into MongoDB.
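
What follows is not the original function but a minimal sketch along the same lines, assuming the legacy C++ driver header mongo/client/dbclient.h, an already connected DBClientConnection, a CSV layout of date,time,price,volume, and an illustrative mydb.trades namespace with a batch size of 1000.

// Sketch only: the namespace, batch size and CSV layout are assumptions.
#include <cstdlib>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include "mongo/client/dbclient.h"

void insertTrades(const std::string& csvPath, mongo::DBClientConnection& conn)
{
    std::ifstream file(csvPath.c_str());
    std::string line;
    std::vector<mongo::BSONObj> batch;

    while (std::getline(file, line)) {
        // Split one CSV row into date, time, price and volume fields
        std::stringstream ss(line);
        std::string d, t, p, v;
        std::getline(ss, d, ',');
        std::getline(ss, t, ',');
        std::getline(ss, p, ',');
        std::getline(ss, v, ',');

        // Build one BSON document per row, reusing the short field names
        batch.push_back(BSON("d" << d
                             << "t" << t
                             << "p" << std::atof(p.c_str())
                             << "v" << std::atoi(v.c_str())));

        // Flush to the server in batches to keep memory usage bounded
        if (batch.size() == 1000) {
            conn.insert("mydb.trades", batch);
            batch.clear();
        }
    }
    if (!batch.empty())
        conn.insert("mydb.trades", batch);
}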


Querying and parsing trades from the database can also be achieved using the C++ driver. The following C++ function returns a vector of custom Trade objects for a single trading session specified by the key parameter.
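
Again, the function below is a sketch rather than the original code, under the same assumptions as above (legacy C++ driver, mydb.trades namespace), plus a hypothetical Trade struct and a key parameter holding the session date.

// Sketch only: Trade is a hypothetical struct; key is assumed to be the
// session date, e.g. "20120104".
#include <memory>
#include <string>
#include <vector>
#include "mongo/client/dbclient.h"

struct Trade {
    std::string date;
    std::string time;
    double price;
    int volume;
};

std::vector<Trade> getTrades(const std::string& key, mongo::DBClientConnection& conn)
{
    std::vector<Trade> trades;

    // Fetch all documents for the given date, sorted by time; this query
    // can be served by the compound (d, t) index created earlier
    std::auto_ptr<mongo::DBClientCursor> cursor =
        conn.query("mydb.trades", QUERY("d" << key).sort("t", 1));

    while (cursor->more()) {
        mongo::BSONObj doc = cursor->next();
        Trade tr;
        tr.date   = doc.getStringField("d");
        tr.time   = doc.getStringField("t");
        tr.price  = doc["p"].numberDouble();
        tr.volume = doc["v"].numberInt();
        trades.push_back(tr);
    }
    return trades;
}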


So far my MongoDB experience has been very positive! I will post more on this topic in the near future. Thanks for reading.
