分享

Elasticsearch源码:写入流程原理分析(二)

levycui 2020-1-15 18:18:01 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 4795
本帖最后由 levycui 于 2020-1-15 18:31 编辑

问题导读:
1、写主分片节点流程有哪些?
2、performOnReplica方法作用是什么?
3、如何理解checkpoint的概念和作用?
4、写入过程中是如何更新本地检查点和全局检查点的?




上一篇:Elasticsearch源码:写入流程原理分析(一)

3.2.5 写主分片节点流程

代码入口:TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived,然后进入AsyncPrimaryAction#doRun方法。
检查请求: 1.当前是否为主分片;2.allocationId是否是预期值;3.PrimaryTerm是否是预期值
[mw_shl_code=java,true]            if (shardRouting.primary() == false) {
                .....
            }
            final String actualAllocationId = shardRouting.allocationId().getId();
            if (actualAllocationId.equals(targetAllocationID) == false) {
                ......
            }
            final long actualTerm = indexShard.getPendingPrimaryTerm();
            if (actualTerm != primaryTerm) {
                      ......
            }
[/mw_shl_code]

查看主分片是否迁移:

如果已经迁移:1.将phase状态设为“primary_delegation”;2.关闭当前分片的primaryShardReference,及时释放资源;3.获取已经迁移到的目标节点,将请求转发到该节点,并等待执行结果;4.拿到结果后,将task状态更新为“finish”。

[mw_shl_code=java,true]                    transportService.sendRequest(relocatingNode, transportPrimaryAction,
                        new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
                        transportOptions,
                        new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                            reader) {
                            @Override
                            public void handleResponse(Response response) {
                                setPhase(replicationTask, "finished");
                                super.handleResponse(response);
                            }
                            @Override
                            public void handleException(TransportException exp) {
                                setPhase(replicationTask, "finished");
                                super.handleException(exp);
                            }
                        });
[/mw_shl_code]

如果没有迁移:

1.将task状态更新为“primary”;2.主分片准备操作(主要部分);3.转发请求给副本分片

[mw_shl_code=java,true]setPhase(replicationTask, "primary");
                    final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                    createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference).execute(); //入口
[/mw_shl_code]

primary所在的node收到协调节点发过来的写入请求后,开始正式执行写入的逻辑,写入执行的入口是在ReplicationOperation类的execute方法,该方法中执行的两个关键步骤是,首先写主shard,如果主shard写入成功,再将写入请求发送到从shard所在的节点。

[mw_shl_code=java,true]    public void execute() throws Exception {
        .......
        //关键,这里开始执行写主分片
        primaryResult = primary.perform(request);
                .......
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
                        ........
            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            // 关键步骤,写完primary后这里转发请求到replicas
            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
        }
        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }
[/mw_shl_code]

下面,我们来看写primary的关键代码,写primary入口函数为TransportShardBulkAction#shardOperationOnPrimary,最后走入到index(…) --> InternalEngine#index()的过程,这是写数据的主要过程。先是通过index获取对应的策略,即plan,通过plan执行对应操作,如要正常写入,则到了indexIntoLucene(…),然后写translog。如下所示:

[mw_shl_code=java,true]    public IndexResult index(Index index) throws IOException {
                            .......
                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        // 将数据写入lucene,最终会调用lucene的文档写入接口
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    indexResult = new IndexResult(
                        plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        location = translog.add(new Translog.Index(index, indexResult)); //写translog
                    ......
                    indexResult.setTranslogLocation(location);
                }
              .......
        }
[/mw_shl_code]

ES的写入操作是先写lucene,将数据写入到lucene内存后再写translog。ES之所以先写lucene后写log主要原因大概是写入Lucene时,Lucene会再对数据进行一些检查,有可能出现写入Lucene失败的情况。如果先写translog,那么就要处理写入translog成功但是写入Lucene一直失败的问题,所以ES采用了先写Lucene的方式。

