当前位置: 首页 > 工具软件 > I2P > 使用案例 >

i2p源码笔记-KBucketSet.java

常小白
2023-12-01

说明

这个文件的笔记中,由于代码过长,但是又不能不加,建议直接看下边的详细笔记。

sourcecode

package net.i2p.kademlia;
/*
 * free (adj.): unencumbered; not under the control of others
 * Written by jrandom in 2003 and released into the public domain
 * with no warranty of any kind, either expressed or implied.
 * It probably won't make your computer catch on fire, or eat
 * your children, but it might.  Use at your own risk.
 *
 */

import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.SimpleDataStructure;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;

/**
 * In-memory storage of buckets sorted by the XOR metric from the base (us)
 * passed in via the constructor.
 * This starts with one bucket covering the whole key space, and
 * may eventually be split to a max of the number of bits in the data type
 * (160 for SHA1Hash or 256 for Hash),
 * times 2**(B-1) for Kademlia value B.
 *
 * Refactored from net.i2p.router.networkdb.kademlia
 * @since 0.9.2 in i2psnark, moved to core in 0.9.10
 */
public class KBucketSet<T extends SimpleDataStructure> {
    private final Log _log;
    private final I2PAppContext _context;
    /** the local identity; keys equal to this are never stored */
    private final T _us;

    /**
     * The bucket list is locked by _bucketsLock, however the individual
     * buckets are not locked. Users may see buckets that have more than
     * the maximum k entries, or may have adds and removes silently fail
     * when they appear to succeed.
     *
     * Closest values are in bucket 0, furthest are in the last bucket.
     */
    private final List<KBucket<T>> _buckets;
    private final Range<T> _rangeCalc;
    private final KBucketTrimmer<T> _trimmer;
    
    /**
     *  Locked for reading only when traversing all the buckets.
     *  Locked for writing only when splitting a bucket.
     *  Adds/removes/gets from individual buckets are not locked.
     */
    private final ReentrantReadWriteLock _bucketsLock = new ReentrantReadWriteLock(false);

    private final int KEYSIZE_BITS;
    private final int NUM_BUCKETS;
    private final int BUCKET_SIZE;
    private final int B_VALUE;
    private final int B_FACTOR;
    
    /**
     * Use the default trim strategy, which removes a random entry.
     * @param us the local identity (typically a SHA1Hash or Hash)
     *           The class must have a zero-argument constructor.
     * @param max the Kademlia value "k", the max per bucket, k &gt; 4
     * @param b the Kademlia value "b", split buckets an extra 2**(b-1) times,
     *           b &gt; 0, use 1 for bittorrent, Kademlia paper recommends 5
     */
    public KBucketSet(I2PAppContext context, T us, int max, int b) {
        this(context, us, max, b, new RandomTrimmer<T>(context, max));
    }

    /**
     * Use the supplied trim strategy.
     */
    public KBucketSet(I2PAppContext context, T us, int max, int b, KBucketTrimmer<T> trimmer) {
        _us = us;
        _context = context;
        _log = context.logManager().getLog(KBucketSet.class);
        _trimmer = trimmer;
        if (max <= 4 || b <= 0 || b > 8)
            throw new IllegalArgumentException();
        KEYSIZE_BITS = us.length() * 8;
        B_VALUE = b;
        B_FACTOR = 1 << (b - 1);
        // max number of "range" slots: each of the KEYSIZE_BITS whole kbuckets
        // may be subdivided into up to B_FACTOR sub-buckets
        NUM_BUCKETS = KEYSIZE_BITS * B_FACTOR;
        BUCKET_SIZE = max;
        _buckets = createBuckets();
        _rangeCalc = new Range<T>(us, B_VALUE);
        // this verifies the zero-argument constructor
        makeKey(new byte[us.length()]);
    }
    
    private void getReadLock() {
        _bucketsLock.readLock().lock();
    }

    /**
     *  Get the lock if we can. Non-blocking.
     *  @return true if the lock was acquired
     */
    private boolean tryReadLock() {
        return _bucketsLock.readLock().tryLock();
    }

    private void releaseReadLock() {
        _bucketsLock.readLock().unlock();
    }

    /** @return true if the lock was acquired */
    private boolean getWriteLock() {
        try {
            boolean rv = _bucketsLock.writeLock().tryLock(3000, TimeUnit.MILLISECONDS);
            if ((!rv) && _log.shouldLog(Log.WARN))
                _log.warn("no lock, size is: " + _bucketsLock.getQueueLength(), new Exception("rats"));
            return rv;
        } catch (InterruptedException ie) {
            // restore the interrupt status so callers higher up the stack
            // can still observe the interruption; previously it was swallowed
            Thread.currentThread().interrupt();
        }
        return false;
    }

    private void releaseWriteLock() {
        _bucketsLock.writeLock().unlock();
    }

