【Elasticsearch源码解析】通信模块篇——ES中对REST请求的处理浅析

一、前言

从本文开始,我打算开一个新坑,分模块来讲一讲 ES 的源码。本系列的目的主要是方便我自己对于 ES 源码的理解进行梳理和总结。当然,如果有人能读到我的解析并能从中获益就更好了。本系列文章会基于 ES 在 github 上的最新开源代码,版本是 7.12.0。

二、Elasticsearch 的模块化

在开源项目索引网站 openhub 上可以查到,目前 ES 的源码行数有 200w 行。在 Java 代码世界里也算是一个庞然大物级别的项目(Spring 框架总代码量 138w 行)。ES 用 200w 行代码构建了一个带分布式的全文搜索引擎,具有很好的定制化能力,性能良好,并且开箱即用。

Elasticsearch 的模块化做的很不错,不同功能用不同的 service 或者 module 实现。基于完善的模块化,Elasticsearch 从源码中放开了对于模块的自定义能力,支持通过插件包的方式对于各模块进行定制化。可以定制的功能非常丰富,从底层的 search engine 到上层的配置,不需要改动 ES 的代码就能增加自定义功能。比如说,Amazon 开源的 Open Distro 项目就包括很多定制化插件,而 ES 官方的商业 X-Pack 版本也是由一系列的插件包组成的。

本文就从 ES 的通信模块开始,来详细讲解下 ES 源码是如何实现通信功能的。

三、网络模块初始化

ES 的网络请求分为两类:一个是客户端连接集群节点用的 Rest 请求,走 HTTP 协议,另一个是集群节点之间的 Transport 请求,走 TCP 协议。接下来看代码,直接从 ES 通信模块类 NetworkModule 看起:


        /**
         * Creates a network module that custom networking classes can be plugged into.
         * @param settings The settings for the node
         */
        public NetworkModule(Settings settings, List<NetworkPlugin> plugins, ThreadPool threadPool,
                             BigArrays bigArrays,
                             PageCacheRecycler pageCacheRecycler,
                             CircuitBreakerService circuitBreakerService,
                             NamedWriteableRegistry namedWriteableRegistry,
                             NamedXContentRegistry xContentRegistry,
                             NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                             ClusterSettings clusterSettings) {
            this.settings = settings;
            for (NetworkPlugin plugin : plugins) {
                Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                    pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                    // Rest请求handler注册
                    registerHttpTransport(entry.getKey(), entry.getValue());
                }
                Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                    circuitBreakerService, namedWriteableRegistry, networkService);
                for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                    // Transport请求handler注册
                    registerTransport(entry.getKey(), entry.getValue());
                }
                List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                    threadPool.getThreadContext());
                for (TransportInterceptor interceptor : transportInterceptors) {
                    registerTransportInterceptor(interceptor);
                }
            }
        }

其中遍历了实现 NetworkPlugin 的插件,并分别注册了 Rest 和 Transport 的 handler,实际使用时,取出来具体的 handler 来初始化。在 ES 代码中,以 Plugin 结尾的都是插件要实现的一些重要接口,需要实现哪种功能就去实现接口中定义的对应方法就好。其中 NetworkPlugin 中就定义了以下两个重要方法:

    /**
     * Returns a map of {@link Transport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
     */
    default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                           CircuitBreakerService circuitBreakerService,
                                                           NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.emptyMap();
    }

    /**
     * Returns a map of {@link HttpServerTransport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
     */
    default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                         PageCacheRecycler pageCacheRecycler,
                                                                         CircuitBreakerService circuitBreakerService,
                                                                         NamedXContentRegistry xContentRegistry,
                                                                         NetworkService networkService,
                                                                         HttpServerTransport.Dispatcher dispatcher,
                                                                         ClusterSettings clusterSettings) {
        return Collections.emptyMap();
    }

分别对应了 Rest 和 Transport 的 handler 实现。

在节点初始化时,会通过下面这个方法获取 Rest 接口的 handler(Transport 接口同理),依次读取 http.type 和 http.default.type 这两个配置。而 ES 默认的网络实现是通过 transport-netty4 插件实现的,在这个插件中,会设置 http.default.type 配置。当用户没有自制自己的网络模块时,就会使用默认的 netty 实现。如果用户需要自定义时,只需要在插件中设置自己的网络模块名字,然后修改 ES 的 http.type 配置就好。

       public Supplier<HttpServerTransport> getHttpServerTransportSupplier() {
        final String name;
        if (HTTP_TYPE_SETTING.exists(settings)) {
            name = HTTP_TYPE_SETTING.get(settings);
        } else {
            name = HTTP_DEFAULT_TYPE_SETTING.get(settings);
        }
        final Supplier<HttpServerTransport> factory = transportHttpFactories.get(name);
        if (factory == null) {
            throw new IllegalStateException("Unsupported http.type [" + name + "]");
        }
        return factory;
    }

