برنامه نویسی

Javaraft: فروشگاه ارزش کلیدی توزیع شده مبتنی بر قایق

در حین تحصیل در مصاحبه های طراحی سیستم ، دانش اساسی را انتخاب کردم و به سیستم های توزیع شده علاقه مند شدم. بعد از برخی مطالعات ، من هنوز بسیاری از الگوریتم های سیستم توزیع شده را کمی انتزاعی و دست و موج پیدا کردم. برای به دست آوردن تجربه دستی ، تصمیم گرفتم یک فروشگاه ارزش کلیدی توزیع شده را بر اساس الگوریتم اجماع قایق از ابتدا بسازم.

چند هفته طول کشید تا مقاله قایق را به طور کامل درک کنم و الگوریتم اصلی را پیاده سازی کنم. در این پست ، من می خواهم در یادگیری برخی از جزئیات کلیدی فروشگاه ارزش توزیع شده با ارزش مبتنی بر قایق ، مجدداً تأکید کنم.

مقدمه

فیلم نسخه ی نمایشی.

این پست وبلاگ از طریق کد قدم می زند.

این یک فروشگاه با ارزش کلیدی توزیع شده است که با استفاده از الگوریتم اجماع RAFT و موتور ذخیره سازی LSM سفارشی اجرا شده است ، یک نقاط پایانی با ارزش کلید ساده را ارائه می دهد و به گونه ای طراحی شده است که تحمل گسل باشد (تا زمانی که اکثر گره ها زنده هستند) و قابل خطوند است. این کار در جاوا با استفاده از Boot Spring با وابستگی های Lombok و SLF4J اجرا می شود.

ویژگی

  • انتخابات رهبر (5.2 پوند)
  • تکثیر ورود به سیستم (5.3 پوند)
  • محدودیت انتخابات (45.4.1)
  • ارتکاب مدخل از شرایط قبلی (45.4 §)
  • تصادفات پیرو و نامزد (5/5 پوند)
  • اجرای معانی قابل تغییر (8)

استفاده

API ارزش کلید توزیع شده را امتحان کنید.

خصوصیات برنامه را برای گره ها پیکربندی کرده و شیشه را بسازید (تست های پرش):

mvn clean package -DskipTests

ترمینال 1

java -jar target/distributed_key_value_store-0.0.1-SNAPSHOT.jar --spring.profiles.active=node1

ترمینال 2

java -jar target/distributed_key_value_store-0.0.1-SNAPSHOT.jar --spring.profiles.active=node2

ترمینال 3

java -jar target/distributed_key_value_store-0.0.1-SNAPSHOT.jar --spring.profiles.active=node3

نقاط پایانی

کلیه عملیات باید به گره رهبر ارسال شود. تغییر مسیر اجرا نمی شود ، و خواندن/نوشتن پیروان مسدود می شود.

دریافت عمل:

در حالی که اجرای من به دور از تولید آماده است ، هدف من ساختن سیستمی بود که در شرایط اساسی کار کند و ایده های اصلی را نشان دهد.

برای تمرکز همه چیز ، من به کد خط به خط یا تعامل ریز دانه بین اجزای دیگر نمی روم. درعوض ، من از طریق یک مرور کلی در مورد نحوه اجرای الگوریتم Raft Core ، قدم می زنم.


انتخابات رهبر (5/5 پوند):

اولین ماژول که من توسعه دادم انتخابات رهبر است. در قایق ، همه گره ها به عنوان پیروان شروع می شوند و هنگامی که تایمر انتخابات پیروان منقضی می شود ، یک رهبر انتخاب می شود.

برای فعال کردن این امر ، من یک تایمر انتخاباتی را اجرا کردم که پس از انقضا ، انتخابات را آغاز می کند. هنگامی که گره حالت خود را به پیروان منتقل می کند ، نامیده می شود.

