Distributed Storage System Project Wiki

DHash (also sometimes referred to as a distributed hash table) is a layer built on top of Chord that handles reliable storage of data blocks on participating nodes. It does this through techniques such as replication and erasure coding. The logical application interface is simply:

key  = put (data)        data = get (key)

Data stored in the system is immutable and identified by its contents, freeing DHash from having to worry about the semantics of multiple writes. DHash has been used to build a backup system, various file systems (CFS and Ivy), and a Usenet News server.
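Just to make the interface concrete, here is a minimal in-memory Python sketch of content-addressed put/get (the LocalDHash class and its dictionary store are my own invention for illustration; the real DHash spreads fragments across Chord nodes):

import hashlib

class LocalDHash:
    """Toy, single-machine stand-in for the put/get interface."""
    def __init__(self):
        self.store = {}

    def put(self, data):
        # The key is the SHA-1 hash of the contents, so blocks are
        # immutable: the same data always maps to the same key.
        key = hashlib.sha1(data).digest()
        self.store[key] = data
        return key

    def get(self, key):
        data = self.store[key]
        # A client can verify a returned block simply by re-hashing it.
        assert hashlib.sha1(data).digest() == key
        return data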


In this thesis, the author specifies some new protocols and suggestions for the already existing DHash and gives test results. His main goals are to make the current DHash more robust, efficient, and practical. He achieves this through the use of erasure codes and the techniques of proximity routing, server selection, and successor lists.

They basically implement this in a backup system, and some of the common goals between our system and theirs are:

(1) high availability    (2) high throughput for bulk transfers  (3) good support for block-size operations.

1> CHORD DHT

The paper explains the Chord DHT in some detail, so I will not explain it here. However, the two basic APIs are:

get_successor_list(n)  - Contacts Chord node n and returns n's successor list. Each node in the list includes its Chord ID, IP address, and synthetic coordinates.
lookup(k, m)           - Returns a list of at least m successors of key k. Each node in the list includes its Chord ID, IP address, and synthetic coordinates.
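As a rough, non-authoritative sketch of the shape of these two calls (the ChordNode record and its field names below are mine, not the paper's):

from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class ChordNode:
    chord_id: int               # position on the Chord identifier ring
    ipaddr: str                 # IP address used for RPCs
    coords: Tuple[float, ...]   # synthetic (network) coordinates

def get_successor_list(n: ChordNode) -> List[ChordNode]:
    """Contact Chord node n and return n's successor list."""
    raise NotImplementedError   # provided by the Chord layer

def lookup(k: int, m: int) -> List[ChordNode]:
    """Return a list of at least m successors of key k."""
    raise NotImplementedError   # provided by the Chord layer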


2> BLOCK AVAILABILITY

First, the file is broken into fixed-size blocks. To increase data availability, DHash splits each block into 14 fragments using the IDA erasure code. Any 7 of these fragments are sufficient to reconstruct the block. DHash stores a block’s fragments on the 14 Chord nodes immediately following the block’s key. To maintain this proper placement of fragments, DHash transfers fragments between nodes as nodes enter and leave the system.

The basic APIs implemented are:
put(k, b)  - Stores the block b under the key k, where k = SHA-1(b).
get(k)      - Fetches and returns the block associated with the key k.              

Given an original block of size s, IDA splits the block into f fragments of size s/k. Any k distinct fragments (this k is an IDA parameter, not the key k) are sufficient to reconstruct the original block. Fragments are distinct if, in an information-theoretic sense, they contain unique information.

IDA has the ability to randomly generate new, probabilistically distinct fragments from the block alone; it does not need to know which fragments already exist.

From f randomly generated fragments, any k are distinct with probability greater than (p − 1)/p, where p is the characteristic prime of the IDA.

Thus random fragments can be generated (distinct with a calculable probability) to replace missing fragments and keep blocks reconstructible.

DHash implements IDA with f = 14, k = 7 and p = 65537.
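A quick back-of-the-envelope check of these parameters (plain arithmetic using the numbers above, plus the 8192-byte block size discussed below):

f, k, p = 14, 7, 65537      # DHash's IDA parameters
block_size = 8192           # bytes per DHash block (see below)

frag_size = block_size / k          # ~1170 bytes per fragment
storage_cost = f / k                # 2x raw storage per block
p_distinct = (p - 1) / p            # ~0.999985 chance that k randomly
                                    # generated fragments are distinct
print(frag_size, storage_cost, p_distinct)

So a block costs roughly twice its size to store, in exchange for tolerating the loss of any 7 of its 14 fragment holders.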



  How are the fragments stored?

DHash stores a block’s fragments on the f = 14 immediate successors of the block’s key. When a block is originally inserted, the DHash code on the inserting client creates the f fragments and sends them to the first 14 successors. When a client fetches a block, the client contacts enough successors to find k = 7 distinct fragments. These fragments have a 65536-in-65537 chance of being able to reconstruct the original block. If reconstruction fails, DHash keeps trying with different sets of 7 fragments.

A node may find that it holds a fragment for a block even though the node is beyond the block's 14th successor. If it is the 15th or 16th successor, the node holds onto the fragment in case failures cause it to become one of the 14. Otherwise, the node tries to send the fragment to one of the 14 successors.
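A small sketch of that keep-or-forward rule, expressed as a function of the node's (0-based) position among the key's successors (the function and its name are hypothetical; the real logic lives in the maintenance protocols of Section 3):

def fragment_action(my_rank):
    """What a node does with a fragment of a key, given its 0-based
    position among the key's successors."""
    if my_rank < 14:
        return "keep"    # one of the 14 primary fragment holders
    if my_rank < 16:
        return "hold"    # 15th or 16th successor: keep it as a spare
    return "push"        # misplaced: offer it to one of the 14 successors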

The values of f and k were chosen to optimize for 8192-byte blocks in their system, which has a successor-list length of r = 16. A setting of k = 7 creates roughly 1170-byte fragments, which fit inside a single IP packet when combined with RPC overhead.


  Block Insert: put(k, b)

When an application wishes to insert a new block, it calls the DHash put(k, b) procedure. 
     void put (k, b)  // place one fragment on each successor
     {
         frags = IDAencode (b)
         succs = lookup (k, 14)
         for i (0..13)
             send (succs[i].ipaddr, k, frags[i])
     }

The latency of the complete put() operation is likely to be dominated by the maximum round-trip time to any of the 14 successors. The Chord lookup is likely to be relatively low latency: proximity routing allows it to contact nearby nodes, and the lookup can stop as soon as it reaches any of the three nodes preceding key k, since the 16-entry successor list of any of those nodes will contain the desired 14 successors of k. The cost of the Chord lookup is likely to be dominated by the latency to the nearest of the three predecessors.
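That latency argument can be written out directly. A toy estimate, assuming we somehow know the relevant round-trip times (the function and its inputs are made up for illustration, not part of DHash):

def estimated_put_latency(lookup_rtt, succ_rtts):
    """put() waits for the Chord lookup and then for the slowest of the
    14 parallel fragment sends, so the two costs roughly add up."""
    return lookup_rtt + max(succ_rtts[:14])

# e.g. a 30 ms lookup plus fragment sends whose slowest RTT is 120 ms -> ~150 ms
print(estimated_put_latency(0.030, [0.02, 0.05, 0.12, 0.04] * 4))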



   Block Fetch: get(k)

To fetch a block, a client must locate and retrieve enough IDA fragments to re-assemble the original block. The interesting details are in how to avoid communicating with high-latency nodes and how to proceed when some fragments are not available.

When a client calls get(k), its local DHash first initiates a Chord call to lookup(k, 7) in order to find the list of nodes likely to hold the block’s fragments. This results in a list of between 7 and 16 of the nodes immediately succeeding key k.

get() then chooses the seven of these successors with the lowest latency, estimated from their synthetic coordinates. It sends each of them an RPC to request a fragment of key k, in parallel. For each RPC that times out or returns an error reply, get() sends a fragment request RPC to an as-yet-uncontacted successor from the list returned by lookup().
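The "lowest latency" choice comes from the synthetic coordinates returned with each node. A minimal sketch of how sort_by_latency might rank candidates, assuming Euclidean-style coordinates (e.g. Vivaldi) and the hypothetical coords field from the sketch in Section 1:

import math

def predicted_rtt(my_coords, their_coords):
    """Estimate latency as the distance between synthetic coordinates."""
    return math.dist(my_coords, their_coords)

def sort_by_latency(my_coords, succs):
    """Return the candidate successors ordered closest-first."""
    return sorted(succs, key=lambda s: predicted_rtt(my_coords, s.coords))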


However, if the original call to lookup() returned fewer than 7 successors with distinct fragments, get() asks one of the successors it already knows about for the complete successor list. get() asks more successors for fragments if IDA fails to reconstruct the block because the fragments found were not distinct. If it cannot reconstruct the block after talking to the first 14 successors, get() returns failure to the application. <<We don't want this... do we ?>>

<<possibility of this being one of our schemes>>

Before returning a reconstructed block to the application, get() checks that the SHA-1 hash of the block’s data is equal to the block’s key. If it is not, get() returns an error.

An application may occasionally need to invoke get(k) repeatedly to successfully fetch a given key. When nodes join or leave the system, fragments need to be transferred to the correct successor nodes. If the join or leave rate is high enough, fragments may become misplaced and cause a block fetch to fail.

This transient situation is repaired by the DHash maintenance algorithm presented in the next section and can be masked by retrying the get(k) on failure. By retrying, a client will see the semantics that DHash never loses a block and that all blocks are always available except those that have expired.

A persistent retry strategy reflects the assumption that a key being retrieved is actually stored in DHash. The client using DHash can easily ensure this by recording keys in metadata and only retrieving keys recorded in this metadata.
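A hedged sketch of the retry wrapper implied here; the backoff policy, names, and failure convention (None standing in for FAILURE) are my own choices, not the thesis's:

import time

def get_with_retry(dhash_get, key, attempts=5, delay=1.0):
    """Mask transient fragment misplacement during churn by retrying
    get(k); gives up after a fixed number of attempts."""
    for i in range(attempts):
        block = dhash_get(key)
        if block is not None:           # None stands in for FAILURE
            return block
        time.sleep(delay * (2 ** i))    # simple exponential backoff
    raise IOError("block not found after %d attempts" % attempts)

The thesis's own pseudocode for get(k) follows.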

block get (k)
{
   // Collect fragments from the successors.
   frags = []; // empty array
   succs = lookup (k, 7)
   sort_by_latency (succs)
   for (i = 0; i < #succs && i < 14; i++) {
      // download fragment
      <ret, data> = download (k, succs[i])
      if (ret == OK)
         frags.push (data)
      // try to decode the fragments collected so far
      <ret, block> = IDAdecode (frags)
      if (ret == OK)
         return (SHA-1(block) != k) ? FAILURE : block
      if (i == #succs - 1) {
         newsuccs = get_successor_list (succs[i])
         sort_by_latency (newsuccs)
         succs.append (newsuccs)
      }
   }
   return FAILURE
}


3> Fragment Maintenance

 
A DHash system is in the ideal state when three conditions hold for each inserted block:

   1. multiplicity: 14, 15, or 16 fragments exist.

   2. distinctness: All fragments are distinct with high probability.

   3. location: Each of the 14 nodes succeeding the block’s key stores a fragment; the following two nodes optionally store a fragment; and no other nodes store fragments.


The ideal state is attractive because it ensures that all block fetches succeed with high probability. Block inserts preserve the ideal state, since put(k, b) stores 14 distinct fragments of block b at the 14 Chord nodes succeeding key k.
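As a sanity check on conditions (1) and (3), here is a small sketch of a checker over the set of successor positions that currently hold a fragment (holder_ranks is a hypothetical input I made up; distinctness, being probabilistic, is not checked here):

def location_and_multiplicity_ok(holder_ranks):
    """holder_ranks: 0-based positions, among the key's successors, of
    every node currently storing a fragment of the block."""
    n = len(holder_ranks)
    multiplicity = 14 <= n <= 16
    # the first 14 successors must each hold a fragment ...
    primaries = all(r in holder_ranks for r in range(14))
    # ... and nobody beyond the 16th successor may hold one
    no_strays = all(r < 16 for r in holder_ranks)
    return multiplicity and primaries and no_strays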


Chord membership changes, such as node joins and node failures, disrupt DHash from the ideal state and can cause block fetches to fail.


The location condition is violated when a new node that stores no fragments joins within the set of 14 successor nodes of a block’s key, since it does not store a fragment of the block.

The multiplicity condition can be violated when nodes fail, since fragments are lost from the system.

The distinctness condition is not affected by node joins or failures. (As far as I can tell, this is the erasure coder's problem ... another solution is for the client [owner of the file] to download the block back, say, every month if there are not enough fragments and create more distinct fragments.)


Now, to restore DHash to the ideal state, DHash runs two maintenance protocols: a local and a global protocol.

1. The local maintenance protocol restores the multiplicity condition by recreating missing fragments.

2. The global maintenance protocol restores:

   a. the location condition, by moving misplaced fragments (those that violate the location condition) to the correct nodes;

   b. the multiplicity condition, by detecting and deleting extra fragments when more than 16 fragments exist for a block.


Since membership changes happen continuously, DHash is rarely (if ever) in the ideal state, but it always tends toward it by continually running these maintenance protocols. Barring any further membership changes, these protocols can restore DHash to its ideal state as long as at least 7 distinct fragments for each block exist anywhere in the system.

First, the global maintenance protocol will move the misplaced fragments back to their successors, then the local maintenance protocol will recreate missing fragments until there are 14 fragments. If there are fewer than 7 distinct fragments for a block, that block is lost irrevocably.



3.1  Local DHash Maintenance

The local maintenance protocol recreates missing fragments on any of the 14 successors of a block’s key which do not store a fragment of the block.

Each DHash host synchronizes its database with each of its 13 successors over the key range from the Chord ID of the host’s predecessor up to the host’s Chord ID. <in our case, the metadata file will store this data instead of a database ... also, if at all we use a database on the local machine ... we need to discuss it next sem>

All keys in this range should be present at the host and its 13 successors.

On each DHash host, synchronize() will discover any locally missing keys and inform the successors which keys they are missing. For each missing key, synchronize() calls missing(), which retrieves enough fragments to reconstruct the corresponding block. Finally, missing() uses IDA to randomly generate a new fragment.

<so according to their system, a new fragment is regenerated each time a missing key is detected>


local_maintenance (void)
{
    while (1) {
        foreach (s in mySuccessors[0..12])
            synchronize (s, database[mypredID ... myID])
    }
}
missing (key)
{
    // Called on the host when 'key' does not exist locally
    block = get (key)
    frag = IDA_get_random_fragment (block)
    merkle_tree.insert (key, frag)
}
Algorithm for the implementation of the local maintenance protocol. missing() is implicitly called by the implementation of synchronize().
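The missing() code above inserts into a Merkle tree, which suggests synchronize() compares Merkle trees for efficiency. As a stand-in, here is a naive key-set exchange with the same effect (all names here are mine, not the thesis's API):

def synchronize(local_keys, successor_keys, missing):
    """Compare the two key sets over the shared key range. A real
    implementation walks Merkle trees instead of shipping whole key sets."""
    local, remote = set(local_keys), set(successor_keys)
    for key in remote - local:
        missing(key)                 # we lack this key: recreate a fragment
    return local - remote            # keys the successor should be told about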



3.2  Global DHash Maintenance  (location and multiplicity)

The global maintenance protocol pushes misplaced fragments to the correct nodes.

Now, these guys have a local database where the fragments' info is stored (we could implement it with SQLite ... but again there are issues with the client crashing and losing this data ... we will take it up next trimester).

Basically, each DHash node scans its database of fragments and pushes any fragment that it stores, but which fails the location condition, to one of the fragment’s 14 successor hosts. (eeeyaaahhhh .... )

For efficiency, the algorithm processes contiguous ranges of keys at once. Each DHash host continuously iterates through its fragment database in sorted order.

It performs a Chord lookup() to learn the 16 successor nodes of a fragment’s key. Theoretically, these nodes are the only hosts that should be storing the fragment. If the DHash host is one of these nodes, it continues on to the next key in the database, as it should be storing the key. Otherwise, the DHash host is storing a misplaced fragment and needs to push it to one of the fragment’s 14 successors in order to restore the location condition.

The fragment’s 14 successors should also store all keys ranging from the fragment’s key up to the Chord ID of the key’s immediate successor (succs[0] in the pseudocode below). (???)

Consequently, the DHash host processes this entire key range at once by sending all the database keys in this range to each of the 14 successors. A successor responds with a message saying that it desires a key if it is missing that key.

In that case, the DHash host sends that fragment to the successor. It also deletes the fragment from its own database to ensure that the fragment is sent to exactly one other host, with high probability; otherwise the distinctness condition would be violated.

After offering all the keys to all the successors, the DHash node deletes all remaining keys in the specified range from the database. These keys are safe to delete because they were offered to all the successors but were already present on each of them. The DHash node then continues sweeping the database from the end of the key range just processed.

The call to database.next(a) skips the ranges of the key space for which no blocks are in the database. This causes the sweep (i.e., the number of iterations of the while loop needed to process the entire key space) to scale as the number of misplaced blocks in the database, not as the number of nodes in the entire Chord ring.

The global maintenance protocol adopts a push-based strategy where nodes push misplaced fragments to the correct locations. This strategy contrasts with pull-based strategies where each node contacts other nodes and pulls fragments which it should store.

In general, these guys found the push-based strategy more robust under highly dynamic membership change rates.

<contrast between push and pull based strategies>

To be able to recover from an arbitrary set of host failures, a pull-based strategy must pull from all nodes in the entire system. For example, consider a network partition that splits the Chord ring into a majority ring and a very small minority ring. All the block fragments that were inserted into the minority ring during the network partition are almost certainly misplaced when the rings merge. With a pull-based strategy, the correct nodes in the merged ring will have to pull blocks from the nodes from the minority ring that just merged. In essence, each node needs to know when each other node joins the ring, or else it must sweep the entire Chord ring trying to pull blocks. Both of these alternatives are expensive since they scale as the number of nodes in the entire system. This contrasts to push-based strategies where the maintenance scales as the number of misplaced blocks.

</contrast>

global_maintenance (void)
{
     a = myID
     while (1) {
        <key, frag> = database.next(a)
        succs = lookup(key, 16)
        if (myID isbetween succs[0] and succs[15])
            // we should be storing key
            a = myID
        else {
            // key is misplaced
            for each s in succs[0..13] {
                response = send_db_keys (s, database[key .. succs[0]])
                for each key in response.desired_keys
                     if (database.contains (key)) {
                         upload (s, database.lookup (key))
                         database.delete (key)
                     }
            }
            database.delete_range ([key .. succs[0]])
            a = succs[0]
        }
     }
}
 