四、Rest 请求处理流程

接下来我们一步一步分析 ES 时如何处理 Rest 请求的.

首先从入口看起,在 transport-netty4 插件中通过 getHttpTransports 方法注册了 Netty4HttpServerTransport 类:

    @Override
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                        PageCacheRecycler pageCacheRecycler,
                                                                        CircuitBreakerService circuitBreakerService,
                                                                        NamedXContentRegistry xContentRegistry,
                                                                        NetworkService networkService,
                                                                        HttpServerTransport.Dispatcher dispatcher,
                                                                        ClusterSettings clusterSettings) {
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                clusterSettings, getSharedGroupFactory(settings)));
    }

这其中做了 Netty 的初始化工作,然后在 pipeline 中增加了一个 handler,对应类是 Netty4HttpRequestHandler,这个类继承了 Netty 中的抽象类 SimpleChannelInboundHandler,只需要实现 channelRead0 这个抽象方法就能拿到从网络 IO 中反序列化出来的 HttpRequest 对象。

接下来就与 Netty 无关了,是 ES 对于请求的处理过程。在抽象类 AbstractHttpServerTransport 中做了 request 和 channel 的进一步包装,然后将请求分发给 RestController,在这个类中做了实际的 HTTP 请求 header 校验和最重要的部分——URL 匹配。URL 匹配使用了前缀树算法,查找方法如下:

    /**
     * Returns an iterator of the objects stored in the {@code PathTrie}, using
     * all possible {@code TrieMatchingMode} modes. The {@code paramSupplier}
     * is called between each invocation of {@code next()} to supply a new map
     * of parameters.
     */
    public Iterator<T> retrieveAll(String path, Supplier<Map<String, String>> paramSupplier) {
        return new Iterator<>() {

            private int mode;

            @Override
            public boolean hasNext() {
                return mode < TrieMatchingMode.values().length;
            }

            @Override
            public T next() {
                if (hasNext() == false) {
                    throw new NoSuchElementException("called next() without validating hasNext()! no more modes available");
                }
                return retrieve(path, paramSupplier.get(), TrieMatchingMode.values()[mode++]);
            }
        };
    }

然后在 TrieMatchingMode 这个枚举类中定义了匹配的规则,每次遍历完后,mode 会自增,就会使用下一个规则,直到所有规则匹配完毕:

    enum TrieMatchingMode {
        /*
         * Retrieve only explicitly mapped nodes, no wildcards are
         * matched.
         */
        EXPLICIT_NODES_ONLY,
        /*
         * Retrieve only explicitly mapped nodes, with wildcards
         * allowed as root nodes.
         */
        WILDCARD_ROOT_NODES_ALLOWED,
        /*
         * Retrieve only explicitly mapped nodes, with wildcards
         * allowed as leaf nodes.
         */
        WILDCARD_LEAF_NODES_ALLOWED,
        /*
         * Retrieve both explicitly mapped and wildcard nodes.
         */
        WILDCARD_NODES_ALLOWED
    }

其中每个 URL 都与具体的 RestAction 对应,当匹配上时,就会将请求分发给实际的 action 来处理。参考一下最简单的 RestCatAction:

public class RestCatAction extends BaseRestHandler {

    private static final String CAT = "=^.^=";
    private static final String CAT_NL = CAT + "\n";
    private final String HELP;

    public RestCatAction(List<AbstractCatAction> catActions) {
        StringBuilder sb = new StringBuilder();
        sb.append(CAT_NL);
        for (AbstractCatAction catAction : catActions) {
            catAction.documentation(sb);
        }
        HELP = sb.toString();
    }

    @Override
    public List<Route> routes() {
        return List.of(new Route(GET, "/_cat"));
    }

    @Override
    public String getName() {
        return "cat_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, HELP));
    }

}

这个类实现了三个方法:

  1. getName 只用在_nodes/usage 接口中,只要返回一个名字就好
  2. routes 定义了 action 对应的 Rest 请求方法和 URL
  3. prepareRequest 定义了实际处理请求的内容,注意最后返回一个 consumer,实际执行是在 BaseRestHandler 中:
    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }

        usageCount.increment();
        // execute the action
        action.accept(channel);
    }

五、总结

本文介绍了 ES 的通信模块,并梳理了整个 Rest 请求的处理流程:从节点启动开始,Netty 接受到用户发送的 Rest 请求,解析并包装成对象,做 HTTP 相关校验,根据 HTTP 方法和 URL 匹配 RestAction,action 处理请求并返回。 文章的第二部分会梳理 Transport 请求的处理,敬请期待。