    /**
     * @return true if the peer is new to the bucket it goes in, or false if it was
     *  already in it. Always returns false on an attempt to add ourselves.
     *
     */
    public boolean add(T peer) {
        KBucket<T> bucket;
        getReadLock();
        try {
            bucket = getBucket(peer);
        } finally { releaseReadLock(); }
        if (bucket != null) {
            if (bucket.add(peer)) {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("Peer " + peer + " added to bucket " + bucket);
                if (shouldSplit(bucket)) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("Splitting bucket " + bucket);
                    split(bucket.getRangeBegin());
                    //testAudit(this, _log);
                }
                return true;
            } else {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("Peer " + peer + " NOT added to bucket " + bucket);
                return false;
            }
        } else {
            if (_log.shouldLog(Log.WARN))
                _log.warn("Failed to add, probably us: " + peer);
            return false;
        }
    }

    /**
     *  No lock required.
     *  FIXME will split the closest buckets too far if B &gt; 1 and K &lt; 2**B
     *  Won't ever really happen and if it does it still works.
     */
    private boolean shouldSplit(KBucket<T> b) {
        return
               b.getRangeBegin() != b.getRangeEnd() &&
               b.getKeyCount() > BUCKET_SIZE;
    }

    /**
     *  Grabs the write lock.
     *  Caller must NOT have the read lock.
     *  The bucket should be splittable (range start != range end).
     *  @param r the range start of the bucket to be split
     */
    private void split(int r) {
        if (!getWriteLock())
            return;
        try {
            locked_split(r);
        } finally { releaseWriteLock(); }
    }

    /**
     *  Creates two or more new buckets. The old bucket is replaced and discarded.
     *
     *  Caller must hold write lock
     *  The bucket should be splittable (range start != range end).
     *  @param r the range start of the bucket to be split
     */
    private void locked_split(int r) {
        int b = pickBucket(r);
        while (shouldSplit(_buckets.get(b))) {
            KBucket<T> b0 = _buckets.get(b);
            // Each bucket gets half the keyspace.
            // When B_VALUE = 1, or the bucket is larger than B_FACTOR, then
            // e.g. 0-159 => 0-158, 159-159
            // When B_VALUE > 1, and the bucket is smaller than B_FACTOR, then
            // e.g. 1020-1023 => 1020-1021, 1022-1023
            int s1, e1, s2, e2;
            s1 = b0.getRangeBegin();
            e2 = b0.getRangeEnd();
            if (B_VALUE == 1 ||
                ((s1 & (B_FACTOR - 1)) == 0 &&
                 ((e2 + 1) & (B_FACTOR - 1)) == 0 &&
                 e2 > s1 + B_FACTOR)) {
                // The bucket is a "whole" kbucket with a range > B_FACTOR,
                // so it should be split into two "whole" kbuckets each with
                // a range >= B_FACTOR.
                // Log split
                s2 = e2 + 1 - B_FACTOR;
            } else {
                // The bucket is the smallest "whole" kbucket with a range == B_FACTOR,
                // or B_VALUE > 1 and the bucket has already been split.
                // Start or continue splitting down to a depth B_VALUE.
                // Linear split
                s2 = s1 + ((1 + e2 - s1) / 2);
            }
            e1 = s2 - 1;
            if (_log.shouldLog(Log.INFO))
                _log.info("Splitting (" + s1 + ',' + e2 + ") -> (" + s1 + ',' + e1 + ") (" + s2 + ',' + e2 + ')');
            KBucket<T> b1 = createBucket(s1, e1);
            KBucket<T> b2 = createBucket(s2, e2);
            for (T key : b0.getEntries()) {
                if (getRange(key) < s2)
                    b1.add(key);
                else
                    b2.add(key);
            }
            _buckets.set(b, b1);
            _buckets.add(b + 1, b2);
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("Split bucket at idx " + b +
                           ":\n" + b0 +
                           "\ninto: " + b1 +
                           "\nand: " + b2);
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("State is now: " + toString());

            if (b2.getKeyCount() > BUCKET_SIZE) {
                // should be rare... too hard to call _trimmer from here
                // (and definitely not from inside the write lock)
                if (_log.shouldLog(Log.INFO))
                    _log.info("All went into 2nd bucket after split");
            }
            // loop if all the entries went in the first bucket
        }
    }

    /**
     *  The current number of entries.
     */
    public int size() {
        int rv = 0;
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                rv += b.getKeyCount();
            }
        } finally { releaseReadLock(); }
        return rv;
    }
    
    public boolean remove(T entry) {
        KBucket<T> kbucket;
        getReadLock();
        try {
            kbucket = getBucket(entry);
        } finally { releaseReadLock(); }
        if (kbucket == null)  // us
            return false;
        boolean removed = kbucket.remove(entry);
        return removed;
    }
    
    /** @since 0.8.8 */
    public void clear() {
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                b.clear();
            }
        } finally { releaseReadLock(); }
        _rangeCalc.clear();
    }
    
    /**
     *  @return a copy in a new set
     */
    public Set<T> getAll() {
        Set<T> all = new HashSet<T>(256);
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                all.addAll(b.getEntries());
            }
        } finally { releaseReadLock(); }
        return all;
    }

    /**
     *  @return a copy in a new set
     */
    public Set<T> getAll(Set<T> toIgnore) {
        Set<T> all = getAll();
        all.removeAll(toIgnore);
        return all;
    }
    
    public void getAll(SelectionCollector<T> collector) {
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                b.getEntries(collector);
            }
        } finally { releaseReadLock(); }
    }
    
    /**
     *  The keys closest to us.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(int max) {
        return getClosest(max, Collections.<T> emptySet());
    }
    
    /**
     *  The keys closest to us.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(int max, Collection<T> toIgnore) {
        List<T> rv = new ArrayList<T>(max);
        int count = 0;
        getReadLock();
        try {
            // start at first (closest) bucket
            for (int i = 0; i < _buckets.size() && count < max; i++) {
                Set<T> entries = _buckets.get(i).getEntries();
                // add the whole bucket except for ignores,
                // extras will be trimmed after sorting
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
        } finally { releaseReadLock(); }
        Comparator<T> comp = new XORComparator<T>(_us);
        Collections.sort(rv, comp);
        int sz = rv.size();
        for (int i = sz - 1; i >= max; i--) {
            rv.remove(i);
        }
        return rv;
    }
    
    /**
     *  The keys closest to the key.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(T key, int max) {
        return getClosest(key, max, Collections.<T> emptySet());
    }
    
    /**
     *  The keys closest to the key.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(T key, int max, Collection<T> toIgnore) {
        if (key.equals(_us))
            return getClosest(max, toIgnore);
        List<T> rv = new ArrayList<T>(max);
        int count = 0;
        getReadLock();
        try {
            int start = pickBucket(key);
            // start at closest bucket, then to the smaller (closer to us) buckets
            for (int i = start; i >= 0 && count < max; i--) {
                Set<T> entries = _buckets.get(i).getEntries();
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
            // then the farther from us buckets if necessary
            for (int i = start + 1; i < _buckets.size() && count < max; i++) {
                Set<T> entries = _buckets.get(i).getEntries();
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
        } finally { releaseReadLock(); }
        Comparator<T> comp = new XORComparator<T>(key);
        Collections.sort(rv, comp);
        int sz = rv.size();
        for (int i = sz - 1; i >= max; i--) {
            rv.remove(i);
        }
        return rv;
    }

    /**
     *  The bucket number (NOT the range number) that the xor of the key goes in
     *  Caller must hold read lock
     *  @return 0 to max-1 or -1 for us
     */
    private int pickBucket(T key) {
        int range = getRange(key);
        if (range < 0)
            return -1;
        int rv = pickBucket(range);
        if (rv >= 0) {
             return rv;
        }
        _log.error("Key does not fit in any bucket?!\nKey  : [" 
                   + DataHelper.toHexString(key.getData()) + "]" 
                   + "\nUs   : " + _us
                   + "\nDelta: ["
                   + DataHelper.toHexString(DataHelper.xor(_us.getData(), key.getData()))
                   + "]", new Exception("???"));
        _log.error(toString());
        throw new IllegalStateException("pickBucket returned " + rv);
        //return -1;
    }
    
    /**
     *  Returned list is a copy of the bucket list, closest first,
     *  with the actual buckets (not a copy).
     *
     *  Primarily for testing. You shouldn't ever need to get all the buckets.
     *  Use getClosest() or getAll() instead to get the keys.
     *
     *  @return non-null
     */
    List<KBucket<T>> getBuckets() {
        getReadLock();
        try {
            return new ArrayList<KBucket<T>>(_buckets);
        } finally { releaseReadLock(); }
    }

    /**
     *  The bucket that the xor of the key goes in
     *  Caller must hold read lock
     *  @return null if key is us
     */
    private KBucket<T> getBucket(T key) {
       int bucket = pickBucket(key);
       if (bucket < 0)
           return null;
       return _buckets.get(bucket);
    }
    
    /**
     *  The bucket number that contains this range number
     *  Caller must hold read lock or write lock
     *  @return 0 to max-1 or -1 for us
     */
    private int pickBucket(int range) {
        // If B is small, a linear search from back to front
        // is most efficient since most of the keys are at the end...
        // If B is larger, there's a lot of sub-buckets
        // of equal size to be checked so a binary search is better
        if (B_VALUE <= 3) {
            for (int i = _buckets.size() - 1; i >= 0; i--) {
                KBucket<T> b = _buckets.get(i);
                if (range >= b.getRangeBegin() && range <= b.getRangeEnd())
                    return i;
            }
            return -1;
        } else {
            KBucket<T> dummy = new DummyBucket<T>(range);
            return Collections.binarySearch(_buckets, dummy, new BucketComparator<T>());
        }
    }

    private List<KBucket<T>> createBuckets() {
        // just an initial size
        List<KBucket<T>> buckets = new ArrayList<KBucket<T>>(4 * B_FACTOR);
        buckets.add(createBucket(0, NUM_BUCKETS -1));
        return buckets;
    }
    
    private KBucket<T> createBucket(int start, int end) {
        if (end - start >= B_FACTOR &&
            (((end + 1) & B_FACTOR - 1) != 0 ||
             (start & B_FACTOR - 1) != 0))
            throw new IllegalArgumentException("Sub-bkt crosses K-bkt boundary: " + start + '-' + end);
        KBucket<T> bucket = new KBucketImpl<T>(_context, start, end, BUCKET_SIZE, _trimmer);
        return bucket;
    }
    
    /**
     *  The number of bits minus 1 (range number) for the xor of the key.
     *  Package private for testing only. Others shouldn't need this.
     *  @return 0 to max-1 or -1 for us
     */
    int getRange(T key) {
        return _rangeCalc.getRange(key);
    }
    
    /**
     *  For every bucket that hasn't been updated in this long,
     *  or isn't close to full,
     *  generate a random key that would be a member of that bucket.
     *  The returned keys may be searched for to "refresh" the buckets.
     *  @return non-null, closest first
     */
    public List<T> getExploreKeys(long age) {
        List<T> rv = new ArrayList<T>(_buckets.size());
        long old = _context.clock().now() - age;
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                int curSize = b.getKeyCount();
                // Always explore the closest bucket
                if ((b.getRangeBegin() == 0) ||
                    (b.getLastChanged() < old || curSize < BUCKET_SIZE * 3 / 4))
                    rv.add(generateRandomKey(b));
            }
        } finally { releaseReadLock(); }
        return rv;
    }
    
    /**
     *  Generate a random key to go within this bucket
     *  Package private for testing only. Others shouldn't need this.
     */
    T generateRandomKey(KBucket<T> bucket) {
        int begin = bucket.getRangeBegin();
        int end = bucket.getRangeEnd();
        // number of fixed bits, out of B_VALUE - 1 bits
        int fixed = 0;
        int bsz = 1 + end - begin;
        // compute fixed = B_VALUE - log2(bsz)
        // e.g for B=4, B_FACTOR=8, sz 4-> fixed 1, sz 2->fixed 2, sz 1 -> fixed 3
        while (bsz < B_FACTOR) {
            fixed++;
            bsz <<= 1;
        }
        int fixedBits = 0;
        if (fixed > 0) {
            // 0x01, 03, 07, 0f, ...
            int mask = (1 << fixed) - 1;
            // fixed bits masked from begin
            fixedBits = (begin >> (B_VALUE - (fixed + 1))) & mask;
        }
        int obegin = begin;
        int oend = end;
        begin >>= (B_VALUE - 1);
        end >>= (B_VALUE - 1);
        // we need randomness for [0, begin) bits
        BigInteger variance;
        // 00000000rrrr
        if (begin > 0)
            variance = new BigInteger(begin - fixed, _context.random());
        else
            variance = BigInteger.ZERO;
        // we need nonzero randomness for [begin, end] bits
        int numNonZero = 1 + end - begin;
        if (numNonZero == 1) {
            // 00001000rrrr
            variance = variance.setBit(begin);
            // fixed bits as the 'main' bucket is split
            // 00001fffrrrr
            if (fixed > 0)
                variance = variance.or(BigInteger.valueOf(fixedBits).shiftLeft(begin - fixed));
        } else {
            // dont span main bucket boundaries with depth > 1
            if (fixed > 0)
                throw new IllegalStateException("??? " + bucket);
            BigInteger nonz;
            if (numNonZero <= 62) {
                // add one to ensure nonzero
                long nz = 1 + _context.random().nextLong((1l << numNonZero) - 1);
                nonz = BigInteger.valueOf(nz);
            } else {
                // loop to ensure nonzero
                do {
                    nonz = new BigInteger(numNonZero, _context.random());
                } while (nonz.equals(BigInteger.ZERO));
            }
            // shift left and or-in the nonzero randomness
            if (begin > 0)
                nonz = nonz.shiftLeft(begin);
            // 0000nnnnrrrr
            variance = variance.or(nonz);
        }

        if (_log.shouldLog(Log.DEBUG))
            _log.debug("SB(" + obegin + ',' + oend + ") KB(" + begin + ',' + end + ") fixed=" + fixed + " fixedBits=" + fixedBits + " numNonZ=" + numNonZero);
        byte data[] = variance.toByteArray();
        T key = makeKey(data);
        byte[] hash = DataHelper.xor(key.getData(), _us.getData());
        T rv = makeKey(hash);

        // DEBUG
        //int range = getRange(rv);
        //if (range < obegin || range > oend) {
        //    throw new IllegalStateException("Generate random key failed range=" + range + " for " + rv + " meant for bucket " + bucket);
        //}

        return rv;
    }
    
    /**
     *  Make a new SimpleDataStructure from the data
     *  @param data size &lt;= SDS length, else throws IAE
     *              Can be 1 bigger if top byte is zero
     */
    @SuppressWarnings("unchecked")
    private T makeKey(byte[] data) {
        int len = _us.length();
        int dlen = data.length;
        if (dlen > len + 1 ||
            (dlen == len + 1 && data[0] != 0))
            throw new IllegalArgumentException("bad length " + dlen + " > " + len);
        T rv;
        try {
            rv = (T) _us.getClass().getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            _log.error("fail", e);
            throw new RuntimeException(e);
        }
        if (dlen == len) {
            rv.setData(data);
        } else {
            byte[] ndata = new byte[len];
            if (dlen == len + 1) {
                // one bigger
                System.arraycopy(data, 1, ndata, 0, len);
            } else {
                // smaller
                System.arraycopy(data, 0, ndata, len - dlen, dlen);
            }
            rv.setData(ndata);
        }
        return rv;
    }

    private static class Range<T extends SimpleDataStructure> {
        private final int _bValue;
        private final BigInteger _bigUs;
        private final Map<T, Integer> _distanceCache;

        public Range(T us, int bValue) {
            _bValue = bValue;
            _bigUs = new BigInteger(1, us.getData());
            _distanceCache = new LHMCache<T, Integer>(256);
        }

        /** @return 0 to max-1 or -1 for us */
        public int getRange(T key) {
            Integer rv;
            synchronized (_distanceCache) {
                rv = _distanceCache.get(key);
                if (rv == null) {
                    // easy way when _bValue == 1
                    //rv = Integer.valueOf(_bigUs.xor(new BigInteger(1, key.getData())).bitLength() - 1);
                    BigInteger xor = _bigUs.xor(new BigInteger(1, key.getData()));
                    int range = xor.bitLength() - 1;
                    if (_bValue > 1) {
                        int toShift = range + 1 - _bValue;
                        int highbit = range;
                        range <<= _bValue - 1;
                        if (toShift >= 0) {
                            int extra = xor.clearBit(highbit).shiftRight(toShift).intValue();
                            range += extra;
                            //Log log = I2PAppContext.getGlobalContext().logManager().getLog(KBucketSet.class);
                            //if (log.shouldLog(Log.DEBUG))
                            //    log.debug("highbit " + highbit + " toshift " + toShift + " extra " + extra + " new " + range);
                        }
                    }
                    rv = Integer.valueOf(range);
                    _distanceCache.put(key, rv);
                }
            }
            return rv.intValue();
        }

        public void clear() {
            synchronized (_distanceCache) {
                _distanceCache.clear();
            }
        }
    }

    /**
     *  For Collections.binarySearch.
     *  getRangeBegin == getRangeEnd.
     */
    private static class DummyBucket<T extends SimpleDataStructure> implements KBucket<T> {
        private final int r;

        public DummyBucket(int range) {
            r = range;
        }

        public int getRangeBegin() { return r; }
        public int getRangeEnd() { return r; }

        public int getKeyCount() {
            return 0;
        }

        public Set<T> getEntries() {
            throw new UnsupportedOperationException();
        }

        public void getEntries(SelectionCollector<T> collector) {
            throw new UnsupportedOperationException();
        }

        public void clear() {}

        public boolean add(T peer) {
            throw new UnsupportedOperationException();
        }

        public boolean remove(T peer) {
            return false;
        }

        public void setLastChanged() {}

        public long getLastChanged() {
            return 0;
        }
    }

    /**
     *  For Collections.binarySearch.
     *  Returns equal for any overlap.
     */
    private static class BucketComparator<T extends SimpleDataStructure> implements Comparator<KBucket<T>>, Serializable {
        public int compare(KBucket<T> l, KBucket<T> r) {
            if (l.getRangeEnd() < r.getRangeBegin())
                return -1;
            if (l.getRangeBegin() > r.getRangeEnd())
                return 1;
            return 0;
        }
    }

    @Override
    public String toString() {
        StringBuilder buf = new StringBuilder(1024);
        buf.append("<div class=\"debug_container buckets\">");
        buf.append("<hr><b>Bucket set rooted on:</b> ").append(_us.toString())
           .append(" K=").append(BUCKET_SIZE)
           .append(" B=").append(B_VALUE)
           .append(" with ").append(size())
           .append(" keys in ").append(_buckets.size()).append(" buckets:<br>\n");
        getReadLock();
        try {
            int len = _buckets.size();
            for (int i = 0; i < len; i++) {
                KBucket<T> b = _buckets.get(i);
                buf.append("<b>Bucket ").append(i).append("/").append(len).append(":</b> ");
                buf.append(b.toString()).append("<br>\n");
            }
        } finally { releaseReadLock(); }
        buf.append("</div>");
        return buf.toString();
    }
}