在写完primary后,会继续写replicas,接下来需要将请求转发到从节点上,如果replica shard未分配,则直接忽略;如果replica shard正在搬迁数据到其他节点,则将请求转发到搬迁的目标shard上,否则,转发到replica shard。replicaRequest是在写入主分片后,从primaryResult中获取,并非原始Request。这块代码如下:

[mw_shl_code=java,true]    private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                   final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
        totalShards.addAndGet(replicationGroup.getSkippedShards().size());
        final ShardRouting primaryRouting = primary.routingEntry();
        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
            if (shard.isSameAllocation(primaryRouting) == false) {
                performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
            }
        }
    }
[/mw_shl_code]

performOnReplica方法会将请求转发到目标节点,如果出现异常,如对端节点挂掉、shard写入失败等,对于这些异常,primary认为该replica shard发生故障不可用,将会向master汇报并移除该replica。这块的代码如下:
[mw_shl_code=java,true]    private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
                                  final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
        .....
        totalShards.incrementAndGet();
        pendingActions.incrementAndGet();
        replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                }
                ......
                decPendingAndFinishIfNeeded();
            }
            @Override
            public void onFailure(Exception replicaException) {
                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                }
                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                replicasProxy.failShardIfNeeded(shard, message, replicaException,
                    ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
            }
        });
    }
[/mw_shl_code]

replica的写入逻辑和primary类似,这里不再具体介绍。

前面讲到了一个checkpoint(检查点的概念),在每次写入数据过程都需要更新LocalCheckpoint(本地检查点)和GlobalCheckpoint(全局检查点)。

3.3 更新checkpoint

    了解checkpoint之前,先来看下Primary Terms和Sequence Numbers:

Primary Terms: 由主节点分配给每个主分片,每次主分片发生变化时递增。主要作用是能够区别新旧两种主分片,只对最新的Terms进行操作。

Sequence Numbers: 标记发生在某个分片上的写操作。由主分片分配,只对写操作分配。假设索引test有两个主分片一个副本分片,当0号分片的序列号增加到5时,它的主分片离线,副本提升为新的主,对于后续的写操作,序列号从6开启递增。1号分片有自己独立的Sequence Numbers。

主分片在每次向副本转发写请求时,都会带上这两个值。

    有了Primary Terms和Sequence Numbers,理论上好像就可以检测出分片之间的差异(从旧的主分片删除新的主分片操作历史中不存在的操作,并且将缺少的操作索引到旧主分片),但是当同时为每秒成百上千的事件做索引时,比较数百万个操作的历史是不切实际的,且耗费大量的存储成本,所以ES维护了一个GlobalCheckpoint的安全标记。

    先来看下checkpoint的概念和作用:

GlobalCheckpoint: 全局检查点是所有活跃分片历史都已经对齐的序列号,即所有低于全局检查点的操作都保证已被所有活跃的分片处理完毕。这意味着,当主分片失效时,我们只需要比较新主分片和其他副本分片之间的最后一个全局检查点之后的操作即可。当就主分片恢复时,使用它知道的全局检查点,与新的主分片进行比较。这样,我们只需要进行小部分操作比较,而不是全部。

主分片负责推进全局检查点,它通过跟踪副本上完成的操作来实现。一旦检测到有副本分片已经超出给定序列号,它将相应的更新全局检查点。副本分片不会跟踪所有操作,而是维护一个本地检查点。

LocalCheckpoint: 本地检查点也是一个序列号,所有序列号低于它的操作都已在该分片上(写lucene和translog成功)处理完毕。

全局检查点和本地检查点在内存中维护,但也会保存在每个lucene提交的元数据中。

    我们通过源码来看,写入过程中是如何更新本地检查点和全局检查点的:

主分片写入成功之后,会进行LocalCheckpoint的更新操作,代码入口:ReplicationOperation#execute()->PrimaryShardReference#updateLocalCheckpointForShard(…)->IndexShard#updateLocalCheckpointForShard(…)->ReplicationTracker#updateLocalCheckpoint(…),代码如下:

