Saturday, July 29, 2017

On eventually consistent file listing


Introduction 


Cheap S3 storage comes with an unusual cost: correctness. One of the key problems when working with highly partitioned data stored in S3 is the eventual consistency of file listing. In this post we will discuss what exactly this problem is and how we can think about mitigating its impact.

File listing has been a very common operation since the invention of files. Given a directory or path, it gives us the list of files under that path. Tons of code written over file systems depends on the correctness of this operation. Unfortunately, this assumption breaks when the listing is done on S3 or, for that matter, on any blob store.

Deconstructing file listing 


One way to think about eventual consistency of file listing is to argue that we simply get a wrong answer. This is correct to some extent, but not precise enough to act on. To do something about it, we need to dig a bit deeper and understand the nature of this wrongness. I find it useful to characterise it in the following two forms:
  • Ghost files 
  • Conceived files 
Let's try to understand what they mean. Ghost files are files which are returned by the listing operation but have actually been deleted from the file system. This is a very common cause of job failures in Spark and Hive; they are called ghost files for obvious reasons. Conceived files, on the other hand, are files which actually exist but were not returned by the listing API. In the happy (immediately unhappy) path, eventual consistency causes jobs to fail, because further operations on ghost files keep failing irrespective of the number of retries. In the unhappy (short-term happy) path, we silently lose data because of conceived files: they are simply missing from the final result, which leads to incorrect answers.

Given these two constructs, we can say that a listing operation is wrong either because of ghost files (files which shouldn't be present in the listing but are) or because of conceived files (files which should be present in the listing but are not). We can now build separate solutions for detecting these two classes of files and dealing with their consequences.

Dealing with Ghost files


Ghost files are files which shouldn't have appeared in the listing to start with. Once they show up, they cause different problems depending on what we do with them; the most common one is a subsequent file-not-found error. One way to deal with this is to do a fresh listing operation and a set subtraction.
Let A be the result of the listing operation at time t1
and B be the result of the listing operation at time t2,
where t2 > t1.
The set A - B, i.e. the files which are in A but not in B, is essentially the list of currently known ghost files. Once they are detected, we can deal with them in a couple of ways. One simple way is to ignore the failures caused by ghost files, because we know they are bound to fail. The other is to remove them from our task queue, because we know they are not part of the final solution set. We might need to iterate multiple times (say, until we reach a fixed point) to find all the ghost files.
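
To make this concrete, here is a minimal Python sketch of the ghost-file step. The listings and the task queue are made-up stand-ins for whatever the real job gets back from the listing API and tracks as pending work:

# Two listings of the same path; in real code these would come from the
# listing API, here they are hard-coded examples.
listing_a = {"part-0000", "part-0001", "part-0002"}   # result at time t1
listing_b = {"part-0001", "part-0002", "part-0003"}   # result at time t2 > t1

# Ghost files: present in A but gone by the time of B (A - B).
ghost_files = listing_a - listing_b                   # {"part-0000"}

# Option 1: keep the tasks and treat failures on ghost files as expected.
# Option 2: drop them from the task queue, since they cannot be part of
# the final solution set.
task_queue = [path for path in sorted(listing_a) if path not in ghost_files]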

Dealing with Conceived files


Conceived files are the files which exist but didn't show up in the listing at all.
Again, let A be the result of the listing operation at time t1
and B be the result of the listing operation at time t2,
where t2 > t1.
The set B - A, i.e. the files which are in B but were not in A, is essentially the list of currently known conceived files. These are files we would have missed had we done only a single listing operation. Handling conceived files is relatively simple: we just add them to our working set. We might need to iterate multiple times (say, until we reach a fixed point) to find all the conceived files and treat them as if they had been part of the original listing.
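
The corresponding sketch for conceived files is the mirror image; again, the listings are hypothetical examples rather than the output of any particular API:

listing_a = {"part-0000", "part-0001", "part-0002"}   # result at time t1
listing_b = {"part-0001", "part-0002", "part-0003"}   # result at time t2 > t1

# Conceived files: present in B but missed by A (B - A).
conceived_files = listing_b - listing_a               # {"part-0003"}

# Fold them into the working set, as if the first listing had returned
# them all along.
working_set = listing_a | conceived_files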

It is tempting to ask why we don't simply wait until the system has become consistent before starting the work. In theory that works; in practice we don't know how long it will take. By starting with whatever information the listing API gives us, we get a head start and can keep revising our task set based on what further listings reveal. What we get through this approach is correctness without introducing any performance penalty.
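
Putting both steps together, the overall loop could look something like the sketch below. The list_files callable is a hypothetical wrapper over the listing API, and the stopping rule (a few stable rounds, capped by a maximum number of rounds) is one reasonable choice, not the definitive implementation:

def stabilise_listing(list_files, path, stable_rounds=2, max_rounds=10):
    # Repeatedly list `path`, discarding ghost files and folding in
    # conceived files, until the result stops changing (a fixed point)
    # or the iteration budget is exhausted.
    working_set = set(list_files(path))
    stable = 0
    for _ in range(max_rounds):
        current = set(list_files(path))
        ghost_files = working_set - current       # should not be worked on
        conceived_files = current - working_set   # must be added to the work
        if ghost_files or conceived_files:
            # Remove ghosts, add conceived files (i.e. adopt the latest view).
            working_set = (working_set - ghost_files) | conceived_files
            stable = 0
        else:
            stable += 1
            if stable >= stable_rounds:
                break
    return working_set

# Hypothetical usage: files = stabilise_listing(my_s3_list, "s3://bucket/table/")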

In conclusion, we can deal with eventual consistency in file listing operations by repeating the listing operation, detecting ghost and conceived files, and modifying our work queues to take this new knowledge about the listing into account.



10 comments:

  1. In practice, what is the typical value of t2-t1? And how much work do we throw away when B-A is non-empty? And when do we stop checking? For example, if every listing has probability delta of containing ghost/conceived files, then after n listings your probability of being correct is 1-delta^n ... however, to make the residual error arbitrarily small, n may have to be large (and you may need to restart the job as many times).

  2. Good to see your comment :) It sort of depends on the size of the listing; it ranges from a few seconds to minutes. What we throw away is mostly zero, because that work would have failed anyway. The goal is not to throw away current work, but to change the definition of the work (taking old mistakes into account). In practice, the listing API converges to the truth in a few cycles - sometimes just one. The problem is that even if this fails once in 10 days after a 5-hour, 100-node job, lots of people get anxious. We stop checking when k consecutive listings give the same answer or when we have run out of "consistency budget".

  3. Especially the listing inconsistency, which can be very dangerous and can silently lose data even though the Hadoop/Spark job completes :)

    https://issues.apache.org/jira/browse/HADOOP-13345 handles these inconsistencies.

    Replies
    1. I have looked at S3Guard and it is a good solution. I started investigating this more from the Spark side and found a couple of instances where this problem is more prominent. One of them was loading Hive tables from Spark. The good part about the solution above is that it doesn't require a new subsystem (Dynamo-backed metadata) and looks only at internal inconsistencies. The other part that I didn't mention in the post is that this makes the code easy to parallelise. We have seen up to 30x faster loading of tables for data sets partitioned into 10000 files. With more partitions, we will see further improvements.

    2. BTW, if you are looking at Spark, there is a different approach for the committer which relies on ETags and is fast as well (HADOOP-13786 would be a good starting point). Parquet's committer is highly error prone, and even Databricks discourages Parquet's direct committer algorithm.

    3. I am not up to speed with the current state of this JIRA, but I loved the idea of using the multipart upload API to implement a committer when I first heard it a few months back. This is based on the Netflix implementation, right? As far as I understand, this particular approach avoids the need for listing by maintaining a parallel copy of the files. Worth stealing :)
      I think in append mode, Parquet on Spark forces the use of ParquetOutputCommitter, which is almost FileOutputCommitter except for the metadata generation part. A faster FOC will help Parquet on Spark.

  4. There is a special case where ghost files don't go away at all in this model and even an explicit delete won't work; I have reproduced this multiple times on AWS S3.

    Here is the reply from Amazon:

    The root cause was traced to a bug in S3’s index subsystem. This bug triggered an extremely rare condition which caused the prefix to be deleted but remain present in our index. The prefix showed up as a result in the LIST command, but attempting to delete the prefix resulted in a failure. The conditions under which this bug would manifest themselves are extremely rare.

    We have audited relevant aspects of S3’s codebase and are confident that we have rectified this behavior. The fix will be available worldwide by the end of February.

    The S3 team is still investigating and will keep your TAM apprised of development.

    Please feel free to reach out if you have any further questions or concerns.

    Fabio L
    Amazon Web Services

    Replies
    1. I would love to take a look at the code or a JIRA to reproduce this. I have tested it with up to 10000 partitions and hardly ever seen anything like this. Thanks!!

  5. What about files that were deleted between t1 and t2 but still show up in list B?

    Replies
    1. That would be cheating :)
      If the source continues to be modified while we are doing our operations (as in your special case, where source files are being deleted), this algorithm will end up treating those files as ghost files and doing the relevant processing.

      Having said that, the purpose of this algorithm is a bit different. We live in a simpler world, where no one is deleting or adding files while we work. Typically, files are added and deleted by multiple distributed workers, and only when this work has finished does the Spark driver start loading these files into Hive tables. So in practice (for us, and for Big Data systems generally), no one deletes files between t1 and t2.
      But this does look like a good problem to solve. Feel free to provide more context; I will try to answer as much as I can.
