Thursday, October 13, 2011

Using Cacheismo Cluster for Real Time Analytics


Cacheismo now supports invoking requests on other server.
  - getFromServer
    This function takes the server name and a key as an argument and returns the result to the script.
  - getInParallel
    This function takes map of server names to list of keys and gets values for all the keys from all the servers in parallel. Once all the results are received, the script gets a map of server names to map of keys and received values.

Here is a simple example code which gets to top accessed keys from the cacheismo cluster using a simplified, framework-less map-reduce.  The contents below should belong to file names mapreduce.lua

-- function called on every cacheismo node to get 
-- the top keys by access count
-- virtual key : mapreduce:topKeys:


function topKeys(count) 
    local keys  = accessCounter:getKeyAccessCount()
    -- sort keys by access count
    table.sort (keys)
    local result = {}
    local i      = 1
    for (i in 1..count) do 
       result[i] = keys[i]
    end
    return result
end


-- function called on one of the cacheismo nodes 
-- which distributes work to all other nodes using getInParallel
-- virtual key : mapreduce:manageGetTop:10 


function manageGetTop(count) 
     local query = {}
     for k,v in pairs(servers) do 
        query[k] = "mapreduce:topKeys:"..count
     end
     local results = getInParallel(query)
     local total = {}
     for k,v in pairs(results) do 
           for k1,v1 in pairs(v) do 
              total[k1] = v1
           end
     end
     table.sort (total)
     local topkeys = {}
     for (i in 1..count) do 
       topkeys[i] = total[i]
     end
end


I have taken some liberties with the syntax and their is no accessCounter object by default in cacheismo, but is fairly easy to create in the script.  Note that the above implementation doesn't have ant intermediate nodes, but is trivial to create by calling get for mapreduce:manageGetTop instead of calling mapreduce:topKeys.

Single cacheimso server might be serving thousands of such queries because everything is happening in non-blocking manner but we give user the illusion of synchronous stack using lua coroutines.

Links:
Cacheismo Introduction
Cacheismo Lua API
Cacheimso Sliding Window Counter Implementation in Lua
Cacheismo Quota Implementation in Lua
Cacheismo Memory Allocation
Cacheismo Source Code
Cacheismo Support

Tuesday, October 11, 2011

Multi-threaded vs Multi-process

Cacheismo doesn't use threads. So does redis. Memcached is multi-threaded. How to choose?
Multi-threaded applications are hard to code and understand. Non-blocking multi-threaded applications are close to nightmare. I had the honor of writing multi-threaded non-blocking http proxy @apigee. It was written in c, had JNI bindings with java and could talk to our custom interface with kernel using shared memory (to avoid kernel space to user space data copy).  Having done that, I choose not to use threads in cacheismo.

Scalability of multi-threaded applications with multiple cores is simply a function of how much work can be done in parallel. The lesser the conflicts between the threads, better is the performance. The first problem is distribution of work among the requests. If multiple threads are working  concurrently on same piece of data, it is hard for them to make progress. Memcached uses a single selector thread to bind connections to different threads. It is not optimal. It will become bottleneck when you have thousands of clients trying to connect to the server at the same time. May be all connections of one thread are sleeping whereas all connections of some other thread are killing the cpu. So even though we have some work, few threads are not bothered because it doesn't belongs to them. This is kind of not possible with memcached (it hardly uses cpu), but very much possible with cacheismo running user scripts.  Now consider the hashmap itself. One hashmap, multiple threads, unavoidable lock. Possible solution here would be striping. What about slabs. One possibility is per thread slabs or some form of thread local caching or instead of using lock at slab level, use per slab locks.

In short, to make multi-threaded code work at optimal levels, we have to looking at our objects in finer granularity than what we originally planned. We have to make our design sub optimal to make sure threads make progress, even if at the cost of under utilization of resources.
Multi-process model works the opposite way. Consider your complete code/process as an object and create multiples of them. Have some way to distribute requests to these individual objects/processes. In many cases it is possible, specifically when either clients have knowledge of servers (consistent hashing) or if some intelligent proxy can do this for us (http load balancer). Even this could be non-optimal, but at least it is simple. What else? Memory leaks are easier to manage with multi-process model. Just restart the process. Since memory is managed at process level, it can be easily reclaimed by killing the process and if we have multiple of them, the service is not disrupted for every user. In a multi-threaded model, restart will impact all users. No locks are needed in multi process code, unless we use shared memory. The process boundary gives exclusive access to memory which doesn't need further management.

I don't see multi-process code in any ways inferior to multi-threaded code. It has some benefits like simple to code, simple to debug, simple to maintain. Other beauty is if instead of running 1 process with multiple threads you are running multiple processes with single thread, crash in the first case will cause full system failure, whereas in second case you still have (n-1)/n percent of the system up.  It is easy to upgrade things one at a time because we have more than 1 thing. With single server, no option but to shut it down, upgrade and start.


