Framework structure of TinkerPop Step operators

柯正谊
2023-12-01

The top-level interface is Step:

// org.apache.tinkerpop.gremlin.process.traversal.Step
public interface Step<S, E> extends Iterator<Traverser.Admin<E>>, Serializable, Cloneable {
    
    public void addStarts(final Iterator<Traverser.Admin<S>> starts);
    public void addStart(final Traverser.Admin<S> start);

    public void setPreviousStep(final Step<?, S> step);
    public Step<?, S> getPreviousStep();
    
    public void setNextStep(final Step<E, ?> step);
    public Step<E, ?> getNextStep();

    public <A, B> Traversal.Admin<A, B> getTraversal();
    public void setTraversal(final Traversal.Admin<?, ?> traversal);
    
    public void addLabel(final String label);
}

Step extends the Iterator interface, which means query results can be pulled directly from a Step:

Iterator<Traverser.Admin<E>>

AbstractStep is the abstract implementation of Step, and essentially all concrete step operators extend it. AbstractStep.next() is implemented as follows:

// org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next()
public abstract class AbstractStep<S, E> implements Step<S, E> {
    
    @Override
    public Traverser.Admin<E> next() {
        if (null != this.nextEnd) {
            try {
                // Return the result. this.nextEnd is usually the result saved by the previous hasNext() call;
                // prepareTraversalForNextStep sets the labels and the stepId
                return this.prepareTraversalForNextStep(this.nextEnd);
            } finally {
                this.nextEnd = null;
            }
        } else {
            while (true) {
                if (Thread.interrupted()) throw new TraversalInterruptedException();
                // Key method: fetching the next element really means calling processNextStart()
                // (the name is a bit odd), which each subclass overrides
                final Traverser.Admin<E> traverser = this.processNextStart();
                if (null != traverser.get() && 0 != traverser.bulk())
                    return this.prepareTraversalForNextStep(traverser);
            }
        }
    }
}
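
The interplay between hasNext() and next() can be sketched without TinkerPop. The block below is a minimal illustration, not the library's code: MiniStep and UpperCaseStep are hypothetical names, plain values stand in for Traverser.Admin, and labels, bulk and interruption handling are left out. It assumes, as in the snippet above, that hasNext() buffers one result in nextEnd and that next() either hands out that buffer or calls processNextStart() itself.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for AbstractStep (not TinkerPop code):
// it passes plain values instead of Traverser.Admin and skips labels and bulk.
abstract class MiniStep<S, E> implements Iterator<E> {
    protected final Iterator<S> starts; // upstream input, like AbstractStep.starts
    private E nextEnd = null;           // result buffered by hasNext()

    MiniStep(Iterator<S> starts) {
        this.starts = starts;
    }

    // Produce one result, or throw NoSuchElementException when upstream is exhausted.
    protected abstract E processNextStart();

    @Override
    public boolean hasNext() {
        if (nextEnd != null) {
            return true; // already buffered by an earlier hasNext()
        }
        try {
            while (true) {
                nextEnd = processNextStart();
                if (nextEnd != null) {
                    return true;
                }
            }
        } catch (NoSuchElementException e) {
            return false;
        }
    }

    @Override
    public E next() {
        if (nextEnd != null) {
            try {
                return nextEnd; // hand out the buffered result
            } finally {
                nextEnd = null; // and clear the buffer
            }
        }
        while (true) {
            E result = processNextStart();
            if (result != null) {
                return result;
            }
        }
    }
}

// Example subclass: maps each incoming string to upper case.
class UpperCaseStep extends MiniStep<String, String> {
    UpperCaseStep(Iterator<String> starts) {
        super(starts);
    }

    @Override
    protected String processNextStart() {
        return starts.next().toUpperCase();
    }
}
```

Calling hasNext() several times in a row does not consume extra input, because the second call sees the buffered nextEnd and returns immediately.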

For example, FilterStep is implemented as follows:

public abstract class FilterStep<S> extends AbstractStep<S, S> {

    public FilterStep(final Traversal.Admin traversal) {
        super(traversal);
    }

    @Override
    protected Traverser.Admin<S> processNextStart() {
        while (true) {
            final Traverser.Admin<S> traverser = this.starts.next();
            // Return the first traverser that satisfies the filter condition
            if (this.filter(traverser))
                return traverser;
        }
    }

    protected abstract boolean filter(final Traverser.Admin<S> traverser);
}
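
To see the filter loop in isolation, here is a small standalone sketch. SimpleFilterStep is a hypothetical class, not part of TinkerPop; it takes a java.util.function.Predicate in place of the abstract filter() method and works on plain values rather than traversers. The only thing it is meant to show is the loop shape of processNextStart(): pull from upstream until an element passes, and let the upstream NoSuchElementException signal the end.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical stand-in for FilterStep (not TinkerPop code): plain values, no traversers.
class SimpleFilterStep<S> implements Iterator<S> {
    private final Iterator<S> starts;   // upstream input
    private final Predicate<S> filter;  // plays the role of the abstract filter() method
    private S buffered;
    private boolean hasBuffered = false;

    SimpleFilterStep(Iterator<S> starts, Predicate<S> filter) {
        this.starts = starts;
        this.filter = filter;
    }

    // Same loop shape as FilterStep.processNextStart(): keep pulling until one passes.
    // When upstream runs dry, starts.next() throws NoSuchElementException.
    private S processNextStart() {
        while (true) {
            S candidate = starts.next();
            if (filter.test(candidate)) {
                return candidate;
            }
        }
    }

    @Override
    public boolean hasNext() {
        if (!hasBuffered) {
            try {
                buffered = processNextStart();
                hasBuffered = true;
            } catch (NoSuchElementException e) {
                return false;
            }
        }
        return true;
    }

    @Override
    public S next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        hasBuffered = false;
        return buffered;
    }
}
```

For instance, wrapping an iterator over 1..6 with the predicate n -> n % 2 == 0 yields 2, 4 and 6 in order.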

As another example, FlatMapStep is implemented as follows:

public abstract class FlatMapStep<S, E> extends AbstractStep<S, E> {

    private Traverser.Admin<S> head = null;
    private Iterator<E> iterator = EmptyIterator.instance();

    public FlatMapStep(final Traversal.Admin traversal) {
        super(traversal);
    }

    @Override
    protected Traverser.Admin<E> processNextStart() {
        while (true) {
            if (this.iterator.hasNext()) {
                // Return one element from the batch produced by the previous expansion
                return this.head.split(this.iterator.next(), this);
            } else {
                closeIterator();
                // Take one traverser from the current level (the upstream input)
                this.head = this.starts.next();
                // Expand that single intermediate result into a batch of next-level results
                this.iterator = this.flatMap(this.head);
            }
        }
    }

    protected abstract Iterator<E> flatMap(final Traverser.Admin<S> traverser);
}
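
The expansion loop can be tried out with a standalone sketch. SimpleFlatMapStep is a hypothetical class, not TinkerPop code; it takes a java.util.function.Function in place of the abstract flatMap() method, uses plain values instead of traversers, and omits head.split() and closeIterator(). It is only meant to show how the loop drains the current batch first and moves on to the next upstream element when the batch is empty.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical stand-in for FlatMapStep (not TinkerPop code): plain values, no traversers.
class SimpleFlatMapStep<S, E> implements Iterator<E> {
    private final Iterator<S> starts;               // upstream input
    private final Function<S, Iterator<E>> flatMap; // plays the role of the abstract flatMap()
    private Iterator<E> iterator = Collections.emptyIterator(); // current batch
    private E buffered;
    private boolean hasBuffered = false;

    SimpleFlatMapStep(Iterator<S> starts, Function<S, Iterator<E>> flatMap) {
        this.starts = starts;
        this.flatMap = flatMap;
    }

    // Same loop shape as FlatMapStep.processNextStart().
    private E processNextStart() {
        while (true) {
            if (iterator.hasNext()) {
                return iterator.next();    // drain the current batch first
            }
            S head = starts.next();        // throws NoSuchElementException at the end
            iterator = flatMap.apply(head); // expand one input into a new batch
        }
    }

    @Override
    public boolean hasNext() {
        if (!hasBuffered) {
            try {
                buffered = processNextStart();
                hasBuffered = true;
            } catch (NoSuchElementException e) {
                return false;
            }
        }
        return true;
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        hasBuffered = false;
        return buffered;
    }
}
```

An input whose expansion is empty simply produces nothing: the loop goes around once more and pulls the next upstream element.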

VertexStep extends FlatMapStep. Its default implementation is as follows:

public class VertexStep<E extends Element> extends FlatMapStep<Vertex, E> implements AutoCloseable, Configuring {
    
    @Override
    protected Iterator<E> flatMap(final Traverser.Admin<Vertex> traverser) {
        return Vertex.class.isAssignableFrom(this.returnClass) ?
               (Iterator<E>) traverser.get().vertices(this.direction, this.edgeLabels) :
               (Iterator<E>) traverser.get().edges(this.direction, this.edgeLabels);
    }
}

HugeVertexStep is HugeGraph's optimized implementation of the adjacent-edge query. It extends VertexStep and is implemented as follows:

public class HugeVertexStep<E extends Element> extends VertexStep<E> {
    
    @Override
    protected Iterator<E> flatMap(final Traverser.Admin<Vertex> traverser) {
        // Override flatMap() to call the vendor's own implementation
        boolean queryVertex = this.returnsVertex();
        boolean queryEdge = this.returnsEdge();
        assert queryVertex || queryEdge;
        if (queryVertex) {
            this.iterator = (Iterator<E>) this.vertices(traverser);
        } else {
            assert queryEdge;
            this.iterator = (Iterator<E>) this.edges(traverser);
        }
        return this.iterator;
    }

    private Iterator<Vertex> vertices(Traverser.Admin<Vertex> traverser) {
        Iterator<Edge> edges = this.edges(traverser);
        Iterator<Vertex> vertices = this.queryAdjacentVertices(edges);
        return vertices;
    }

    private Iterator<Edge> edges(Traverser.Admin<Vertex> traverser) {
        Query query = this.constructEdgesQuery(traverser);
        return this.queryEdges(query);
    }

    protected Iterator<Vertex> queryAdjacentVertices(Iterator<Edge> edges) {
        HugeGraph graph = TraversalUtil.getGraph(this);
        Iterator<Vertex> vertices = graph.adjacentVertices(edges);

        if (!this.withVertexCondition()) {
            return vertices;
        }

        // TODO: query by vertex index to optimize
        return TraversalUtil.filterResult(this.hasContainers, vertices);
    }

    protected Iterator<Edge> queryEdges(Query query) {
        HugeGraph graph = TraversalUtil.getGraph(this);

        // Do query
        Iterator<Edge> edges = graph.edges(query);

        if (!this.withEdgeCondition()) {
            return edges;
        }

        // Do filter by edge conditions
        return TraversalUtil.filterResult(this.hasContainers, edges);
    }
}
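
The extension pattern used here (override flatMap(), run your own query, filter only when conditions exist) can be sketched in plain Java. Everything in the block below is hypothetical and is not HugeGraph or TinkerPop code: SketchEdge, DefaultExpandStep and ConditionalExpandStep are made-up names, the "backend query" is just an in-memory scan, and the filtering is eager, whereas a real implementation would more likely filter lazily.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical edge type for this sketch: source vertex, label, target vertex.
final class SketchEdge {
    final String source;
    final String label;
    final String target;

    SketchEdge(String source, String label, String target) {
        this.source = source;
        this.label = label;
        this.target = target;
    }
}

// Loosely analogous to VertexStep: the default way to expand a vertex into its edges.
class DefaultExpandStep {
    protected final List<SketchEdge> store; // stands in for the graph

    DefaultExpandStep(List<SketchEdge> store) {
        this.store = store;
    }

    protected Iterator<SketchEdge> flatMap(String vertex) {
        List<SketchEdge> out = new ArrayList<>();
        for (SketchEdge edge : store) {
            if (edge.source.equals(vertex)) {
                out.add(edge);
            }
        }
        return out.iterator();
    }
}

// Loosely analogous to HugeVertexStep: overrides flatMap(), runs its own query,
// then filters by extra conditions only when some are present.
class ConditionalExpandStep extends DefaultExpandStep {
    private final List<Predicate<SketchEdge>> conditions; // stands in for hasContainers

    ConditionalExpandStep(List<SketchEdge> store, List<Predicate<SketchEdge>> conditions) {
        super(store);
        this.conditions = conditions;
    }

    @Override
    protected Iterator<SketchEdge> flatMap(String vertex) {
        Iterator<SketchEdge> edges = queryEdges(vertex);
        if (conditions.isEmpty()) {
            return edges; // same shortcut as the !withEdgeCondition() branch
        }
        return filterResult(edges);
    }

    // Placeholder for a backend query; here it just reuses the in-memory scan.
    private Iterator<SketchEdge> queryEdges(String vertex) {
        return super.flatMap(vertex);
    }

    // Keep only edges that satisfy every condition (eager, for simplicity).
    private Iterator<SketchEdge> filterResult(Iterator<SketchEdge> edges) {
        List<SketchEdge> kept = new ArrayList<>();
        while (edges.hasNext()) {
            SketchEdge edge = edges.next();
            boolean matches = true;
            for (Predicate<SketchEdge> condition : conditions) {
                matches = matches && condition.test(edge);
            }
            if (matches) {
                kept.add(edge);
            }
        }
        return kept.iterator();
    }
}
```

Because only flatMap() changes, the surrounding iteration logic inherited from the parent classes stays untouched, which is what lets a vendor swap in its own query path.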

To summarize, the inheritance chain of the HugeVertexStep operator is: HugeVertexStep > VertexStep > FlatMapStep > AbstractStep > Step

For the full code, see HugeVertexStep.java
