Tuesday, October 18, 2011

Finding Keys In Cacheismo

In the last post I talked about how to do map-reduce like stuff with cacheismo. It occurred to me that there is no method to get all the keys. The global hashmap object now supports a new method, getPrefixMatchingKeys, which as the name implies returns the keys that match a given prefix. Typically objects are stored using the template objectType$objectKey. Thus an object created by the file quota.lua with key myKey will be stored with key quota$myKey. To do something with all the quota objects:
   
local keys = getHashMap():getPrefixMatchingKeys("quota$")
for k,v in pairs(keys) do 
   print(v)
end

If you would like the quota object itself to support, say, a getAllKeys method, add this to quota.lua in the scripts directory.



function getAllKeys() 
    local keys = getHashMap():getPrefixMatchingKeys("quota$")
    local result = ""
    for k,v in pairs(keys) do 
       -- append each key followed by a newline
       result = result..v.."\n"  
    end
    return result
end


Now a get request with key quota:getAllKeys would return a newline-separated list of all active quota objects in the map.
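To try the idea outside cacheismo, here is a minimal sketch in plain Lua. The stubHashMap table and the sample keys are made up for illustration; only the method name getPrefixMatchingKeys comes from the post, and the real method may behave differently (for example in the order of the returned keys).

```lua
-- Made-up sample data standing in for the cache contents (assumption).
local store = { ["quota$alice"] = true, ["quota$bob"] = true, ["map$x"] = true }

-- Hypothetical stand-in for the object returned by getHashMap().
local stubHashMap = {}

function stubHashMap:getPrefixMatchingKeys(prefix)
    local matches = {}
    for key in pairs(store) do
        -- compare the leading bytes directly, so "$" is not treated as a pattern
        if string.sub(key, 1, #prefix) == prefix then
            matches[#matches + 1] = key
        end
    end
    table.sort(matches) -- sorted only to make the output predictable
    return matches
end

local function getHashMap()
    return stubHashMap
end

-- Same idea as getAllKeys in quota.lua, but joining with table.concat,
-- which avoids building many intermediate strings.
function getAllKeys()
    local keys = getHashMap():getPrefixMatchingKeys("quota$")
    return table.concat(keys, "\n")
end
```

Note that table.concat puts the newline between keys only, so unlike the loop version there is no trailing newline.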

This is good enough but probably not very interesting. I am planning to support indexes as first class objects to make it quick to reach interesting objects. These indexes will automatically remove objects which are deleted or forced to expire because of LRU caching. So if you need to find all the quota objects that are close to their quota limit, create an index on the quota used value.

Thursday, October 13, 2011

Using Cacheismo Cluster for Real Time Analytics


Cacheismo now supports invoking requests on other servers.
  - getFromServer
    This function takes the server name and a key as arguments and returns the result to the script.
  - getInParallel
    This function takes a map of server names to lists of keys and gets values for all the keys from all the servers in parallel. Once all the results are received, the script gets a map of server names to maps of keys and received values.
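The shapes of the argument and the result of getInParallel are easier to see with data. The sketch below is plain Lua with a stub in place of the real call; the server names, keys and values are invented, and getInParallelStub is a hypothetical name that only mimics the input and output shapes described above (it does nothing in parallel).

```lua
-- Invented cluster contents: server name -> (key -> value). Assumption for illustration.
local fakeCluster = {
    ["10.0.0.1:11211"] = { a = "1", b = "2" },
    ["10.0.0.2:11211"] = { c = "3" },
}

-- Stub: takes server name -> list of keys, returns server name -> (key -> value).
function getInParallelStub(query)
    local results = {}
    for server, keys in pairs(query) do
        local found = {}
        local data = fakeCluster[server] or {}
        for _, key in ipairs(keys) do
            found[key] = data[key] -- stays nil when the server has no such key
        end
        results[server] = found
    end
    return results
end

function demoQuery()
    return getInParallelStub({
        ["10.0.0.1:11211"] = { "a", "b" },
        ["10.0.0.2:11211"] = { "c", "missing" },
    })
end
```

How the real function reports a missing key or an unreachable server is not covered here; the stub simply leaves the entry out.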

Here is a simple example which gets the top accessed keys from the cacheismo cluster using a simplified, framework-less map-reduce.  The contents below belong in a file named mapreduce.lua

-- function called on every cacheismo node to get 
-- the top keys by access count
-- virtual key : mapreduce:topKeys:


function topKeys(count) 
    local keys  = accessCounter:getKeyAccessCount()
    -- sort keys by access count
    table.sort (keys)
    local result = {}
    for i = 1, count do 
       result[i] = keys[i]
    end
    return result
end


-- function called on one of the cacheismo nodes 
-- which distributes work to all other nodes using getInParallel
-- virtual key : mapreduce:manageGetTop:10 


function manageGetTop(count) 
     local query = {}
     for k,v in pairs(servers) do 
        query[k] = "mapreduce:topKeys:"..count
     end
     local results = getInParallel(query)
     local total = {}
     for k,v in pairs(results) do 
           for k1,v1 in pairs(v) do 
              total[k1] = v1
           end
     end
     table.sort (total)
     local topkeys = {}
     for i = 1, count do 
       topkeys[i] = total[i]
     end
     return topkeys
end


I have taken some liberties with the syntax and there is no accessCounter object by default in cacheismo, but one is fairly easy to create in the script.  Note that the above implementation doesn't have any intermediate nodes, but they are trivial to add by calling get for mapreduce:manageGetTop instead of calling mapreduce:topKeys.
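As a rough idea of what such an accessCounter could look like, here is a sketch in plain Lua. Everything in it is an assumption: the record method, the { key, count } entry format and the decision to return the list already sorted are my choices for illustration, not part of cacheismo.

```lua
-- Hypothetical accessCounter object; cacheismo does not ship one.
accessCounter = { counts = {} }

-- Call this wherever a key is read (hypothetical hook).
function accessCounter:record(key)
    self.counts[key] = (self.counts[key] or 0) + 1
end

-- Returns an array of { key = ..., count = ... }, highest count first.
function accessCounter:getKeyAccessCount()
    local list = {}
    for key, count in pairs(self.counts) do
        list[#list + 1] = { key = key, count = count }
    end
    table.sort(list, function(a, b)
        if a.count ~= b.count then
            return a.count > b.count
        end
        return a.key < b.key -- tie-break so the order is deterministic
    end)
    return list
end

-- Local "map" step: the first `count` entries of the sorted list.
function topKeys(count)
    local keys = accessCounter:getKeyAccessCount()
    local result = {}
    for i = 1, math.min(count, #keys) do
        result[i] = keys[i]
    end
    return result
end
```

With entries in this form, the merge step on the managing node would need the same comparator when it sorts the combined list.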

A single cacheismo server might be serving thousands of such queries at once, because everything happens in a non-blocking manner, but we give the user the illusion of a synchronous stack using lua coroutines.

Links:
Cacheismo Introduction
Cacheismo Lua API
Cacheismo Sliding Window Counter Implementation in Lua
Cacheismo Quota Implementation in Lua
Cacheismo Memory Allocation
Cacheismo Source Code
Cacheismo Support

Tuesday, October 11, 2011

Multi-threaded vs Multi-process

Cacheismo doesn't use threads. Neither does redis. Memcached is multi-threaded. How to choose?
Multi-threaded applications are hard to code and understand. Non-blocking multi-threaded applications are close to a nightmare. I had the honor of writing a multi-threaded non-blocking http proxy at apigee. It was written in c, had JNI bindings with java and could talk to our custom kernel interface using shared memory (to avoid copying data from kernel space to user space).  Having done that, I chose not to use threads in cacheismo.

Scalability of multi-threaded applications with multiple cores is simply a function of how much work can be done in parallel. The fewer the conflicts between the threads, the better the performance. The first problem is the distribution of work among the threads. If multiple threads are working concurrently on the same piece of data, it is hard for them to make progress. Memcached uses a single selector thread to bind connections to different threads. This is not optimal: it will become a bottleneck when thousands of clients try to connect to the server at the same time. The load can also end up unbalanced. Maybe all the connections of one thread are sleeping whereas all the connections of some other thread are killing the cpu. So even though there is work to do, some threads are not bothered because it doesn't belong to them. This is unlikely with memcached (it hardly uses cpu), but very much possible with cacheismo running user scripts.  Now consider the hashmap itself. One hashmap, multiple threads, unavoidable lock. A possible solution here would be striping. What about slabs? One possibility is per-thread slabs or some form of thread-local caching or, instead of using a single lock for all slabs, per-slab locks.

In short, to make multi-threaded code work at optimal levels, we have to look at our objects at a finer granularity than we originally planned. We have to make our design sub-optimal to make sure threads make progress, even at the cost of under-utilizing resources.
The multi-process model works the opposite way. Consider your complete code/process as an object and create multiples of them. Have some way to distribute requests to these individual objects/processes. In many cases this is possible, specifically when either the clients have knowledge of the servers (consistent hashing) or some intelligent proxy can do this for us (http load balancer). Even this could be non-optimal, but at least it is simple. What else? Memory leaks are easier to manage with the multi-process model. Just restart the process. Since memory is managed at the process level, it can be easily reclaimed by killing the process, and if we have multiple processes, the service is not disrupted for every user. In a multi-threaded model, a restart will impact all users. No locks are needed in multi-process code, unless we use shared memory. The process boundary gives exclusive access to memory which doesn't need further management.

I don't see multi-process code as in any way inferior to multi-threaded code. It has some benefits: it is simple to code, simple to debug and simple to maintain. The other beauty is fault isolation. If you run 1 process with multiple threads, a crash causes full system failure, whereas if you run n processes with a single thread each, a crash still leaves (n-1)/n of the system up.  It is also easy to upgrade things one at a time because we have more than one thing. With a single server, there is no option but to shut it down, upgrade and start.


The only problem I see with multi-process code is the duplication of basic data structures and configuration.  For example, running each request in a separate java process in tomcat or jboss would be a really bad idea. There is simply too much shared state - db connection pool, byte code, runtime native code, session information, etc.   The other problem is a bit superficial: instead of managing one process, we have to manage multiple of them (use scripts?)
If your solution is either stateless or can be partitioned such that there is no interdependence, multi-process will make more sense. It is a difficult choice, but just don't use threads if you can survive without them, especially with non-blocking code. Try cacheismo and see for yourself.

Tuesday, October 04, 2011

Cacheismo Cluster Update 2

Today I could set the value on server 1 and get it back from server 2.  The script on server 2 was changed a bit.


function handleGET(command, key) 
    local cacheItem = getHashMap():get(key)
    if (cacheItem ~= nil) then
        command:writeCacheItem(cacheItem)
        cacheItem:delete()
    else 
        local result = command:getFromExternalServer("127.0.0.1:11212", key)
        if (result ~= nil) then
            writeStringAsValue(command, key, result)
        end 
    end
end

This is where I feel lua beats javascript because of its excellent coroutine support.  I could easily change the above code to a while loop and go through all servers in the cluster looking for a value and it would not change the complexity of the code a bit, but try doing it with javascript callbacks and it would look like a mess. 
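The while-loop variant mentioned above could look roughly like this. It is a sketch, not tested against cacheismo: getFromAnyServer and the stubCommand object are hypothetical, and only getFromExternalServer(server, key) is taken from the handler above. Inside cacheismo each call would suspend the coroutine while waiting; the stub here just returns immediately.

```lua
-- Tries each server in order and returns the first value found plus the
-- server it came from, or nil if no server has the key.
function getFromAnyServer(command, servers, key)
    local i = 1
    while i <= #servers do
        local result = command:getFromExternalServer(servers[i], key)
        if result ~= nil then
            return result, servers[i]
        end
        i = i + 1
    end
    return nil
end

-- Stub command so the loop can be run outside cacheismo (made-up data).
stubCommand = {
    data = { ["127.0.0.1:11213"] = { color = "blue" } },
}

function stubCommand:getFromExternalServer(server, key)
    local serverData = self.data[server]
    return serverData and serverData[key]
end
```

The control flow stays a plain loop with an early return, which is the point being made about coroutines versus callbacks.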

I am thinking now that the original idea of using dataKey and virtualKey is a bit lame. What I will do instead is make the server an explicit argument, just like I used above. For consistent hashing we will have an object exposed in lua which can be used to find the correct server, but it will be optional to use. This gives users the flexibility to use some cacheismo instances as pure proxies and others as caches. Some of the servers can be part of one consistent hashing group and others can be part of another. If for some reason users want to use one of the servers as a naming server to map keys to servers, they can use it that way. Essentially users have full flexibility with respect to how to map keys to servers - consistent hashing, multiple servers hidden behind a proxy (cacheismo), a naming server to map keys to servers, or any combination of these.  
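For readers who have not seen consistent hashing, here is a small self-contained sketch of the idea in plain Lua. It says nothing about how the cacheismo object will actually work: newRing, findServer, the toy hash function and the number of virtual points are all assumptions chosen for illustration, and a real deployment would want a better hash.

```lua
-- Toy string hash (polynomial, modulo 2^32). Not a good hash, just deterministic.
local function hashString(s)
    local h = 0
    for i = 1, #s do
        h = (h * 31 + string.byte(s, i)) % 4294967296
    end
    return h
end

-- Builds a ring: a sorted array of points, `replicas` points per server.
function newRing(servers, replicas)
    local points = {}
    for _, server in ipairs(servers) do
        for r = 1, replicas do
            points[#points + 1] = {
                hash = hashString(server .. "#" .. r),
                server = server,
            }
        end
    end
    table.sort(points, function(a, b)
        if a.hash ~= b.hash then
            return a.hash < b.hash
        end
        return a.server < b.server
    end)
    return points
end

-- The owner of a key is the first point at or after the key's hash,
-- wrapping around to the first point on the ring.
function findServer(ring, key)
    local h = hashString(key)
    for _, point in ipairs(ring) do
        if point.hash >= h then
            return point.server
        end
    end
    return ring[1] and ring[1].server
end
```

The useful property is that removing a server only moves the keys that server owned; keys owned by the remaining servers stay where they were. The linear scan in findServer keeps the sketch short; a binary search over the sorted points would be the usual choice.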

Other possibilities include replication and quorum. These can be accomplished with the current API itself, but I will add parallel get support to make it as fast as possible. 

Friday, September 30, 2011

Cacheismo Lua API


The scripting capabilities of cacheismo are very powerful. The code for handling commands like get/set/add etc. is completely written in lua using the lua interface. Here is the code for handling the SET command:


local function handleSET(command) 
  local hashMap   = getHashMap()   
  local cacheItem = hashMap:get(command:getKey())
  if (cacheItem ~= nil) then 
      hashMap:delete(command:getKey())
      cacheItem:delete()
      cacheItem = nil
  end 
  cacheItem = command:newCacheItem()
  if (cacheItem ~= nil) then 
      hashMap:put(cacheItem)
      command:writeString("STORED\r\n")
  else 
     command:writeString("SERVER_ERROR Not Enough Memory\r\n")
  end
  return 0
end



This is the list of the lua methods which can be called by new scripts

getHashMap()
- Global method, returns the instance of the global hashmap object.

setLogLevel(level)
- Sets the logging level. Four levels are defined: DEBUG=0, INFO=1,
  WARN=2 and ERR=3. The function takes a numeric argument.

Table 
The standard table object in lua is extended to support the following methods

marshal(table) 
- returns a string object which represents the serialized table.

unmarshal(serializedTable)
- returns a fully constructed table from the given serialized table string

clone(table) 
- returns a deep copy of the given table
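To make "deep copy" concrete, here is what such a copy does in plain Lua. This is not the cacheismo implementation of clone; deepCopy is a hypothetical helper written for illustration, and it deliberately ignores metatables and does not handle tables that refer to themselves.

```lua
-- Recursively copies a value. Non-table values are returned as they are;
-- tables are rebuilt so that nested tables are not shared with the original.
function deepCopy(value)
    if type(value) ~= "table" then
        return value
    end
    local copy = {}
    for k, v in pairs(value) do
        copy[deepCopy(k)] = deepCopy(v)
    end
    return copy
end
```

After a deep copy, changing a nested table in the copy leaves the original untouched, which is what makes it safe to modify a cloned object before writing it back.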


HashMap
The global hashmap object retrieved from getHashMap() supports following methods

get(key) 
- returns a cacheItem object for the given key if found, else nil

put(cacheItem) 
- takes a cacheItem object and puts it in the hashMap
     
delete(key) 
- deletes the cacheItem object associated with the given key from the hashMap.

deleteLRU(requiredSpace) 
- deletes as many LRU objects as required to free up at least requiredSpace.


CacheItem
The data in the hashMap is stored in the form of cacheItem objects. We need to have
a cacheItem object to store in the map and cacheItem is what we get from the map
when we do a get. The object supports following methods. These objects are
read only.

getKey()
- returns the key associated with the cacheItem

getKeySize()
- returns the number of bytes in the key

getExpiryTime()
- returns the time in seconds since epoch when the current item will expire.

getFlags()
- returns the flags associated with the cacheItem

getDataSize()
- returns the size of the data stored in this cacheItem

getData()
- returns the data stored in the cacheItem as a lua string. This is normally not
used because lua scripts don't need access to the actual data unless using virtual keys.

delete()
- deletes the reference to the cacheItem object. Important to call this after get
from hashMap, after using the cacheItem.


Command
This represents the request coming from the client. This can be modified if
required by the script.

getCommand()
- returns the memcached command being used.
  One of get, add, set, replace, prepend, append, cas, incr, decr, gets, delete,
  stats, flush_all, version, quit or verbosity
     
getKey()
- returns the key used in the command. This is nil in case of multi-get and other
  commands which don't have key as argument.

setKey(newKey)
- set the given string newKey as the key for the command object.

getKeySize()
- returns the size in bytes for the key

getExpiryTime()
- returns the expiryTime specified in the command.

setExpiryTime(newTime)
- sets the expiry time to new value newTime.

getFlags()
- returns the flags specified in the command. It is also used by the verbosity
  command to return the logging level.
 
setFlags(newFlags)
- sets the flags in the command object to the new flags.

getDelta()
- returns the delta value used in incr/decr commands

setDelta(newDelta)
- sets the delta value to new delta value

getNoReply()
- returns the boolean noReply from the command.

setNoReply(newNoReply)
- sets the value for the noReply

getDataSize()
- returns the size of data in the command, in bytes. This works for set, add, etc.

setData(newData)
- replaces the data in the command object with the newData string

getData()
- returns the data in the command object as lua string

newCacheItem()
- creates and returns a cacheItem object using the data in the command object.
  This is used with set/add/etc

writeCacheItem(cacheItem)
- writes the cacheItem in the memcached response format on the client socket.
   "VALUE key flags size\r\ndata\r\n"
  cas is not supported.

writeString(dataString)
- writes arbitrary string to the client socket. This is useful for writing
  "END\r\n", "STORED" and other standard memcached response strings.

hasMultipleKeys()
- returns the number of multi-get keys in the command

getMultipleKeys()
- returns a table of all the keys in the multi-get command

Apart from these, a few helper methods defined in config.lua are also useful.

writeStringAsValue(command, key, value) 
- writes an arbitrary string in the "VALUE key flags size\r\ndata\r\n" format.
  flags is 0 and size is calculated using string.len


executeReadOnly(command, originalKey, objectType, cacheKey, func, ...) 
- helper function for read only operations on lua tables stored in the cache.
 It retrieves the data from the cache, uses table.unmarshal to construct the
 lua table and calls the function func with this lua table as the first
 argument. Any extra args passed to this function are passed to function func.


executeReadWrite(command, originalKey, objectType, cacheKey, func, ...) 
- helper function for read/write operations on lua tables stored in the cache.
  The original object is deleted and a new object is created with the same key
  and the latest data values.
 

executeNew(command, originalKey, objectType, cacheKey, func, ...)
- helper function for creation of new objects based on lua tables. if the key
  is in use, it is deleted before creating the new object

See set.lua, map.lua, quota.lua and swcounter.lua for example usage.
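As an illustration of the response format that writeStringAsValue produces, the sketch below builds the "VALUE key flags size\r\ndata\r\n" string in plain Lua. formatValue is a hypothetical helper, not the code from config.lua; it only follows the description above (flags is 0, size comes from string.len).

```lua
-- Builds a memcached-style value response for an arbitrary string.
-- flags is fixed at 0 and size is the byte length of the value.
function formatValue(key, value)
    return "VALUE " .. key .. " 0 " .. string.len(value) .. "\r\n"
        .. value .. "\r\n"
end
```

A script that answers a get with a computed string would typically follow this with "END\r\n", written using writeString.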