详细笔记

KBucketSet是泛型类,属于一个容器,从名字上我们可以猜测这个类是用于存放KBucket的Set集合。之前的笔记中提到了KBucket是节点信息的容器,而KBucketSet则是KBucket这一容器的容器。
KBucketSet中的KBucket在集合中是按照异或(XOR)距离来排序的。
这里我开始不明白的是:Set是一个集合,为什么需要按照大小来排序?如果要排序,为什么还选择用集合?但实际上底层用的是一个List,这就说得通了。

成员变量

private final Log _log;
    private final I2PAppContext _context;
    private final T _us;

    /**
     * The bucket list is locked by _bucketsLock, however the individual
     * buckets are not locked. Users may see buckets that have more than
     * the maximum k entries, or may have adds and removes silently fail
     * when they appear to succeed.
     *
     * Closest values are in bucket 0, furthest are in the last bucket.
     */
    private final List<KBucket<T>> _buckets;
    private final Range<T> _rangeCalc;
    private final KBucketTrimmer<T> _trimmer;
    
    /**
     *  Locked for reading only when traversing all the buckets.
     *  Locked for writing only when splitting a bucket.
     *  Adds/removes/gets from individual buckets are not locked.
     */
    private final ReentrantReadWriteLock _bucketsLock = new ReentrantReadWriteLock(false);

    private final int KEYSIZE_BITS;
    private final int NUM_BUCKETS;
    private final int BUCKET_SIZE;
    private final int B_VALUE;
    private final int B_FACTOR;
    
  • _log 用于记录日志
  • _context 应该是I2P的应用上下文(I2PAppContext),具体功能存疑
  • _us 本地节点自己的标识(即KBucket体系中"我们"这个节点)
  • _bucketsLock 是一个读写锁,大概作用如下,具体作用之后再进行分析

Locked for reading only when traversing all the buckets.
Locked for writing only when splitting a bucket.
Adds/removes/gets from individual buckets are not locked.

  • KEYSIZE_BITS、NUM_BUCKETS、BUCKET_SIZE 定义了三个常量,分别用来存储每个Key的比特数、Bucket的数量,以及每个Bucket可以存放的Key的数量
  • private final int B_VALUE; private final int B_FACTOR; 这两个的作用存疑,我写到这里的时候还没弄清楚

方法

构造函数

/**
     * Use the default trim strategy, which removes a random entry.
     * @param us the local identity (typically a SHA1Hash or Hash)
     *           The class must have a zero-argument constructor.
     * @param max the Kademlia value "k", the max per bucket, k &gt;= 4
     * @param b the Kademlia value "b", split buckets an extra 2**(b-1) times,
     *           b &gt; 0, use 1 for bittorrent, Kademlia paper recommends 5
     */
    public KBucketSet(I2PAppContext context, T us, int max, int b) {
        this(context, us, max, b, new RandomTrimmer<T>(context, max));
    }

    /**
     * Use the supplied trim strategy.
     */
    public KBucketSet(I2PAppContext context, T us, int max, int b, KBucketTrimmer<T> trimmer) {
        _us = us;
        _context = context;
        _log = context.logManager().getLog(KBucketSet.class);
        _trimmer = trimmer;
        if (max <= 4 || b <= 0 || b > 8)
            throw new IllegalArgumentException();
        KEYSIZE_BITS = us.length() * 8;
        B_VALUE = b;
        B_FACTOR = 1 << (b - 1);
        NUM_BUCKETS = KEYSIZE_BITS * B_FACTOR;
        BUCKET_SIZE = max;
        _buckets = createBuckets();
        _rangeCalc = new Range<T>(us, B_VALUE);
        // this verifies the zero-argument constructor
        makeKey(new byte[us.length()]);
    }
    

首先对一些变量进行说明:us是本地实体的id,就是我们在Kademlia算法中提到的nodeid,这个nodeid一般来讲是对node求哈希得到的值。
max是每个bucket最大能够存放的key元素的数量。
b的值是什么还是有点疑惑。

这里一共有两个构造函数,其中第一个是当参数trimmer没有给出的时候的默认的构造方式。

  • 几个变量的赋值,其中log里到底发生了什么我们之后再看
  • 其中log的具体原理我们现在还不清楚,之后再看
  • 进行了一些参数的合法性检查:max必须大于4,也就是说每个bucket能存放的数量必须大于4;b必须大于0且小于等于8
  • B_VALUE是b,B_FACTOR是2的b-1次方,这就是所说的split bucket的因子
  • KEYSIZE_BITS是key的比特数,直接用.length()*8得到位数
  • 不明白的是为什么NUM_BUCKETS = KEYSIZE_BITS * B_FACTOR:在算法中有多少位就有多少bucket,不知道这里为什么在KEYSIZE的基础上还得乘上一个B_FACTOR
  • Range是一个泛型内部类,用来计算范围,不知道怎么用的,存疑

在创建_buckets这个用来存放KBucket的容器的时候,用到了函数createBuckets,看一下这个函数,以及它所调用的createBucket函数。

private List<KBucket<T>> createBuckets() {
    // The capacity is only a hint; the list grows as buckets are split.
    List<KBucket<T>> rv = new ArrayList<KBucket<T>>(4 * B_FACTOR);
    // Begin with a single bucket spanning every possible range number.
    rv.add(createBucket(0, NUM_BUCKETS - 1));
    return rv;
}
    
    private KBucket<T> createBucket(int start, int end) {
        // A bucket spanning at least B_FACTOR range values must be aligned to
        // B_FACTOR boundaries (start and end+1 both multiples of B_FACTOR);
        // otherwise a sub-bucket would straddle a "whole" kbucket boundary.
        if (end - start >= B_FACTOR &&
            (((end + 1) & B_FACTOR - 1) != 0 ||
             (start & B_FACTOR - 1) != 0))
            throw new IllegalArgumentException("Sub-bkt crosses K-bkt boundary: " + start + '-' + end);
        KBucket<T> bucket = new KBucketImpl<T>(_context, start, end, BUCKET_SIZE, _trimmer);
        return bucket;
    }

在缺省的时候,List&lt;KBucket&lt;T&gt;&gt; buckets = new ArrayList&lt;KBucket&lt;T&gt;&gt;(4*B_FACTOR)
通过arraylist来创建buckets,然后调用createBucket函数,这里有一个错误是之前我认为这里是两个重载函数,createBucket,但是后来发现命名是不一样的,并且在createBuckets中调用来createBucket函数,所以知道createBuckets是创建bucket容器的容器的函数,而createBucket是创建bucket容器的函数。这一点要区分清楚

在createBuckets()函数中首先创建一个ArrayList,然后add一个KBucket,使用createBucket来创建

在createBucket()函数中首先进行了合法性的验证,然后调用KBucketImpl的构造函数来创建

其他函数

获取lock相关函数

  private void getReadLock() {
        // Blocking acquire; callers must pair with releaseReadLock() in a finally.
        _bucketsLock.readLock().lock();
    }

    /**
     *  Get the lock if we can. Non-blocking.
     *  @return true if the lock was acquired
     */
    private boolean tryReadLock() {
        // Fails immediately instead of waiting if a writer holds the lock.
        return _bucketsLock.readLock().tryLock();
    }

    private void releaseReadLock() {
        // Counterpart to getReadLock()/tryReadLock(); call from a finally block.
        _bucketsLock.readLock().unlock();
    }

    /**
     *  Bounded-wait write-lock acquisition.
     *  @return true if the lock was acquired; false on timeout or interrupt
     */
    private boolean getWriteLock() {
        try {
            // Bounded wait so a stuck reader cannot block writers forever.
            boolean rv = _bucketsLock.writeLock().tryLock(3000, TimeUnit.MILLISECONDS);
            if ((!rv) && _log.shouldLog(Log.WARN))
                _log.warn("no lock, size is: " + _bucketsLock.getQueueLength(), new Exception("rats"));
            return rv;
        } catch (InterruptedException ie) {
            // Restore the interrupt status rather than swallowing it, so
            // callers and executors can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    private void releaseWriteLock() {
        // Counterpart to a successful getWriteLock(); call from a finally block.
        _bucketsLock.writeLock().unlock();
    }

关于锁的具体的用法这里先不记录因为不知道,先看看是怎么实现的

  • 读锁 先tryReadLock来看一下是否有机会获取读锁,如果可以的返回true

  • 写锁 这里不知道为什么这么设计 首先尝试获取lock,如果可以,返回true,如果不可以,并且超时,那么返回false并且在log中记录异常,

  • readlock 和 writelock 的unlock的解锁

添加节点

 /**
     * @return true if the peer is new to the bucket it goes in, or false if it was
     *  already in it. Always returns false on an attempt to add ourselves.
     *
     */
    public boolean add(T peer) {
        KBucket<T> bucket;
        // Only the bucket *list* is protected by the lock; the bucket itself
        // is unlocked (see class javadoc), so it may transiently overfill.
        getReadLock();
        try {
            bucket = getBucket(peer);
        } finally { releaseReadLock(); }
        if (bucket != null) {
            if (bucket.add(peer)) {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("Peer " + peer + " added to bucket " + bucket);
                if (shouldSplit(bucket)) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("Splitting bucket " + bucket);
                    // split() takes the write lock itself, so the read lock
                    // must already be released at this point.
                    split(bucket.getRangeBegin());
                    //testAudit(this, _log);
                }
                return true;
            } else {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("Peer " + peer + " NOT added to bucket " + bucket);
                return false;
            }
        } else {
            // getBucket() returns null only when the key maps to ourselves
            if (_log.shouldLog(Log.WARN))
                _log.warn("Failed to add, probably us: " + peer);
            return false;
        }
    }

看参数是T peer,结合类型和命名我们可以知道这个是添加节点的函数。首先声明了一个KBucket引用,然后获取读锁,在这个文件的开头的时候就说了,在遍历的时候我们需要获取读锁。然后让bucket的值等于getBucket(peer),这个getBucket应该是获取 peer(我们想要插入的元素)所在的bucket,返回值应该是引用。先看一下getBucket这个函数到底是干啥的。

private KBucket<T> getBucket(T key) {
    // A negative index means the key is our own; there is no bucket for us.
    int idx = pickBucket(key);
    return (idx >= 0) ? _buckets.get(idx) : null;
}

getBucket()函数接受的参数类型是T,也就是一个个节点或者我们称为节点信息,对于作为参数的节点信息,调用pickBucket函数,返回值是一个int,这个返回值应该是编号或者说在list中的位置(猜测)。我们再看一下pickBucket函数



    /**
     *  The bucket number (NOT the range number) that the xor of the key goes in
     *  Caller must hold read lock
     *  @return 0 to max-1 or -1 for us
     */
    private int pickBucket(T key) {
        // range is derived from the XOR distance to us; negative means key == us
        int range = getRange(key);
        if (range < 0)
            return -1;
        int rv = pickBucket(range);
        if (rv >= 0) {
             return rv;
        }
        // Should be unreachable: every non-negative range falls in some bucket.
        _log.error("Key does not fit in any bucket?!\nKey  : [" 
                   + DataHelper.toHexString(key.getData()) + "]" 
                   + "\nUs   : " + _us
                   + "\nDelta: ["
                   + DataHelper.toHexString(DataHelper.xor(_us.getData(), key.getData()))
                   + "]", new Exception("???"));
        _log.error(toString());
        throw new IllegalStateException("pickBucket returned " + rv);
        //return -1;
    }
/**
     *  The bucket number that contains this range number
     *  Caller must hold read lock or write lock
     *  @return 0 to max-1 or -1 for us
     */
    private int pickBucket(int range) {
        // If B is small, a linear search from back to front
        // is most efficient since most of the keys are at the end...
        // If B is larger, there's a lot of sub-buckets
        // of equal size to be checked so a binary search is better
        if (B_VALUE <= 3) {
            for (int i = _buckets.size() - 1; i >= 0; i--) {
                KBucket<T> b = _buckets.get(i);
                if (range >= b.getRangeBegin() && range <= b.getRangeEnd())
                    return i;
            }
            return -1;
        } else {
            // BucketComparator treats any range overlap as equality, so the
            // search returns the index of the bucket containing this range.
            KBucket<T> dummy = new DummyBucket<T>(range);
            return Collections.binarySearch(_buckets, dummy, new BucketComparator<T>());
        }
    }

看一下上边的两个重载的代码,pickBucket,一边接受T key也就是节点一边接受int range。但其实我觉得有时候没必要要用重载,因为在这里是先将key转换成了range然后再调用那个重载的函数。
先看一个接受key的。首先将key来getrange(),我们看一下getrange这个函数,就是个一个range类的对象的getrange函数,这个range类之后会仔细记录下来,具体的到时候再说
getrange(key)之后返回一个int值,根据这个int值,首先做个合法性检验,然后调用第二个pickBucket,
pickBucket就是根据range来返回第几个序列,在这里要注意的是,当B的值比较小的时候使用线性查找,当B的值比较大,就有很多的bucket,那么就需要使用二分查找。
所以在B小于等于3的时候,就使用线性查找,从_buckets(存放bucket的list容器,就是最外边那个)的末尾即_buckets.size()-1开始往0找,对于每个元素用getRangeBegin/getRangeEnd来对比range,一直到找到正确的序号;而如果B大于等于4的时候,就创建一个DummyBucket类的dummy对象,然后调用Collections的binarySearch,具体的实现原理之后再看,这里只是对代码的结构先有一个大体的了解。得到序号之后,直接使用_buckets自带的get函数,因为它是list,所以自带get函数,得到bucket

现在我们返回在KBucketSet.java文件中的add函数,我们是在add函数中,在其中添加一个节点,然后为了添加这个节点到哪个位置(就是到那个bucket上),调用了getBUcket函数,找到了bucket,然后对这个bucket调用add函数,这个add函数跟在这个文件中的add函数不一样,是KBucketImpl.java的add函数,
这个add函数是这样的

    /**
     *  Sets last-changed if rv is true OR if the peer is already present.
     *  Calls the trimmer if begin == end and we are full.
     *  If begin != end then add it and caller must do bucket splitting.
     *  @return true if added
     */
    public boolean add(T peer) {
        // Accept when: the bucket is still splittable (begin != end), or there
        // is room, or the peer is already present (to refresh last-changed),
        // or the trimmer made room / allowed the overflow.
        if (_begin != _end || _entries.size() < _max ||
            _entries.contains(peer) || _trimmer.trim(this, peer)) {
            // do this even if already contains, to call setLastChanged()
            boolean rv = _entries.add(peer);
            setLastChanged();
            return rv;
        }
        return false;
    }

回到KBucketSet,java中的add函数,
在try中尝试获得getBucket,然后释放readlock。如果bucket不为空,尝试在这个bucket中添加,如果添加成功,判断是否该记录,如果有记录,然后判断是否该split。我们先看一下shouldSplit和split两个函数看看如何判断是否该split以及如何split

/**
     *  No lock required.
     *  FIXME will split the closest buckets too far if B &gt; 1 and K &lt; 2**B
     *  Won't ever really happen and if it does it still works.
     */
    private boolean shouldSplit(KBucket<T> b) {
        // A bucket covering a single range value can never be split further.
        if (b.getRangeBegin() == b.getRangeEnd())
            return false;
        // Split once the bucket holds more than k entries.
        return b.getKeyCount() > BUCKET_SIZE;
    }

    /**
     *  Grabs the write lock.
     *  Caller must NOT have the read lock.
     *  The bucket should be splittable (range start != range end).
     *  @param r the range start of the bucket to be split
     */
    private void split(int r) {
        // Best-effort: if the write lock cannot be acquired within the
        // timeout, the split is skipped and the bucket stays over-full.
        if (!getWriteLock())
            return;
        try {
            locked_split(r);
        } finally { releaseWriteLock(); }
    }

    /**
     *  Creates two or more new buckets. The old bucket is replaced and discarded.
     *
     *  Caller must hold write lock
     *  The bucket should be splittable (range start != range end).
     *  @param r the range start of the bucket to be split
     */
    private void locked_split(int r) {
        // index of the bucket to split
        int b = pickBucket(r);
        while (shouldSplit(_buckets.get(b))) {
            KBucket<T> b0 = _buckets.get(b);
            // Each bucket gets half the keyspace.
            // When B_VALUE = 1, or the bucket is larger than B_FACTOR, then
            // e.g. 0-159 => 0-158, 159-159
            // When B_VALUE > 1, and the bucket is smaller than B_FACTOR, then
            // e.g. 1020-1023 => 1020-1021, 1022-1023
            int s1, e1, s2, e2;
            s1 = b0.getRangeBegin();
            e2 = b0.getRangeEnd();
            if (B_VALUE == 1 ||
                ((s1 & (B_FACTOR - 1)) == 0 &&
                 ((e2 + 1) & (B_FACTOR - 1)) == 0 &&
                 e2 > s1 + B_FACTOR)) {
                // The bucket is a "whole" kbucket with a range > B_FACTOR,
                // so it should be split into two "whole" kbuckets each with
                // a range >= B_FACTOR.
                // Log split
                s2 = e2 + 1 - B_FACTOR;
            } else {
                // The bucket is the smallest "whole" kbucket with a range == B_FACTOR,
                // or B_VALUE > 1 and the bucket has already been split.
                // Start or continue splitting down to a depth B_VALUE.
                // Linear split
                s2 = s1 + ((1 + e2 - s1) / 2);
            }
            e1 = s2 - 1;
            if (_log.shouldLog(Log.INFO))
                _log.info("Splitting (" + s1 + ',' + e2 + ") -> (" + s1 + ',' + e1 + ") (" + s2 + ',' + e2 + ')');
            KBucket<T> b1 = createBucket(s1, e1);
            KBucket<T> b2 = createBucket(s2, e2);
            // redistribute the old bucket's entries between the two halves
            for (T key : b0.getEntries()) {
                if (getRange(key) < s2)
                    b1.add(key);
                else
                    b2.add(key);
            }
            // replace the old bucket in place; insert the upper half after it
            _buckets.set(b, b1);
            _buckets.add(b + 1, b2);
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("Split bucket at idx " + b +
                           ":\n" + b0 +
                           "\ninto: " + b1 +
                           "\nand: " + b2);
            //if (_log.shouldLog(Log.DEBUG))
            //    _log.debug("State is now: " + toString());

            if (b2.getKeyCount() > BUCKET_SIZE) {
                // should be rare... too hard to call _trimmer from here
                // (and definitely not from inside the write lock)
                if (_log.shouldLog(Log.INFO))
                    _log.info("All went into 2nd bucket after split");
            }
            // loop if all the entries went in the first bucket
        }
    }

    /**
     *  The current number of entries.
     */

在什么样的情况下需要split呢?首先是getRangeBegin 不等于 getRangeEnd,并且getKeyCount &gt; BUCKET_SIZE,那么需要split
然后我们看一下split’函数,首先需要获取writelock,写锁,然后调用locked_split,传入的参数是需要split的bucket的begin的数
再看一下locked_split,首先根据传入的begin数来pickBucket,找到Bucket的序号。然后在一个while循环中,while(shouldSplit())意思是就只要是shouldSplit返回值为true,就继续循环,

补充:其实在之前一直没弄明白split到底是什么意思以及为什么要split,仔细的看了注释才明白,因为一开始的bucket不是全的,可能只有一个bucket,所以当节点U得知新的联系人C的时候,节点U根据C的nodeid需要将C插入到适当的bucket中,如果这个bucket已经保存了k个节点(getKeyCount &gt; BUCKET_SIZE),并且它的范围跨过了U这个节点的话,那么就需要split来分开bucket,一直达到该有的数量

上边说到,在一个循环中,知道shouldSplit返回值为false,也就是无需再Split了就停止循环。接下来再看在循环中到底做了什么

首先根据pickBucket得到的序列号得到KBucket。命名为b0,就是bucket0 的意思,然后设定了几个整型,分别为s1,s2,e1,e2。意思是什么呢。我们 split bucket 的操作是将原本的bucket给分开变成两个bucket。所以s1,s2,e1,e2的含义就是 start1,start2,end1,end2,分别是第一个bucket的satrt和end以及第二个bucket 的start和end。

然后将s1 = b0.getRangeBegin();e2 = b0.getRangeEnd();,将第一个bucket的start范围值赋值为原来b0的satrt,让第二个bucket的end值为原来b0的end,这样第一个bucket和第二个bucket是从左往右数的

split不管怎么样,都是一个分成两个,然后原先的那个就不用了,但是具体的split的方式,针对应用的要求,给出的参数不同,split的方式也是不同的。

  • 如果B_VALUE == 1,或者bucket 比B_FACTOR大,就令 s2 = e2+1-B_FACTOR
  • 否则令s2 = s1 + (1+e2-s1)/2)

至于为什么这么分,我也不知道,查了几次资料没有理解,等去看一下白皮书

继续往下看,根据参数不同确定好s1,s2,e1,e2之后,调用createBucket函数来创建bucket,然后对b0中的每个元素,看它现在应该在哪一个bucket中,分别添加到对应的bucket当中去,这样之后,调用set和add函数将原本的bucket0替换成bucket1,然后在bucket1的位置之后添加bucket2。

之后就是一些需要记录的log的操作。

总结:在这个KBucketSet中,如果想要添加一个元素,那么首先获取锁,然后找到那个bucket,然后在这个bucket中添加这个key,然后判断添加后的bucket是否需要split,就是这样一个逻辑

其他函数

   /**
     *  The current number of entries.
     */
    public int size() {
        // Sum the entry counts of every bucket under the read lock.
        int total = 0;
        getReadLock();
        try {
            for (int i = 0; i < _buckets.size(); i++) {
                total += _buckets.get(i).getKeyCount();
            }
        } finally {
            releaseReadLock();
        }
        return total;
    }
    
    public boolean remove(T entry) {
        KBucket<T> target;
        getReadLock();
        try {
            target = getBucket(entry);
        } finally {
            releaseReadLock();
        }
        // getBucket() returns null only for our own key, which is never stored.
        if (target == null)
            return false;
        return target.remove(entry);
    }
    
    /** @since 0.8.8 */
    public void clear() {
        // Only the read lock is needed: the bucket *list* is not modified;
        // each bucket clears its own entries.
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                b.clear();
            }
        } finally { releaseReadLock(); }
        // also drop the cached XOR-range computations
        _rangeCalc.clear();
    }
    
    /**
     *  @return a copy in a new set
     */
    public Set<T> getAll() {
        // Snapshot every bucket's entries into a fresh set; callers own the copy.
        Set<T> snapshot = new HashSet<T>(256);
        getReadLock();
        try {
            for (int i = 0; i < _buckets.size(); i++) {
                snapshot.addAll(_buckets.get(i).getEntries());
            }
        } finally {
            releaseReadLock();
        }
        return snapshot;
    }

    /**
     *  @return a copy in a new set
     */
    public Set<T> getAll(Set<T> toIgnore) {
        // A copy of all entries minus the ignored ones; safe to mutate further.
        Set<T> all = getAll();
        all.removeAll(toIgnore);
        return all;
    }
    
    public void getAll(SelectionCollector<T> collector) {
        // Feed every entry of every bucket into the caller-supplied collector.
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                b.getEntries(collector);
            }
        } finally { releaseReadLock(); }
    }
    
    /**
     *  The keys closest to us.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(int max) {
        // delegate with an empty ignore set
        return getClosest(max, Collections.<T> emptySet());
    }
    
    /**
     *  The keys closest to us.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(int max, Collection<T> toIgnore) {
        List<T> rv = new ArrayList<T>(max);
        int count = 0;
        getReadLock();
        try {
            // start at first (closest) bucket
            for (int i = 0; i < _buckets.size() && count < max; i++) {
                Set<T> entries = _buckets.get(i).getEntries();
                // add the whole bucket except for ignores,
                // extras will be trimmed after sorting
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
        } finally { releaseReadLock(); }
        // sort by XOR distance from us, then trim the far end down to max
        Comparator<T> comp = new XORComparator<T>(_us);
        Collections.sort(rv, comp);
        int sz = rv.size();
        for (int i = sz - 1; i >= max; i--) {
            rv.remove(i);
        }
        return rv;
    }
    
    /**
     *  The keys closest to the key.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(T key, int max) {
        // delegate with an empty ignore set
        return getClosest(key, max, Collections.<T> emptySet());
    }
    
    /**
     *  The keys closest to the key.
     *  Returned list will never contain us.
     *  @return non-null, closest first
     */
    public List<T> getClosest(T key, int max, Collection<T> toIgnore) {
        // special-case: closest to ourselves uses the simpler overload
        if (key.equals(_us))
            return getClosest(max, toIgnore);
        List<T> rv = new ArrayList<T>(max);
        int count = 0;
        getReadLock();
        try {
            int start = pickBucket(key);
            // start at closest bucket, then to the smaller (closer to us) buckets
            for (int i = start; i >= 0 && count < max; i--) {
                Set<T> entries = _buckets.get(i).getEntries();
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
            // then the farther from us buckets if necessary
            for (int i = start + 1; i < _buckets.size() && count < max; i++) {
                Set<T> entries = _buckets.get(i).getEntries();
                for (T e : entries) {
                    if (!toIgnore.contains(e)) {
                        rv.add(e);
                        count++;
                    }
                }
            }
        } finally { releaseReadLock(); }
        // sort by XOR distance from the requested key, then trim down to max
        Comparator<T> comp = new XORComparator<T>(key);
        Collections.sort(rv, comp);
        int sz = rv.size();
        for (int i = sz - 1; i >= max; i--) {
            rv.remove(i);
        }
        return rv;
    }

  • size() 这个函数是获取KBucketSet中的所有的端点的数量
  • remove() 参数是T类型,就是节点的类型,首先pickBucket找到相应的BUcket然后调用KBucket类的函数remove来删除节点,函数的返回值是布尔类型
  • clear 清除所有的节点,无返回值
  • getAll
    • 无参数 复制一个新的set来返回
    • 接受SET<T>参数 复制所有的节点,并且remove到其中的某一个set来返回
    • 利用collector来收集
  • getclosest 寻找到最近的节点
    • getclosest(int max)缺省参数,添加默认参数
    • getclosest(int max, Collections.&lt;T&gt; emptySet()) 返回值是一个list,根据传入的参数max来确定list中元素的最大数量,声明变量 int count 来记录数量,获取读锁,因为需要遍历。根据序号,获取_buckets中所有的bucket,在保证count&lt;max
      的前提下,并且节点不在 toIgnore 的范围内,都添加到rv当中去,然后利用XORComparator来排序,再remove掉远端的节点一直到大小合适,然后返回这个list
    • getClosest(T key, int max) 包含距离某个节点的最近的节点,缺省toIgnore,自动补全为空集合,调用下一个重载的函数
    • public List&lt;T&gt; getClosest(T key, int max, Collection&lt;T&gt; toIgnore) 首先如果这个想要找到最近节点的目标节点就是本节点,那么直接调用之前的第二个重载函数就可以了;如果不是本节点,那么前几步跟之前一样,返回值是一个list,根据传入的参数max来确定list中元素的最大数量,声明变量 int count 来记录数量,获取读锁,因为需要遍历。然后根据给到的key来获得bucket的序号,从这个序号开始,作为i,然后双向往外扩添加bucket,优先向下,向下不够再向上,具体的一看代码就明白

不知道怎么命名的另外一坨参数


   
    /**
     *  For every bucket that hasn't been updated in this long,
     *  or isn't close to full,
     *  generate a random key that would be a member of that bucket.
     *  The returned keys may be searched for to "refresh" the buckets.
     *  @return non-null, closest first
     */
    public List<T> getExploreKeys(long age) {
        List<T> rv = new ArrayList<T>(_buckets.size());
        // buckets untouched since this timestamp are considered stale
        long old = _context.clock().now() - age;
        getReadLock();
        try {
            for (KBucket<T> b : _buckets) {
                int curSize = b.getKeyCount();
                // Always explore the closest bucket
                // otherwise: refresh buckets that are stale or under 3/4 full
                if ((b.getRangeBegin() == 0) ||
                    (b.getLastChanged() < old || curSize < BUCKET_SIZE * 3 / 4))
                    rv.add(generateRandomKey(b));
            }
        } finally { releaseReadLock(); }
        return rv;
    }
    
    /**
     *  Generate a random key to go within this bucket
     *  Package private for testing only. Others shouldn't need this.
     */
    T generateRandomKey(KBucket<T> bucket) {
        // Build a random XOR distance ("variance") whose range number falls
        // inside this bucket, then XOR it with our key to get a target key.
        int begin = bucket.getRangeBegin();
        int end = bucket.getRangeEnd();
        // number of fixed bits, out of B_VALUE - 1 bits
        int fixed = 0;
        int bsz = 1 + end - begin;
        // compute fixed = B_VALUE - log2(bsz)
        // e.g for B=4, B_FACTOR=8, sz 4-> fixed 1, sz 2->fixed 2, sz 1 -> fixed 3
        while (bsz < B_FACTOR) {
            fixed++;
            bsz <<= 1;
        }
        int fixedBits = 0;
        if (fixed > 0) {
            // 0x01, 03, 07, 0f, ...
            int mask = (1 << fixed) - 1;
            // fixed bits masked from begin
            fixedBits = (begin >> (B_VALUE - (fixed + 1))) & mask;
        }
        // keep the originals for the debug log below
        int obegin = begin;
        int oend = end;
        // convert range numbers back to bit positions
        begin >>= (B_VALUE - 1);
        end >>= (B_VALUE - 1);
        // we need randomness for [0, begin) bits
        BigInteger variance;
        // 00000000rrrr
        if (begin > 0)
            variance = new BigInteger(begin - fixed, _context.random());
        else
            variance = BigInteger.ZERO;
        // we need nonzero randomness for [begin, end] bits
        int numNonZero = 1 + end - begin;
        if (numNonZero == 1) {
            // 00001000rrrr
            variance = variance.setBit(begin);
            // fixed bits as the 'main' bucket is split
            // 00001fffrrrr
            if (fixed > 0)
                variance = variance.or(BigInteger.valueOf(fixedBits).shiftLeft(begin - fixed));
        } else {
            // dont span main bucket boundaries with depth > 1
            if (fixed > 0)
                throw new IllegalStateException("??? " + bucket);
            BigInteger nonz;
            if (numNonZero <= 62) {
                // add one to ensure nonzero
                long nz = 1 + _context.random().nextLong((1l << numNonZero) - 1);
                nonz = BigInteger.valueOf(nz);
            } else {
                // loop to ensure nonzero
                do {
                    nonz = new BigInteger(numNonZero, _context.random());
                } while (nonz.equals(BigInteger.ZERO));
            }
            // shift left and or-in the nonzero randomness
            if (begin > 0)
                nonz = nonz.shiftLeft(begin);
            // 0000nnnnrrrr
            variance = variance.or(nonz);
        }

        if (_log.shouldLog(Log.DEBUG))
            _log.debug("SB(" + obegin + ',' + oend + ") KB(" + begin + ',' + end + ") fixed=" + fixed + " fixedBits=" + fixedBits + " numNonZ=" + numNonZero);
        // variance is a distance from us, not a key; XOR with our key to convert
        byte data[] = variance.toByteArray();
        T key = makeKey(data);
        byte[] hash = DataHelper.xor(key.getData(), _us.getData());
        T rv = makeKey(hash);

        // DEBUG
        //int range = getRange(rv);
        //if (range < obegin || range > oend) {
        //    throw new IllegalStateException("Generate random key failed range=" + range + " for " + rv + " meant for bucket " + bucket);
        //}

        return rv;
    }
    
    /**
     *  Make a new SimpleDataStrucure from the data
     *  @param data size &lt;= SDS length, else throws IAE
     *              Can be 1 bigger if top byte is zero
     */
    @SuppressWarnings("unchecked")
    private T makeKey(byte[] data) {
        int len = _us.length();
        int dlen = data.length;
        // data may be one byte longer than the key when BigInteger.toByteArray()
        // prepended a zero sign byte; anything else over-length is an error
        if (dlen > len + 1 ||
            (dlen == len + 1 && data[0] != 0))
            throw new IllegalArgumentException("bad length " + dlen + " > " + len);
        T rv;
        try {
            // relies on the key class having a zero-argument constructor
            rv = (T) _us.getClass().getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            _log.error("fail", e);
            throw new RuntimeException(e);
        }
        if (dlen == len) {
            rv.setData(data);
        } else {
            byte[] ndata = new byte[len];
            if (dlen == len + 1) {
                // one bigger
                System.arraycopy(data, 1, ndata, 0, len);
            } else {
                // smaller
                System.arraycopy(data, 0, ndata, len - dlen, dlen);
            }
            rv.setData(ndata);
        }
        return rv;
    }

    private static class Range<T extends SimpleDataStructure> {
        // Kademlia "b" split-depth parameter
        private final int _bValue;
        // our key as a non-negative BigInteger, for XOR distance math
        private final BigInteger _bigUs;
        // memoized key -> range results; bounded cache, guarded by itself
        private final Map<T, Integer> _distanceCache;

        public Range(T us, int bValue) {
            _bValue = bValue;
            _bigUs = new BigInteger(1, us.getData());
            _distanceCache = new LHMCache<T, Integer>(256);
        }

        /** @return 0 to max-1 or -1 for us */
        public int getRange(T key) {
            Integer rv;
            synchronized (_distanceCache) {
                rv = _distanceCache.get(key);
                if (rv == null) {
                    // easy way when _bValue == 1
                    //rv = Integer.valueOf(_bigUs.xor(new BigInteger(1, key.getData())).bitLength() - 1);
                    BigInteger xor = _bigUs.xor(new BigInteger(1, key.getData()));
                    // index of the highest set bit; -1 when xor == 0 (key is us)
                    int range = xor.bitLength() - 1;
                    if (_bValue > 1) {
                        // refine using the b-1 bits just below the high bit
                        int toShift = range + 1 - _bValue;
                        int highbit = range;
                        range <<= _bValue - 1;
                        if (toShift >= 0) {
                            int extra = xor.clearBit(highbit).shiftRight(toShift).intValue();
                            range += extra;
                            //Log log = I2PAppContext.getGlobalContext().logManager().getLog(KBucketSet.class);
                            //if (log.shouldLog(Log.DEBUG))
                            //    log.debug("highbit " + highbit + " toshift " + toShift + " extra " + extra + " new " + range);
                        }
                    }
                    rv = Integer.valueOf(range);
                    _distanceCache.put(key, rv);
                }
            }
            return rv.intValue();
        }

        public void clear() {
            synchronized (_distanceCache) {
                _distanceCache.clear();
            }
        }
    }

    /**
     *  For Collections.binarySearch.
     *  getRangeBegin == getRangeEnd.
     */
    private static class DummyBucket<T extends SimpleDataStructure> implements KBucket<T> {
        // the single range value this probe represents (begin == end)
        private final int r;

        public DummyBucket(int range) {
            r = range;
        }

        public int getRangeBegin() { return r; }
        public int getRangeEnd() { return r; }

        // never stores entries; used only as a binarySearch probe
        public int getKeyCount() {
            return 0;
        }

        public Set<T> getEntries() {
            throw new UnsupportedOperationException();
        }

        public void getEntries(SelectionCollector<T> collector) {
            throw new UnsupportedOperationException();
        }

        public void clear() {}

        public boolean add(T peer) {
            throw new UnsupportedOperationException();
        }

        public boolean remove(T peer) {
            return false;
        }

        public void setLastChanged() {}

        public long getLastChanged() {
            return 0;
        }
    }

    /**
     *  For Collections.binarySearch.
     *  Returns equal for any overlap.
     */
    private static class BucketComparator<T extends SimpleDataStructure> implements Comparator<KBucket<T>>, Serializable {
        // NOTE: "equal" means the ranges overlap, so this ordering is only
        // suitable for binarySearch lookups, not for sorting arbitrary buckets.
        public int compare(KBucket<T> l, KBucket<T> r) {
            if (l.getRangeEnd() < r.getRangeBegin())
                return -1;
            if (l.getRangeBegin() > r.getRangeEnd())
                return 1;
            return 0;
        }
    }

    @Override
    public String toString() {
        // HTML debug dump of the whole bucket set
        StringBuilder buf = new StringBuilder(1024);
        buf.append("<div class=\"debug_container buckets\">");
        // note: size() takes and releases the read lock itself, before we
        // acquire it below
        buf.append("<hr><b>Bucket set rooted on:</b> ").append(_us.toString())
           .append(" K=").append(BUCKET_SIZE)
           .append(" B=").append(B_VALUE)
           .append(" with ").append(size())
           .append(" keys in ").append(_buckets.size()).append(" buckets:<br>\n");
        getReadLock();
        try {
            int len = _buckets.size();
            for (int i = 0; i < len; i++) {
                KBucket<T> b = _buckets.get(i);
                buf.append("<b>Bucket ").append(i).append("/").append(len).append(":</b> ");
                buf.append(b.toString()).append("<br>\n");
            }
        } finally { releaseReadLock(); }
        buf.append("</div>");
        return buf.toString();
    }
}

  • getExploreKeys 对于指定时间没有更新的bucket或者是远没有满的bucket,生成一个随机的key,作为这个bucket的成员。创建一个arraylist rv 用来做返回值,然后使用long类型的变量存储现在的时间减去更新间隔,这样这个long类型的old变量就存储着上一次应该更新的时间,然后获取读锁用来遍历。对于_buckets中的每一个bucket,计算其中key的数量,如果rangeBegin == 0(注释说最近的bucket总是要探索,不知道为什么),或者上次更新的时间比应该更新的时间要早(也就是说该更新的时候没有更新),或者bucket中的key的数量小于 3/4 BUCKET_SIZE,那么就生成随机的key,向rv这个list中添加 generateRandomKey(b) 的返回值
  • T generateRandomKey(KBucket&lt;T&gt; b) 生成随机的key,放到相应的bucket当中。首先获取bucket的begin和end,然后声明一个新的变量fixed,新的int bsz,bsz = 1+end-begin,从这里的计算来看是用来保存bucket范围的大小,然后来计算fixed,通过 fixed = B_VALUE - log2(bsz),如果对于一个set,B_VALUE = 4,那么B_FACTOR = 8,如果sz为4,那么fixed的值就是1,如果sz的值为2,那么fixed的值就是2。之后声明一个新的变量fixedBits为0
    其实在这个过程中很疑问为什么需要用这种方式,这样做的原因是为什么,其实本身就是随机构建的,所以这是一个随机构建的方式,这样做的原因可能是够随机,或者说是够安全,或者是够分散,知道怎么做就行,具体为什么,之后再看论文吧
    继续往下,如果fixed 》0 的时候,创建 int类型mask = 2 ** fixed 次方 再 -1 这样得到mask。