// If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate:
// convert to candidate (§5.2)
public void reset() {
    cancel();

    long minTimeout = config.getElectionTimeoutMillisMin();
    long maxTimeout = config.getElectionTimeoutMillisMax();
    long timeout = minTimeout + random.nextInt((int)(maxTimeout - minTimeout));

    electionFuture = scheduler.schedule(() -> {
        electionManager.startElection();
    }, timeout, TimeUnit.MILLISECONDS);
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

کلاس ElectionManager شامل دو روش اصلی است: StartElection () و HandleVoterEquest (). روش startelection () بر اساس تعداد همسالان ، موضوعات را از استخر اختصاص می دهد و درخواست های آراء ناهمزمان را از طریق نقطه پایانی /درخواست Vote به هر پیروان ارسال می کند. سپس ، موضوع اصلی با تنظیم مجدد تایمر انتخابات ، روش را تکمیل می کند. در اینجا ، یک زمانبندی ، پیکربندی شده در Resttemplate ، تضمین می کند که پاسخ ها در مدت زمان کوتاهتر از تایمر انتخابات وارد می شوند و از انتخابات همپوشانی در همان گره جلوگیری می کنند.

اگر پاسخ همسالان نشان دهنده اصطلاح بالاتری باشد ، گره بلافاصله به حالت پیروان پایین می آید و انتخابات نتیجه می گیرد. اگر کمتر از اکثر پاسخ ها دریافت شود ، تایمر انتخابات تنظیم مجدد می شود. در غیر این صورت ، گره به یک رهبر تبدیل می شود ، تایمر انتخابات را برای جلوگیری از انتخابات جدید لغو می کند و شروع به ارسال ضربان قلب به گره های پیرو می کند. اجرای روش HandleVoterEquest به دنبال کاغذ قایق ساده است ، بنابراین پرش می شود.

/**
 * On conversion to candidate, start election:
 * Increment currentTerm, vote for self, reset election timer, send RequestVote RPCs to all other servers.
 */
public void startElection() {
    lockManager.getStateWriteLock().lock();
    lockManager.getLogReadLock().lock();
    try {
        if (nodeState.getCurrentRole() == Role.LEADER) {
            return;
        }

        nodeState.setCurrentRole(Role.CANDIDATE);
        nodeState.incrementTerm();
        nodeState.setVotedFor(nodeState.getNodeId());

        int currentTerm = nodeState.getCurrentTerm();
        int lastLogIndex = raftLog.getLastIndex();
        int lastLogTerm = raftLog.getLastTerm();

        List<CompletableFuture<VoteResponseDto>> voteFutures = new ArrayList<>();
        ExecutorService executor = Executors.newCachedThreadPool();

        for (String peerUrl : config.getPeerUrls().values()) {
            CompletableFuture<VoteResponseDto> voteFuture = CompletableFuture
                    .supplyAsync(() -> requestVote(
                            currentTerm,
                            nodeState.getNodeId(),
                            lastLogIndex,
                            lastLogTerm,
                            peerUrl
                    ), executor)
                    .orTimeout(config.getElectionRpcTimeoutMillis(), TimeUnit.MILLISECONDS)
                    .exceptionally(throwable -> {
                        return new VoteResponseDto(currentTerm, false);
                    });
            voteFutures.add(voteFuture);
        }

        int majority = (config.getPeerUrls().size() + 1) / 2 + 1;
        AtomicInteger voteCount = new AtomicInteger(1);

        // If votes received from majority of servers: become leader (§5.2).
        for (CompletableFuture<VoteResponseDto> future : voteFutures) {
            future.thenAccept(response -> {
                lockManager.getStateWriteLock().lock();
                try {
                    if (nodeState.getCurrentRole() != Role.CANDIDATE || nodeState.getCurrentTerm() != currentTerm) {
                        return;
                    }
                    if (response != null && response.isVoteGranted()) {
                        int newVoteCount = voteCount.incrementAndGet();
                        if (newVoteCount >= majority) {
                            stateManager.becomeLeader();
                        }
                    }
                } finally {
                    lockManager.getStateWriteLock().unlock();
                }
            });
        }
        stateManager.resetElectionTimer();
    } finally {
        lockManager.getLogReadLock().unlock();
        lockManager.getStateWriteLock().unlock();
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


تکثیر ورود به سیستم (5/5)):

دومین ماژول که من اجرا کردم ، تکثیر log است. با توجه به پیچیدگی این منطق ، من مسئولیت ها را به سه کلاس جداگانه تقسیم کردم: یکی برای رسیدگی به درخواست های نوشتن مشتری ، دیگری برای شروع تکثیر ورود به سیستم و در آخر برای پردازش ورودی های ضمیمه.

emendentrieshandler.java
ClientRequestHandler.java
logReplicator.java
raftreplicationManager.java

برای مدیریت این فعل و انفعالات ، من از الگوی طراحی نمای استفاده کردم و یک کلاس متمرکز ReplicationManager را برای تفویض تماس ها به کلاس های مناسب معرفی کردم.

...
public class RaftReplicationManager {
    ...

    public boolean handleClientRequest(LogEntry entry) {
        return clientRequestHandler.handle(entry);
    }

    public void startLogReplication() {
        logReplicator.start();
    }

    public AppendEntriesResponseDTO handleAppendEntries(AppendEntriesDTO dto) {
        return appendEntriesHandler.handle(dto);
    }

    public void initializeIndices() {
        logReplicator.initializeIndices();
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

فرآیند تکثیر با روش start () در کلاس LogReplicator آغاز می شود. این روش به چندین ساختار داده کلیدی تعریف شده توسط RAFT متکی است: NextIndex[] و MatchIndex[]، که پیشرفت تکثیر را برای هر پیروان دنبال می کند ، و یک نسخه در انتظار[] نقشه برای مدیریت وظایف تکرار مداوم.

هنگامی که یک گره پس از پیروزی در انتخابات رهبر شد ، این روش بیش از همه پیروان در مورد تکیه را تکرار می کند[] با استفاده از موضوعات موجود از یک استخر نخ ثابت ، تکثیر ورود به سیستم ناهمزمان را برای هر یک از آنها در صورت عدم راه اندازی ، نقشه و راه اندازی می کند.

// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods
// to prevent election timeouts (§5.2)
public void start() {
    lockManager.getStateReadLock().lock();
    try {
        if (nodeState.getCurrentRole() != Role.LEADER) return;
    } finally {
        lockManager.getStateReadLock().unlock();
    }

    for (String peer : config.getPeerUrls().values()) {
        if (!pendingReplication.getOrDefault(peer, false)) {
            pendingReplication.put(peer, true);
            executor.submit(() -> replicateLoop(peer));
        }
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

برای هر پیرو ، یک موضوع ناهمزمان روش Replicateloop () را اجرا می کند. این روش یک حلقه مدتی را اجرا می کند که به طور مداوم ورودی های جدید ورود به سیستم را به نقطه پایانی /Appenidentries از پیروان می دهد ، دنبال کنندگان را در ورودی های گمشده جلب می کند ، یا ورودی های ورود به سیستم خالی را به عنوان ضربان قلب برای حفظ رهبری ارسال می کند. دوره برگشت حلقه با فاصله ضربان قلب هماهنگ است.

// Sends heartbeats/log entries to peer, adjusting sleep time to match heartbeat interval
private void replicateLoop(String peer) {
    int backoff = config.getHeartbeatIntervalMillis();
    while (true) {
        lockManager.getStateReadLock().lock();
        try {
            if (nodeState.getCurrentRole() != Role.LEADER) break;
        } finally {
            lockManager.getStateReadLock().unlock();
        }

        lockManager.getLogWriteLock().lock();
        lockManager.getStateWriteLock().lock();
        lockManager.getStateMachineWriteLock().lock();
        long startTime = System.currentTimeMillis();
        try {
            boolean ok = replicate(peer);
            if (ok) updateCommitIndex();
        } finally {
            lockManager.getStateMachineWriteLock().unlock();
            lockManager.getStateWriteLock().unlock();
            lockManager.getLogWriteLock().unlock();
        }
        long duration = System.currentTimeMillis() - startTime;
        long sleepTime = Math.max(0, backoff - duration);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    pendingReplication.put(peer, false);
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

در اینجا ، زمان بندی مجدد برای جلوگیری از ایستادن حلقه به طور نامحدود پیکربندی شده است ، که در غیر این صورت می تواند یک انتخابات غیرقانونی را در پیروان ایجاد کند.

هنگامی که حلقه تکثیر موفق می شود – با اطمینان هیچ پیروان با یک اصطلاح بالاتر پاسخ نمی دهد – رهبر NextIndex مربوطه را به روز می کند[] و MatchIndex[] مقادیر و استفاده از روش UpdateCommitIndex ().

// If last log index >= nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex.
private boolean replicate(String peer) {
    int ni = nextIndex.get(peer);
    int prevIdx = ni - 1;
    int prevTerm = (prevIdx >= 0) ? raftLog.getTermAt(prevIdx) : 0;
    List<LogEntry> entries = raftLog.getEntriesFrom(ni);

    AppendEntriesRequestDto dto = new AppendEntriesRequestDto(
            nodeState.getCurrentTerm(), nodeState.getNodeId(),
            prevIdx, prevTerm, entries, raftLog.getCommitIndex()
    );

    String peerId = config.getPeerUrls().entrySet().stream()
            .filter(entry -> entry.getValue().equals(peer))
            .map(Map.Entry::getKey)
            .map(String::valueOf)
            .findFirst()
            .orElse("unknown");

    try {
        String url = peer + "/raft/appendEntries";
        ResponseEntity<AppendEntriesResponseDto> res = restTemplate.postForEntity(url, dto, AppendEntriesResponseDto.class);
        AppendEntriesResponseDto body = res.getBody() != null ? res.getBody() : new AppendEntriesResponseDto(-1, false);
        if (body.getTerm() > nodeState.getCurrentTerm()) {
            nodeStateManager.becomeFollower(body.getTerm());
            return false;
        }
        if (body.isSuccess()) {
            nextIndex.put(peer, ni + entries.size());
            matchIndex.put(peer, ni + entries.size() - 1);
            return true;
        } else {
            int newNextIndex = Math.max(0, ni - 1);
            nextIndex.put(peer, newNextIndex);
            return false;
        }
    } catch (ResourceAccessException e) {
        return false;
    } catch (Exception e) {
        return false;
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

این به نوبه خود ، روش Applyentries () را تحریک می کند ، که ورودی های ورود به سیستم را برای فهرست متعهد به فروشگاه داده اعمال می کند. علاوه بر این ، هنگام پیشبرد شاخص تعهد ، تضمین می کند که فقط ورودی های دوره فعلی رهبر انجام می شود. این ضمانت ایمنی RAFT را تأیید می کند و از استفاده از ورودی های قدیمی از شرایط قبلی جلوگیری می کند. این در زیر نشان داده شده است:

private void updateCommitIndex() {
    int majority = (config.getPeerUrlList().size() + 1) / 2 + 1;
    int term = nodeState.getCurrentTerm();
    for (int i = log.getLastIndex(); i > log.getCommitIndex(); i--) {
        int count = 1;
        for (int idx : matchIndex.values()) {
            if (idx >= i) count++;
        }
        if (count >= majority && log.getTermAt(i) == term) {
            log.setCommitIndex(i);
            applyEntries();
            break;
        }
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

از دیدگاه پیروان ، روش دسته () در کلاس AdventendriesHandler پردازش های وابسته به رهبر را پردازش می کند.

این ابتدا با بررسی این اصطلاح ، رهبری فرستنده را تأیید می کند. در مرحله بعد ، ورود خود را با NextIndex رهبر مقایسه می کند[] برای آن پیرو. اگر ورود به سیستم پیروان به دلیل عدم تطابق در ورودی های قبلی به روز نباشد ، این درخواست را رد می کند و باعث می شود رهبر از NextIndex پیروان کاهش یابد[] توسط یک و دوباره امتحان کنید.

اگر ورود به سیستم مطابقت داشته باشد ، پیروان ورودی ها را با توجه به ویژگی تطبیق ورود به سیستم (5.2 §) می پذیرد ، شاخص تعهد خود را به روز می کند و ورودی های دستگاه دولتی خود را تا شاخص جدید تعهد اعمال می کند.

public AppendEntriesResponseDto handle(AppendEntriesRequestDto dto) {
    lockManager.getLogWriteLock().lock();
    lockManager.getStateWriteLock().lock();
    lockManager.getStateMachineWriteLock().lock();
    try {
        int term = nodeState.getCurrentTerm();
        int leaderTerm = dto.getTerm();

        // Reply false if term < currentTerm (§5.1)
        if (leaderTerm < term) return new AppendEntriesResponseDto(term, false);

        // If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
        if (leaderTerm > term) {
            stateManager.becomeFollower(leaderTerm);
            term = leaderTerm;
        }

        nodeState.setCurrentLeader(dto.getLeaderId());

        // Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
        if (dto.getPrevLogIndex() > 0 &&
                (!raftLog.containsEntryAt(dto.getPrevLogIndex()) ||
                        raftLog.getTermAt(dto.getPrevLogIndex()) != dto.getPrevLogTerm())) {
            stateManager.resetElectionTimer();
            return new AppendEntriesResponseDto(term, false);
        }

        // If an existing entry conflicts with a new one (same index but different terms), delete the existing
        // entry and all that follow it (§5.3). Append any new entries not already in the log.
        int index = dto.getPrevLogIndex() + 1;
        List<LogEntry> entries = dto.getEntries();
        if (!entries.isEmpty()) {
            for (int i = 0; i < entries.size(); i++) {
                int logIndex = index + i;
                if (raftLog.containsEntryAt(logIndex) && raftLog.getTermAt(logIndex) != entries.get(i).getTerm()) {
                    raftLog.deleteFrom(logIndex);
                    raftLog.appendAll(entries.subList(i, entries.size()));
                    break;
                }
            }
            if (!raftLog.containsEntryAt(index)) {
                raftLog.appendAll(entries);
            }
        }

        // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of the last new entry)
        if (dto.getLeaderCommit() > raftLog.getCommitIndex()) {
            int lastNew = dto.getPrevLogIndex() + entries.size();
            raftLog.setCommitIndex(Math.min(dto.getLeaderCommit(), lastNew));
            applyEntries();
        }
        stateManager.resetElectionTimer();
        return new AppendEntriesResponseDto(term, true);
    } finally {
        lockManager.getStateMachineWriteLock().unlock();
        lockManager.getStateWriteLock().unlock();
        lockManager.getLogWriteLock().unlock();
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

سرانجام ، ClientRequestHandler مدیریت مشتری را به پایگاه داده می نویسد. روش دسته () آن () ورود به ورود به سیستم رهبر را برای تکثیر و نظارت بر شاخص ورود دارد. این یک پاسخ موفقیت را تنها زمانی برمی گرداند که آخرین شاخص کاربردی از شاخص ورودی پیشی بگیرد و تأیید کند که نوشتن به دستگاه دولت اعمال شده است.

این مکانیسم از قابلیت خطی پشتیبانی می کند ، و اطمینان می دهد که مشتری بعدی به دنبال نوشتن موفقیت آمیز ، بازتاب دستگاه حالت به روز شده است.

// If command received from the client: append entry to local log, respond after entry applied
// to state machine (§5.3)
public boolean handle(LogEntry clientEntry) {
    lockManager.getStateReadLock().lock();
    lockManager.getLogWriteLock().lock();
    int entryIndex;
    try {
        if (!raftNodeState.isLeader()) return false;
        raftLog.append(clientEntry);
        entryIndex = raftLog.getLastIndex();
    } finally {
        lockManager.getLogWriteLock().unlock();
        lockManager.getStateReadLock().unlock();
    }

    long start = System.currentTimeMillis();
    long timeoutMillis = raftConfig.getClientRequestTimeoutMillis();

    while (true) {
        lockManager.getLogReadLock().lock();
        lockManager.getStateReadLock().lock();
        try {
            if (raftNodeState.getCurrentRole() != Role.LEADER) {
                return false;
            }
            if (raftNodeState.getLastApplied() >= entryIndex) {
                return true;
            }
        } finally {
            lockManager.getStateReadLock().unlock();
            lockManager.getLogReadLock().unlock();
        }

        if (System.currentTimeMillis() - start > timeoutMillis) {
            return false;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

در اینجا ، تأخیر جزئی به دلیل نوشتن مشتری با چرخه ضربان قلب مطابقت دارد. اگر فاصله ضربان قلب 1 ثانیه باشد ، رهبر ممکن است قبل از تکرار نوشتن تا 1 ثانیه صبر کند. در حالی که پروتکل RAFT اجازه می دهد تا RPC های مستقل مستقل برای نوشتن (جدا از ضربان قلب) ، من تصمیم گرفتم که همگام سازی مشتری را با چرخه ضربان قلب همگام سازی کنم تا بتوانند کارآزمایی همزمان را ساده تر کنند و از تکثیر همزمان استفاده کنم.

قبل از پایان ، می خواهم به نگرانی های همزمانی بپردازم. در قایق ، موضوعات همزمان دست زدن به درخواست VOTE ، برنامه های اختصاصی و زمان بندی انتخابات می توانند شرایط مسابقه را با اصلاح همزمان حالت گره مشترک (مانند جریان فعلی ، رای گیری و ورود) معرفی کنند. برای اطمینان از سریال سازی و جلوگیری از چنین شرایط مسابقه ، من یک قفل متمرکز خواندن نوشتن را اجرا کردم که نگهبانان به حالت گره ، ورود به سیستم و دستگاه حالت گره دسترسی پیدا می کنند.

این نتیجه گیری در مورد منطق اصلی قایق است. اکنون ، من می خواهم در مورد نگرانی اساسی که در مورد مقاله RAFT معروف به “اجرای معانی قابل تغییر” (8) صحبت کردم ، صحبت کنم.


اطمینان از معناشناسی قابل استفاده

در قایق ، نوشتن ها پس از رسیدن به quorum اکثریت ، مرتکب می شوند ، و رهبر تضمین می شود که تمام ورودی های متعهد را در اختیار داشته باشد. با این حال ، این ذاتاً تضمین نمی کند که خوانده شده به رهبر همیشه منعکس کننده ترین حالت به روز باشد ، که برای خطی پذیری ضروری است.

سناریویی را در نظر بگیرید که مشتری به یک رهبر متصل است که به دلیل یک پارتیشن شبکه رهبری خود را از دست می دهد و آن را از اکثر خوشه ها جدا می کند. اگر مشتری قبل از بهبودی پارتیشن به این رهبر پیشین وصل شود ، گره ممکن است هنوز خود را رهبر در نظر بگیرد و از داده های دستگاه دولتی خود که می توانست توسط یک رهبر جدید کنار گذاشته شود ، داده های بی نظیر را ارائه دهد. این امر خطی پذیری را نقض می کند ، زیرا خواندن نشان دهنده آخرین نوشته های متعهد نیست.

Raft Paper راه حل های مختلفی را برای این امر ارائه می دهد و من مکانیسمی را اجرا کردم که در آن مشتری عملیات را دریافت می کند ، همیشه برای تأیید رهبری ، ورود No-Op را به پیروان ارسال می کند. اگر اکثر گره ها ورود را از طریق منطق نقطه پایانی /Appendentries بپذیرند ، تضمین می کند که رهبر بی نظیر نیست و از این طریق خوانده شده را به عنوان خطی تأیید می کند. در حالی که این سفر به دور شبکه اضافی را معرفی می کند ، با توجه به اجرای فعلی من ، راه حل فقط کافی را ارائه می دهد.

گزینه های کارآمدتر ، مانند ReadIndex یا خواندن مبتنی بر اجاره نامه ، وجود دارد ، اما من این رویکرد ساده را انتخاب کردم. با عرض پوزش برای کد کثیف زیر: این به این دلیل بود که من قصد نداشتم این کلاس را تمدید کنم.

@GetMapping("/get")
public ResponseEntity<String> get(@RequestParam String key) {
    final String NO_OP_ENTRY = "NO_OP_ENTRY";
    try {
        handleRead(new WriteRequestDto(NO_OP_ENTRY, Long.MAX_VALUE, NO_OP_ENTRY, NO_OP_ENTRY));
    } catch (Exception e) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Failed to process read request: " + e.getMessage());
    }

    lockManager.getStateMachineReadLock().lock();
    try {
        String value = kvStore.get(key);
        if (value == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND)
                    .body("Key not found: " + key);
        }
        return ResponseEntity.ok(value);
    } catch (Exception e) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Error retrieving key: " + e.getMessage());
    } finally {
        lockManager.getStateMachineReadLock().unlock();
    }
}

private void handleRead(WriteRequestDto request) {
    LogEntry entry = new LogEntry(
            nodeState.getCurrentTerm(),
            request.getKey(),
            request.getValue(),
            LogEntry.Operation.PUT,
            request.getClientId(),
            request.getSequenceNumber()
    );
    boolean committed = replicationManager.handleClientRequest(entry);
    if (!committed) throw new RuntimeException("Can't process the read at this moment");
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


اطمینان از معناشناسی قابل استفاده 2

Raft Paper در مورد سناریویی بحث می کند که “اگر رهبر پس از ارتکاب ورود به سیستم خراب شود اما قبل از پاسخ به مشتری ، مشتری فرمان را با یک رهبر جدید دوباره امتحان کند و باعث شود بار دوم اجرا شود.” Deduplication طرف مشتری با پیوستن به یک شناسه منحصر به فرد (به عنوان مثال ، یک شماره دنباله) به هر درخواست ، این مسئله را حل می کند. سرور این شناسه ها را ردیابی می کند و نسخه های تکراری را رد می کند. این هم صحت و هم قابلیت خطی را حفظ می کند و به یکی از چالش های اصلی پرداخته است.

در اجرای ، من روش Apply () را در کلاس StateMachine اصلاح کردم. قبل از استفاده از نوشتن ، از پایگاه داده (در حافظه) پرس و جو می کند تا شماره دنباله ورود به سیستم را با آخرین شماره دنباله ضبط شده برای آن شناسه مشتری مقایسه کند. نوشتن فقط در صورتی که تعداد دنباله جدید بیشتر باشد اعمال می شود ، تأیید آن به عنوان یک عمل تازه و نه یک کپی.

@Override
public void apply(LogEntry entry) {
    if (entry == null) {
        throw new IllegalArgumentException("Log entry cannot be null");
    }

    // deduplication check
    String clientId = entry.getClientId();
    long sequenceNumber = entry.getSequenceNumber();
    Long lastSequenceNumber = kvStore.getLastSequenceNumber(clientId);
    if (lastSequenceNumber != null && sequenceNumber <= lastSequenceNumber) {
        return;
    }
...
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


تداوم

شرط نهایی برای ایجاد یک بانک اطلاعاتی کاملاً کاربردی ، پایداری است. دستگاه دولتی باید دوام را تضمین کند و اطمینان حاصل کند که هیچ داده ای در هنگام تصادف از بین نمی رود. در حالی که من در ابتدا اجزای اصلی RAFT را با استفاده از یک اجرای حافظه آزمایش کردم ، بعداً پایداری مبتنی بر دیسک را در بر گرفتم.

کلاس DB این رویکرد را تجسم می کند. من از پایگاه داده تعبیه شده سفارشی خود به عنوان یک دستگاه دولتی استفاده کردم. برای ورود به سیستم RAFT ، من یک پرونده با حمایت از پرونده و فقط ضمیمه را پیاده سازی کردم تا از دوام دیسک اطمینان حاصل شود.

با وجود این مؤلفه ها ، من به عناصر اساسی فروشگاه ارزش کلیدی مبتنی بر قایق خود پرداخته ام. من معتقدم که اجرای من کامل نیست و باید شرایط بسیار خسته کننده تر و موارد گوشه ای وجود داشته باشد که من به سادگی از آنها آگاه نیستم. روشهای بی شماری برای اشکالات ظریف.

با این وجود ، این یک سفر کاملاً هیجان انگیز بود زیرا به من می آموزد که در طراحی سیستم های توزیع شده در نظر بگیرم.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا