Library and tools for dynamically storing ROS messages in Apache Cassandra.
|Authors||André Dietrich & Sebastian Zug|
|firstname.lastname@example.org & email@example.com|
|Publication||ROS Meets Cassandra: Data Management in Smart Environments with NoSQL|
This package provides an interface to the Cassandra database system, which can be used to store any type of ROS message in column families, similar to MongoDB. In contrast to MongoDB, however, you can choose the format in which your messages are stored. Several formats are available, so you can explore your data afterwards using Cassandra's CQL capabilities.
First of all, you need to download and install CassandraDB manually from Apache:
Then change the Cassandra partitioner to "ByteOrdered", which can be set in
search for section partitioner and change it to the following:
The ByteOrdered partitioner is required to retrieve the requested data in key order; otherwise it will appear in an arbitrary order. For more information, have a look at the documentation at http://docs.datastax.com/en/cassandra/2.0/cassandra/architecture/architecturePartitionerAbout_c.html
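The effect of the partitioner on row order can be illustrated without a running database. The following is only a minimal sketch: the row keys are hypothetical zero-padded timestamps, and MD5 is used purely as an example of a hash-based partitioner, not as a statement about how your cluster is configured.

```python
import hashlib


def byte_ordered(keys):
    """Order row keys by their raw bytes, as a byte-ordered partitioner does."""
    return sorted(keys, key=lambda k: k.encode("utf-8"))


def hash_ordered(keys):
    """Order row keys by a hash of the key, mimicking a hash-based partitioner."""
    return sorted(keys, key=lambda k: hashlib.md5(k.encode("utf-8")).digest())


# Hypothetical row keys: zero-padded timestamps, one per stored message.
keys = ["%010d" % t for t in range(1000, 1020)]

in_key_order = byte_ordered(keys)    # chronological, usable for range queries
in_token_order = hash_ordered(keys)  # same keys, but scrambled by the hash
```

With byte ordering, neighbouring timestamps stay neighbours, which is what a time-range query over recorded messages relies on.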
Secondly, you will have to install pycassa, a Python interface for CassandraDB. The project is hosted on:
All information, including installation instructions, documentation, and a tutorial, is available there.
First of all, you need to start Cassandra. In most cases this is done by changing into the folder where Cassandra was installed and typing:
$ bin/cassandra -f
The option -f keeps Cassandra in the foreground instead of starting it as a daemon in the background. For more information and help, visit: http://wiki.apache.org/cassandra/GettingStarted
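Once Cassandra is running, it can be handy to check from Python whether its client port accepts connections before starting any ROS nodes. The helper below is a sketch and not part of this package; the function name is made up, and the defaults assume the standard configuration (localhost, Thrift port 9160).

```python
import socket


def thrift_port_open(host="localhost", port=9160, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        # Nothing is listening, or the host is unreachable.
        return False
    sock.close()
    return True


if __name__ == "__main__":
    print(thrift_port_open())
```

Note that this only tests whether something accepts TCP connections on that port; it does not verify that the listener is actually Cassandra.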
We added three simple launch files with which you can test whether your installation was successful. You will need a webcam to run these examples; the camera settings can be changed in the launch file. First of all, run:
$ roslaunch cassandra_ros recordCamera.launch
A window with the camera stream should appear. This also starts cassandraBag.py, a service which stores the stream in a CassandraDB. If you abort, recording stops automatically. If this does not work, or you see the message:
Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it ...
then simply run:
$ sudo nodetool enablethrift
To replay the recording, run:
$ roslaunch cassandra_ros replayCamera.launch
The previously recorded stream will be replayed. If you can see yourself, everything is installed and configured correctly. By running:
$ roslaunch cassandra_ros deleteCamera.launch
the column family in which the stream was stored gets deleted. In this case the service cassandra_ros/node/bag/cassandraBag.py was responsible for handling the topics; if you want to use it as a command-line tool, see section …
There are some graphical user interfaces available that simplify the interaction with Cassandra. Using these, it is quite easy to check whether storing was successful, or to explore your data graphically. We recommend the following tools:
Cassandra Cluster Admin:
A PHP tool similar to phpMyAdmin, available at: https://github.com/sebgiroux/Cassandra-Cluster-Admin
A simple Java tool, which can be downloaded from: http://code.google.com/a/apache-extras.org/p/cassandra-gui/
Check the following link: http://wiki.apache.org/cassandra/Administration%20Tools
- These are some design decisions we made for data handling, which might be a bit confusing if you examine your data with other tools or one of the recommended GUIs.
- Each topic is stored within its own column-family, one message per row.
- Due to the maximum column-family name length of 42 bytes, we use the hash value of the topic name as the column-family name. You do not need to bother with hash values, though: with our interface you can still work with topic names, and the hashing is hidden.
- All meta-information about a topic, including type, storage format, key format, etc., is stored within the comment field of its column family. No other column family is required or has to be updated to store meta-information, which reduces the additional effort but means that the comment field must not be changed manually. Everything is handled in the background.
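The two ideas from the list above, hashed column-family names and meta-information kept in the comment field, can be sketched as follows. This is an illustration only: the hash function (MD5), the "t" prefix, and the JSON encoding are assumptions and not necessarily what the package uses internally.

```python
import hashlib
import json

MAX_NAME_LENGTH = 42  # limit quoted in the list above


def topic_to_cf_name(topic):
    """Map a topic name of any length to a short, fixed-length name."""
    digest = hashlib.md5(topic.encode("utf-8")).hexdigest()  # 32 hex characters
    return "t" + digest  # assumed prefix so that the name starts with a letter


def meta_to_comment(meta):
    """Serialize the topic meta-information into a comment string."""
    return json.dumps(meta, sort_keys=True)


def comment_to_meta(comment):
    """Restore the meta-information dictionary from a comment string."""
    return json.loads(comment)


cf_name = topic_to_cf_name("/usb_cam/image_raw/compressed")
comment = meta_to_comment({"cassandra_format": "binary", "key_format": "time"})
```

Because the name is derived from the topic alone, it can be recomputed whenever it is needed, so no lookup table has to be stored.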
There are only two classes you need to be aware of if you want to store and access ROS messages within Cassandra. RosCassandra can be seen as the management interface to Cassandra in a ROS-typical manner. CassandraTopic is required to handle the access to each topic/column-family.
Let us start by connecting to Cassandra:

    import rospy
    import roslib; roslib.load_manifest("cassandra_ros")
    import RosCassandra.RosCassandra as rc

    rospy.init_node('CassandraTest')

    # in most cases this is the standard configuration
    host = 'localhost'
    port = 9160

    # that is nearly enough, only a keyspace is missing
    rosCas = rc.RosCassandra(host, port)

    # check if the keyspace exists
    if not rosCas.connectToKeyspace('test'):
        # if not, create one
        rosCas.createKeyspace('test')
        # connect again
        rosCas.connectToKeyspace('test')

That's it, now we are connected. In the next step we will create a new topic/column-family:

    # this is the datatype we will store
    from std_msgs.msg import String

    topic = 'test_topic'
    key_format = 'time'  # this means that we will use a timestamp as primary
                         # key, other formats are 'hash' or 'msg_part'
    format = 'binary'    # the format that is used for conversion,
                         # other formats are 'string', 'ros', or 'yaml'

    # at first we create the new topic with the following command
    rosCas.addTopic(topic,
                    format,
                    'String',
                    'std_msgs',
                    key_format,
                    None,  # only required if you define the key format manually
                    comment='some additional comments...')

    # if everything worked well, there should be a new topic and the
    # following command should evaluate to true
    rosCas.existTopic(topic)

    # getting a list with all topics
    rosCas.getAllTopics()

In the next step we will start to insert some messages into the database:

    # get the topic-container
    topicContainer = rosCas.getTopic(topic)

    # thus we can now start to insert some messages
    msg = String()
    for i in range(100):
        msg.data = 'test ' + str(i)
        topicContainer.addData(msg)

    # because I do not know your time, we can simply get all messages by calling
    topicContainer.getAllData(queue=100)

    # if you know the primary keys exactly, you can also query by using them
    # topicContainer.getData(key, to_key, queue=100)

    # remove everything
    topicContainer.removeAllData()

    # or if you want to delete a specific message, use:
    # topicContainer.removeData(key, to_key, queue=100)

    # now let us change the format for storing data and the key
    # we could either delete the whole container by calling:
    # rosCas.removeTopic(topic)
    # or we change the meta-information ... this works only if
    # the column-family is empty
    meta = topicContainer.getMeta()

    # meta is always a dictionary, where we change values as follows
    meta['cassandra_format'] = 'ros'
    meta['key_format'] = 'manual'

    # write it back to the topic
    rosCas.setTopicMeta(topic, meta)

    # because we had changed the storage format, we have to renew the
    # topic-container, this will be explained in more detail within the next
    # section
    topicContainer = rosCas.getTopic(topic)

    for i in range(100):
        msg.data = 'test ' + str(i)
        topicContainer.addData(msg, '%.2d' % i)

    # as you see, adding keys looks a bit weird, but they are always stored
    # as strings... requesting data looks as follows ...
    topicContainer.getData('04', '20')
    # you will get a list consisting of (key, value) pairs ...

    # also allowed is something like this:
    topicContainer.addData(msg, 'XXX')

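The reason for the zero-padded keys ('%.2d' % i) is that keys are stored and compared as strings. The following stand-alone sketch imitates a key-range query on a plain Python list; treating both range bounds as inclusive is an assumption made for this illustration.

```python
def keys_in_range(keys, start, end):
    """Return all keys k with start <= k <= end, compared as strings."""
    return [k for k in sorted(keys) if start <= k <= end]


padded = ['%.2d' % i for i in range(100)]  # '00', '01', ..., '99'
unpadded = [str(i) for i in range(100)]    # '0', '1', ..., '99'

# With padding, string order and numeric order agree.
padded_hits = keys_in_range(padded, '04', '20')

# Without padding, '4' sorts after '20', so the range is empty.
unpadded_hits = keys_in_range(unpadded, '4', '20')
```

If you define keys manually, choosing a fixed width up front avoids this kind of surprise when requesting ranges.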
As mentioned before, we support different ways of storing data. Messages can be stored in a binary format, which enables fast conversion and storing, but it is currently also possible to store messages in the 'string', 'yaml', or 'ros' format. The type of conversion is defined when a topic-container is created:

    rosCas.addTopic(topic,
                    format,  # exactly in here ... 'string', 'yaml', 'ros', 'binary'
                    'String',
                    'std_msgs',
                    key_format,
                    None,
                    comment)

If you would like to run CQL queries on your stored messages, you will have to use the 'ros' format (this might not work with every datatype). Every message is translated into a linear form, and the columns are strongly typed according to the defined message formats. A message of type "geometry_msgs/TransformStamped":

    std_msgs/Header header
      uint32 seq
      time stamp
      string frame_id
    string child_frame_id
    geometry_msgs/Transform transform
      geometry_msgs/Vector3 translation
        float64 x
        float64 y
        float64 z
      geometry_msgs/Quaternion rotation
        float64 x
        float64 y
        float64 z
        float64 w

is then transformed into Cassandra format:

    .header.seq              : INT_TYPE
    .header.stamp            : DATE_TYPE
    .header.frame_id         : UTF8_TYPE
    .transform.translation.x : DOUBLE_TYPE
    .transform.translation.y : DOUBLE_TYPE
    .transform.translation.z : DOUBLE_TYPE
    .transform.rotation.x    : DOUBLE_TYPE
    .transform.rotation.y    : DOUBLE_TYPE
    .transform.rotation.z    : DOUBLE_TYPE
    .transform.rotation.w    : DOUBLE_TYPE

which allows you to query and analyze your data afterwards using CQL (as described in the next section).
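The translation into a linear form can be pictured as a recursive walk over the message that joins the field names with dots. The sketch below uses a nested dictionary as a stand-in for a ROS message, so it runs without ROS; the function name and the sample values are made up for illustration, and this is not the package's actual conversion code.

```python
def flatten(message, prefix=""):
    """Flatten a nested dict into {'.path.to.field': value} columns."""
    columns = {}
    for name, value in message.items():
        path = prefix + "." + name
        if isinstance(value, dict):
            # Nested message: descend and extend the dotted path.
            columns.update(flatten(value, path))
        else:
            # Primitive field: becomes one column.
            columns[path] = value
    return columns


# Stand-in for a geometry_msgs/TransformStamped message.
transform_stamped = {
    "header": {"seq": 7, "stamp": 1400000000.0, "frame_id": "world"},
    "child_frame_id": "robot",
    "transform": {
        "translation": {"x": 1.0, "y": 2.0, "z": 0.5},
        "rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
    },
}

columns = flatten(transform_stamped)
```

Each resulting column name, such as '.transform.rotation.w', can then be used directly in a CQL statement.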
How does it work? If you take a look into cassandra_ros/lib ... you will see a couple of files of the form CassandraTopic_'format'.py, which are all descendants of CassandraTopic_.py. Each of them overrides the following methods:
- getColumnValidationClasses: as the name suggests, is used to define the column validation classes of the column-family
- encode: is used to encode the ROS message into a format that Cassandra can store
- decode: translates it back, from Cassandra to ROS
So far, you have only worked with the single class CassandraTopic. In fact, this class inherits the methods required for encoding and decoding messages, and only those of the required parent class. If you are looking for more information, have a look at the following link: http://www.aizac.info/a-solution-to-the-diamond-problem-in-python/
So you are free to define your own format for conversion.
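A custom format could look roughly like the sketch below. To keep it runnable without the package, it uses a hypothetical stand-in base class (TopicFormatBase) instead of CassandraTopic_.py, and plain dictionaries instead of ROS messages. The three method names are taken from the description above, but their signatures and return types here are assumptions; check the files in cassandra_ros/lib for the real ones.

```python
import json


class TopicFormatBase(object):
    """Hypothetical stand-in for the package's base class."""

    def getColumnValidationClasses(self):
        raise NotImplementedError

    def encode(self, msg):
        raise NotImplementedError

    def decode(self, columns):
        raise NotImplementedError


class JsonTopicFormat(TopicFormatBase):
    """Example format that stores a whole message in one JSON text column."""

    def getColumnValidationClasses(self):
        # One text column; the type name follows the listing shown earlier.
        return {"json": "UTF8_TYPE"}

    def encode(self, msg):
        # Message (here: a dict) -> columns that can be stored.
        return {"json": json.dumps(msg, sort_keys=True)}

    def decode(self, columns):
        # Stored columns -> message.
        return json.loads(columns["json"])


fmt = JsonTopicFormat()
stored = fmt.encode({"data": "test 1"})
restored = fmt.decode(stored)
```

The important property of any such format is that decode reverses encode, so that replayed messages are identical to the recorded ones.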
If you want to analyze your data using CQL, the Cassandra Query Language, you first have to generate secondary indexes. Because this might be expensive and is not required for every part of a message, it has to be done manually, as follows:

    # this will automatically generate the appropriate index
    rosCas.createIndex(topic, '.data')

To have topic names translated automatically, use the following method, which replaces topic names with their hash values.

    # will return everything
    rosCas.exequteCQL('SELECT * FROM "test_topic"')

    # only a list that contains all keys
    rosCas.exequteCQL('SELECT KEY FROM "test_topic"')

    # a list with all elements stored in .data
    rosCas.exequteCQL('SELECT ".data" FROM "test_topic"')

    # will return a list of keys
    rosCas.exequteCQL('SELECT KEY FROM "test_topic" WHERE ".data"=\'test 12\';')

    # if you want to get the whole messages back, this requires 2 steps
    # first of all store the keys of your statements, let us take the last
    keys = rosCas.exequteCQL('SELECT KEY FROM "test_topic" WHERE ".data"=\'test 12\';')
    messages = topicContainer.getData(keys)

    # remove your indexes as follows
    rosCas.removeIndex(topic, '.data')

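Conceptually, a secondary index maps a column value back to the row keys that contain it, which is why a WHERE query returns keys first and the messages are fetched in a second step. The following in-memory sketch imitates this two-step lookup with plain dictionaries; it does not talk to Cassandra, and all names in it are made up for illustration.

```python
from collections import defaultdict

# Stand-in for one column family: row key -> columns of the stored message.
rows = {'%.2d' % i: {'.data': 'test ' + str(i)} for i in range(100)}


def build_index(rows, column):
    """Build a value -> [row keys] mapping for one column."""
    index = defaultdict(list)
    for key in sorted(rows):
        index[rows[key][column]].append(key)
    return index


# Step 0: create the index once (this is the expensive part).
data_index = build_index(rows, '.data')

# Step 1: look up the keys of all rows where .data equals 'test 12'.
keys = data_index['test 12']

# Step 2: fetch the complete messages for these keys.
messages = [(key, rows[key]) for key in keys]
```

Building the index touches every row, which gives an idea of why indexes are only created on request.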
For more information on CQL, check the following website: http://www.datastax.com/docs/1.0/references/cql/index
If you have already checked whether your package is working, then you have already used cassandraBag. This is a simple service which stores and replays messages, similar to rosbag. Simply start the application with:
$ roslaunch cassandra_ros cassandraBag.launch
You can also change some of the parameters in the launch file. Storing, replaying, or deleting topics requires the following command-line tool:
$ rosrun cassandra_ros cassandraBag-cli.py
To get a list of all currently stored topics, use the parameter 'list':
$ rosrun cassandra_ros cassandraBag-cli.py list
Use record to store live messages; the option -f denotes the storage format:
$ rosrun cassandra_ros cassandraBag-cli.py record -f string start topic
Use record stop to finish recording:
$ rosrun cassandra_ros cassandraBag-cli.py record stop topic
You can record as many topics in parallel as Cassandra supports, and start and stop recording dynamically. To replay a stored topic, use play start:
$ rosrun cassandra_ros cassandraBag-cli.py play start /usb_cam/image_raw/compressed
If you want to delete some topics, use delete. For more help, just run:
$ rosrun cassandra_ros cassandraBag-cli.py -h
or something like the following, which will provide you with some command-line options:
$ rosrun cassandra_ros cassandraBag-cli.py play
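If you want to drive the command-line tool from a script, the calls shown above can be wrapped in a few small helpers. This is a sketch only: the helper names are made up, and just the argument patterns that appear in this section are covered. Actually running a command requires a working ROS environment and a running cassandraBag service.

```python
import subprocess

CLI = ["rosrun", "cassandra_ros", "cassandraBag-cli.py"]


def list_cmd():
    """Command line that lists all currently stored topics."""
    return CLI + ["list"]


def record_start_cmd(topic, storage_format):
    """Command line that starts recording a topic in the given format."""
    return CLI + ["record", "-f", storage_format, "start", topic]


def record_stop_cmd(topic):
    """Command line that stops recording a topic."""
    return CLI + ["record", "stop", topic]


def play_start_cmd(topic):
    """Command line that starts replaying a stored topic."""
    return CLI + ["play", "start", topic]


def run(cmd):
    """Execute a command line and return its exit code."""
    return subprocess.call(cmd)
```

For example, run(record_start_cmd('/usb_cam/image_raw/compressed', 'string')) would start a recording, and run(record_stop_cmd('/usb_cam/image_raw/compressed')) would end it.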