The only problem I see with multi-process code is duplication of basic data structure and configuration.  For example running each requests in a separate java process in tomcat or jboss will be a really bad idea. There is simply too much shared state - db connection pool, byte code, runtime native code, session information, etc.   The other problem is a bit superficial one; instead of managing one process, we have to manage multiple of them (use scripts?)
If your solution is either stateless or it can be partitioned such that there is no interdependence, multiprocess will make more sense. It is a difficult choice, but just don't use threads if you can survive without them, specially with non-blocking code. Try  cacheismo and see for  yourself.

Tuesday, October 04, 2011

Cacheismo Cluster Update 2

Today I could set the value on server 1 and get it back from server 2.  This script of server 2 was changed a bit.


function handleGET(command, key) 
    local cacheItem = getHashMap():get(key)
    if (cacheItem ~= nil) then
   command:writeCacheItem(cacheItem)
   cacheItem:delete()
else 
   local result = command:getFromExternalServer("127.0.0.1:11212", key)
   if (result ~= nil) then
       writeStringAsValue(command, key, result)
   end 
    end
end

This is where I feel lua beats javascript because of its excellent coroutine support.  I could easily change the above code to a while loop and go through all servers in the cluster looking for a value and it would not change the complexity of the code a bit, but try doing it with javascript callbacks and it would look like a mess. 

I am thinking now that the original idea of using dataKey and virtualKey is a bit lame. What I would do now is that I will make server an explicit argument, just like I used above. For consistent hashing we will have an object exposed in lua which can be used to find the correct server via consistent hashing, but it would be optional to use. This gives users the flexibility to use some cacheismo instances as pure proxies and others as caches. Some of the servers can be part of one consistent hashing group and others can be part of another. For some reason if users want to use one of the servers as a naming server to map keys to server, they can use it that way. Essentially users have full flexibility with respect to how to map keys to servers  - consistent hashing, multiple servers hidden behind a proxy(cacheismo), naming server to map keys to servers or any combination of these.  

Other possibilities include replication and quorum. These can be accomplished with the current API itself, but I will add parallel get support to make it as fast as possible. 

Friday, September 30, 2011

Cacheismo Lua API


The scripting capabilities of cacheismo are very powerful. The code for handing commands like get/set/add etc is completely written in lua using the lua interface. Here is the code for handling SET command:


local function handleSET(command) 
  local hashMap   = getHashMap()   
  local cacheItem = hashMap:get(command:getKey())
  if (cacheItem ~= nil) then 
      hashMap:delete(command:getKey())
      cacheItem:delete()
      cacheItem = nil
  end 
  cacheItem = command:newCacheItem()
  if (cacheItem ~= nil) then 
      hashMap:put(cacheItem)
      command:writeString("STORED\r\n")
  else 
     command:writeString("SERVER_ERROR Not Enough Memory\r\n")
  end
  return 0
end



This is the list of the lua methods which can be called by new scripts

getHashMap()
- Global method, return instance of the global hashmap object.

setLogLevel(level)
- Sets the logging level. Four levels are defined DEBUG=0, INFO=2,
  WARN=2 and ERR=3. The function takes numeric argument.

Table 
The standard table object in lua is extended to support the following methods

marshal(table) 
- returns a string object which represents the serialized table.

unmarshal(serializedTable)
- returns a fully constructed table from the given serialized table string

clone(table) 
- returns a deep copy of the given table


HashMap
The global hashmap object retrieved from getHashMap() supports following methods

get(key) 
- returns a cacheItem object for the given key if found, else nil

put(cacheItem) 
- takes a cacheItem object and puts it in the hashMap
     
delete(key) 
- deletes the cacheItem object associated with the given key from the hashMap.

deleteLRU(requiredSpace) 
- deletes as many LRU objects as required to free up at least requiredSpace.


CacheItem
The data in the hashMap is stored in the form of cacheItem objects. We need to have
a cacheItem object to store in the map and cacheItem is what we get from the map
when we do a get. The object supports following methods. These objects are
read only.

getKey()
- returns the key associated with the cacheItem

getKeySize()
- returns the number of bytes in the key

getExpiryTime()
- returns the time in seconds since epoch when the current item will expire.

getFlags()
- returns the flags associated with the cacheItem

getDataSize()
- returns the size of the data stored in this cacheItem

getData()
- returns the data stored in the cacheItem as lua string. This is normally not
used because lua script don't need access to actual data unless using virtual keys.

delete()
- deletes the reference to the cacheItem object. Important to call this after get
from hashMap, after using the cacheItem.


Command
This represents the request coming from the client. This can be modified if
required by the script.

getCommand()
- returns the memcached the command being used.
  One of get, add, set, replace, prepend, append, cas, incr, decr, gets, delete,
  stats, flush_all, version, quit or verbosity
     
getKey()
- returns the key used in the command. This is nil in case of multi-get and other
  commands which don't have key as argument.

setKey(newKey)
- set the given string newKey as the key for the command object.

getKeySize()
- returns the size in bytes for the key

getExpiryTime()
- returns the expiryTime specified in the command.

setExpiryTime(newTime)
- sets the expiry time to new value newTime.

getFlags()
- returns the flags specified in the command. It is also used by the verbosity
  command to return the logging level.
 
setFlags(newFlags)
- sets the falgs in the command object to new flags.

getDelta()
- returns the delta value used in incr/decr commands

setDelta(newDelta)
- sets the delta value to new delta value

getNoReply()
- returns the boolean noReply from the command.

setNoReply(newNoReply)
- sets the value for the noReply

getDataSize()
- returns the size of data in the command, in bytes. This works for set, add, etc.

setData(newData)
- replaces the data in the command object with the newData string

getData()
- returns the data in the command object as lua string

newCacheItem()
- creates and returns a cacheItem object using the data in the command object.
  This is used with set/add/etc

writeCacheItem(cacheItem)
- writes the cacheItem in the memcached response format on the client socket.
   "VALUE key flags size\r\ndata\r\n"
  cas is not supported.

writeString(dataString)
- writes arbitrary string to the client socket. This is useful for writing
  "END\r\n", "STORED" and other standard memcached response strings.

hasMultipleKeys()
- returns the number of multi-get keys in the command

getMultipleKeys()
- returns a table of all the keys in the multi-get command

Apart from these, few helper methods defined in config.lua are also useful.

writeStringAsValue(command, key, value) 
- writes an arbitrary string in the "VALUE key flags size\r\ndata\r\n" format.
  flags is 0 and size is calculated using string.len


executeReadOnly(command, originalKey, objectType, cacheKey, func, ...) 
- helper function for read only operations on lua tables stored in the cache.
 It retrievs the data from the cache, uses table.unmarshal to construct the
 lua table and calls the function func with this lua table as the first
 argument. Any extra args passed to this function are passed to function func.


executeReadWrite(command, originalKey, objectType, cacheKey, func, ...) 
- helper function for read/write operations on lua tables stored in the cache
  orginal object is deleted and a new object is created with the same key with
  latest data values
 

executeNew(command, originalKey, objectType, cacheKey, func, ...)
- helper function for creation of new objects based on lua tables. if the key
  is in use, it is deleted before creating the new object

See set.lua, map.lua, quota.lua and swcounter.lua for example usage.


Cacheismo sliding window counter

Sliding window counters are useful for maintaining information about some last n transactions. One example could be a site like pingdom storing last n ping times for a site. Cacheismo comes with a default sliding window counter example which can be modified to suit your needs.  The current implementation supports new, add, getlast, getmin, getmax, getavg.

Note that these are memached get requests with special keys encoding our intent.  The keys follow the following syntax:
<ObjectType>:<Method>:<ObjectKey>:<Arg1>:<Arg2>

Object type is "swcounter". Cacheismo has other object types as well and more can be added by adding new lua scripts. Methods for swcounter object are "new", "add", "getmin", "getmax","getavg", "getlast", Object key we are using is "myCounter". You would need key per counter. Number of arguments would vary depending upon the method. Here new takes 1 arg (number of values to store), add takes 1 arg (value to store). Rest all methods take no arguments.

get swcounter:new:myCounter:2
VALUE  swcounter:myCounter:100 0 8 
CREATED 
-- Create a new counter which stores the last 2 values
get swcounter:add:myCounter:100
VALUE  swcounter:add:myCounter:100 0 7 
SUCCESS 
--store 100 as first value 
get swcounter:add:myCounter:50
VALUE  swcounter:add:myCounter:50 0 7 
SUCCESS 
-- store 50 as second value
get swcounter:getmin:myCounter
VALUE  swcounter:getmin:myCounter 0 2
50
-- get the min entry among all the stored values
get swcounter:getmax:myCounter
VALUE  swcounter:getmax:myCounter 0 3
100
-- get the max entry among all the stored values
get swcounter:getavg:myCounter
VALUE  swcounter:getavg:myCounter 0 2
75
-- get the average value for all the stored values

The complete implementation in lua looks like this.
====================================================
-- sliding window counter 
local function new(windowsize) 
  local  object = {}
  if (windowsize == nil) then
     windowsize = "16"
  end
  object.size   = tonumber(windowsize)
  object.index  = 0
  object.values = {}
  local count   = 0
  for count = 1,object.size do 
object.values[count] = 0
  end
  return object
end

local function add(object, value) 
   object.index = object.index+1
   if (object.index > object.size) then 
       object.index = 1 
   end 
   object.values[object.index] = tonumber (value) 
end

local function getlast(object)
   if (object.index ~= 0) then
       return object.values[object.index]
   end
   return 0
end

local function getmin(object)
   local count = 0
   local min   = math.huge
   for count = 1,object.size do
       if (object.values[count] < min) then
          min = object.values[count]
       end 
   end
   return min
end

local function getmax(object)
   local count = 0
   local max   = 0
   for count = 1,object.size do
       if (object.values[count] > max) then
          max = object.values[count]
       end 
   end
   return max
end

local function getavg(object)
   local count = 0
   local total   = 0
   for count = 1, object.size do
        total = total + object.values[count]
   end
   return (total/object.size)
end
=============================================================