[mw_shl_code=java,true]        primaryResult = primary.perform(request);  //写主,写lucene和translog
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); //更新LocalCheckpoint

    public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
        .....
        //获取主分片本地的checkpoints,包括LocalCheckpoint和GlobalCheckpoint
        CheckpointState cps = checkpoints.get(allocationId);
        .....
        // 检查是否需要更新LocalCheckpoint,即需要更新的值是否大于当前已有值
        boolean increasedLocalCheckpoint = updateLocalCheckpoint(allocationId, cps, localCheckpoint);
        // pendingInSync是一个保存等待更新LocalCheckpoint的Set,存放allocation IDs
        boolean pending = pendingInSync.contains(allocationId);
        // 如果是待更新的,且当前的localCheckpoint大于等于GlobalCheckpoint(每次都是先更新Local再Global,正常情况下,Local应该大于等于Global)
        if (pending && cps.localCheckpoint >= getGlobalCheckpoint()) {
                //从待更新集合中移除
            pendingInSync.remove(allocationId);
            pending = false;
            //此分片是否同步,用于更新GlobalCheckpoint时使用
            cps.inSync = true;
            replicationGroup = calculateReplicationGroup();
            logger.trace("marked [{}] as in-sync", allocationId);
            notifyAllWaiters();
        }
        //更新GlobalCheckpoint
        if (increasedLocalCheckpoint && pending == false) {
            updateGlobalCheckpointOnPrimary();
        }
        assert invariant();
    }
[/mw_shl_code]

继续看是如何更新GlobalCheckpoint的:

[mw_shl_code=java,true]    private synchronized void updateGlobalCheckpointOnPrimary() {
        assert primaryMode;
        final CheckpointState cps = checkpoints.get(shardAllocationId);
        final long globalCheckpoint = cps.globalCheckpoint;
        // 计算GlobalCheckpoint,即检验无误后,取Math.min(cps.localCheckpoint, Long.MAX_VALUE)
        final long computedGlobalCheckpoint = computeGlobalCheckpoint(pendingInSync, checkpoints.values(), getGlobalCheckpoint());
        // 需要更新到的GlobalCheckpoint值比当前的global值大,则需要更新
        if (globalCheckpoint != computedGlobalCheckpoint) {
            cps.globalCheckpoint = computedGlobalCheckpoint;
            logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint);
            onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint);
        }
    }
[/mw_shl_code]

主分片的检查点更新完成之后,会向副本分片发送对应的写请求,发送请求时同时传入了globalCheckpoint和SequenceNumbers。

[mw_shl_code=java,true]            final long globalCheckpoint = primary.globalCheckpoint();
            final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
[/mw_shl_code]

写副本分片时,进入performOnReplica方法,当监听到分片写入成功之后,则开始更新本地检查点,然后更新全局检查点,更新分方法和之前一样,通过比较当前的检查点是否大于历史检查点,如果是则更新。

[mw_shl_code=java,true]        replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                        //更新LocalCheckpoint
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                    //更新globalCheckpoint
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                } catch (final AlreadyClosedException e) {
                   ....
                } catch (final Exception e) {
                              ....
                }
                decPendingAndFinishIfNeeded();
            }
            @Override
            public void onFailure(Exception replicaException) {
               .....
            }
        });
[/mw_shl_code]

总结思考

总体的写入流程源码已经分析完成:

  •     数据可靠性:ES通过副本和Translog保障数据的安全;
  •     服务可用性:在可用性和一致性取舍上,ES更倾向于可用性,只要主分片可用即可执行写入操作;
  •     数据一致性:只要主写入成功,数据就可以被读取,所以查询时操作在主分片和副本分片上可能会得到不同的结果;
  •     原子性:索引的读写、别名操作都是原子操作,不会出现中间状态。但是bulk不是原子操作,不能用来实现事物;

作者:少加点香菜
来源:https://blog.csdn.net/wudingmei1023/article/details/103938670
最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条