然后再使用mask来计算fixedBits,fixedBits = (begin &gt;&gt; (B_VALUE-(fixed+1))) &amp; mask,右移运算符是将数向右移动,超出的直接丢弃,左边补0。用语言来描述,就是将begin向右移动B_VALUE-fixed-1位,然后与mask进行按位与操作。

这样做之后,重新声明 obegin oend,让它们的值等于begin和end的值,再将begin和end分别右移 B_VALUE-1 位。java提供BigInteger类型,用来进行大数的操作,如果begin &gt; 0(因为之前begin右移了),就让variance = new BigInteger(begin - fixed, _context.random());关于_context.random()的操作,应该是生成某些随机字节的,但具体是怎么实现的就之后再说吧。之后还有许多操作,但大多都是位运算以及简单的逻辑判断,所以很大段的就不提了,

通过上边的操作,获得了一个byte数组 data,然后使用makeKey函数,得到一个T 也就是 bucket的元素key类型的变量,先看一下makeKey的函数

 
    /**
     *  Make a new SimpleDataStrucure from the data
     *  @param data size &lt;= SDS length, else throws IAE
     *              Can be 1 bigger if top byte is zero
     */
    @SuppressWarnings("unchecked")
    private T makeKey(byte[] data) {
        int len = _us.length();
        int dlen = data.length;
        // data may be one byte longer than the key when BigInteger.toByteArray()
        // prepended a zero sign byte; anything else over-length is an error
        if (dlen > len + 1 ||
            (dlen == len + 1 && data[0] != 0))
            throw new IllegalArgumentException("bad length " + dlen + " > " + len);
        T rv;
        try {
            // relies on the key class having a zero-argument constructor
            rv = (T) _us.getClass().getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            _log.error("fail", e);
            throw new RuntimeException(e);
        }
        if (dlen == len) {
            rv.setData(data);
        } else {
            byte[] ndata = new byte[len];
            if (dlen == len + 1) {
                // one bigger
                System.arraycopy(data, 1, ndata, 0, len);
            } else {
                // smaller
                System.arraycopy(data, 0, ndata, len - dlen, dlen);
            }
            rv.setData(ndata);
        }
        return rv;
    }
  • makeKey函数接受输入的一个byte数组,返回一个T类型的返回值。首先,声明一个int类型的命名为len的变量,为us也就是本节点key的长度。然后声明一个int类型的dlen变量,用来保存data的length,加上一个合法性检查。如果通过之后,创建一个T类型的变量rv,然后尝试初始化,如果dlen == len,那么rv直接setData;如果dlen == len+1 或者 len &gt; dlen,舍弃掉多余的或者补上0,然后setData。

range类和dummybucket类

range 类

成员变量和构造函数

三个成员变量,分别是_bValue,_bigUS用来大数操作,以及_distanceCache,其中_distanceCache是一个Map类,<T,Integer>
其中_distanceCache用来存储distance的cache,用LHMCache来存储,这个LHMCache之后再看,再i2p.util中有一个LHMCache类

成员函数

  • int getRange(T key) 返回值int ,接受T类型的参数,使用synchronized串行化,首先在_distanceCache中寻找key的值,如果没有遭到,那么说明缓存中没有,就需要计算,。首先_bigUs是通过new BigInteget(int,byte[])类型的变量返回的是一个数值是byte[ ]大小的,正负值是int 的类型的变量。
然后刚才说到缓存中没有缓存到这个节点的range,那么就通过自己计算的方法,声明一个BigInteger的变量xor,看名字就知道是用来保存异或结果的,用_bigUs(就是本节点)与想要求range的节点来进行异或,声明一个range = xor.bitLength() - 1。这里需要注意,这里的range并不一定是最终的range,它只是代表了异或结果最高位的位置(bitLength返回的是不含符号位的位数,减1得到最高位下标)。然后进行判断,如果bValue&gt;1 这里我不知道bValue的值会影响什么,所以这个操作我也看不明白…计算完成之后放到cache中,返回相应的值。

dummy类

这个类用来进行binary search的

BucketComparator类

用来比较范围的,对于两个KBucket变量 l 和 r,如果l的end 小于 r的begin,那么返回-1,如果l的begin大于r的end,那么返回1
否则返回0

 类似资料: