برنامه نویسی

انتشار پیام کارآمد و قابل اعتماد با RabbitMQ و Elixir

من اخیراً مجبور شدم در یک معماری زیر میخانه ، در اطراف تولید کنندگان و مصرف کنندگان برای RabbitMQ ایجاد کنم.

موانع زیادی برای اطمینان از تحویل پیام قابل اعتماد (در یک زمان) در هنگام استفاده از یک کارگزار پیام توزیع شده مانند RabbitMQ وجود دارد.

پیام ها ممکن است بین تولید کننده و کارگزار ، داخل کارگزار و بین کارگزار و مصرف کنندگان از بین برود. و ظرافت های زیادی وجود دارد.

به طور خلاصه ، شما باید فعال کنید:

  • ناشر تأیید می کند – به طوری که پیام های کارگزار ACK
  • پایداری پیام – به طوری که کارگزار فقط پس از گرگرفتگی آن به دیسک پیام می دهد
  • صف های بادوام (با یک تبادل بادوام) – به طوری که صف ها زنده مانده و از کارگزار و مجدداً استفاده می کنند
  • اکینگ مصرف کننده – به طوری که صف پیام ها را تا زمانی که توسط یک مصرف کننده استفاده شود ، حفظ می کند

وب سایت رسمی RabbitMQ مستندات خوبی دارد ، از جمله این آموزش Elixir ، اما نمونه هایی از استفاده از Primitives AMQP در یک تنظیم قوی OTP پراکنده است.

در این پست من به طور خاص می خواهم به کارآمد بپردازم ناشر تأیید می کندبشر

همزمان در انتظار تأیید ناشر

در اینجا دو پروژه با نمونه ای از ماژول های ناشر آورده شده است:

متأسفانه ، هر دو از آنها استفاده می کنند AMQP.Confirm.wait_for_confirms/2 ابتدایی در داخل GenServer پاسخ به تماس به این ترتیب ، پیام ها منتشر می شوند (با AMQP.Basic.publish/5) و بلافاصله در انتظار handle_call/3 پاسخ به تماس و در نتیجه تنگنای ایجاد می شود که به طور مؤثر از انتشار همزمان جلوگیری می کند. مطمئناً توان رنج خواهد برد.

دست زدن به Async از تأییدیه های ناشر

راه دیگری وجود دارد. AMQP.Confirm.register_handler/2 به شما امکان می دهد فرایندی را ثبت کنید که تأیید کارگزار را به صورت غیر همزمان دریافت کند. سپس ACK (یا NACKS) فراخوانی می شود handle_info با یکی از این مجموعه استدلال ها:

{:basic_ack, seqno, multiple}, state
{:basic_nack, seqno, multiple}, state
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

این نشان می دهد که انتشار با شماره دنباله seqno یا acked یا nacked بود. اگر ما به نوعی نقشه برداری بین آن داشتیم seqno و فرایند اصلی که به عنوان پاسخ تماس با ما در Genserver نامیده می شود ، ما می توانیم با یک پیام موفقیت یا خطایی به آن پاسخ دهیم.

خوشبختانه ، AMQP با ما فراهم می کند AMQP.Confirm.next_publish_seqno تا بتوانیم یک دسته را بدست آوریم seqno در همان مقطع زمانی که ما یک دسته داریم from (فرایندی که در وهله اول ما را خوانده است) ، به ما این امکان را می دهد تا نقشه ای را بین آنها ذخیره کنیم.

صحبت های سطح بالا به اندازه کافی. بیایید از یک طرح کلی راه حل قدم برداریم. بیایید دیگ بخار را از راه دور کنیم:

defmodule MyPublisher do
  use GenServer

  ...

  def start_link(channel) do
    # Start the process
    GenServer.start_link(__MODULE__, channel, name: __MODULE__)
  end

  def init(channel) do
    # Initialize the process state and do setup

    # Enable publisher confirms
    AMQP.Confirm.select(channel)

    # Register the current process as the async handler
    # for acks and nacks
    AMQP.Confirm.register_handler(channel, self())

    ...

    # Put the channel in the state. Proper
    # connection and channel management is
    # not covered in this post.
    {:ok, %{channel: channel}}
  end

  ...
end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

اکنون ما API را برای انتشار پیام ها اضافه خواهیم کرد:

  def publish(topic, payload) do
    # This function runs in the calling process
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  def handle_call({:publish, topic, payload}, from, state) do
    # This function runs in the GenServer
    ...

    # Perform the actual publishing (ensure it is persistent)
    AMQP.Basic.publish(state.channel, "pub_sub", topic, payload, persistent: true)

    # Note that we are NOT replying here
    {:noreply, state}
  end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

و با ثبت نام GenServer به عنوان کنترل کننده پیام های تأیید ، ما باید برخی از تماس های تماس تلفنی را اجرا کنیم:

  def handle_info({:basic_ack, seqno, multiple}, state) do
    # The broker is saying that the publish with `seqno`
    # has been received. If `multiple` is true, all messages
    # up to `seqno` have been received.
    confirm_messages(seqno, multiple, fn from ->
      # For each of the received messages,
      # we can reply with `:ok`.
      GenServer.reply(from, :ok)
    end)

    {:noreply, state}
  end

  def handle_info({:basic_nack, seqno, multiple}, state) do
    # The broker is saying that the publish with `seqno`
    # has been lost. If `multiple` is true, all messages
    # up to `seqno` have been lost.
    confirm_messages(seqno, multiple, fn from ->
      # For each of the lost messages,
      # we can reply with `{:error, :nack}`.
      GenServer.reply(from, {:error, :nack})
    end)

    {:noreply, state}
  end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

ما بعد باید اجرای آن را درک کنیم confirm_messages، اما ابتدا ، برخی از به روزرسانی های موجود در BoilerPlate و کد انتشار:

def init(channel) do
    ...

    # Create an ETS table in which the elements are ordered. It
    # will be owned by the GenServer process. We make it private
    # because there is no need for other processes to read or
    # write to it.
    :ets.new(@table_name, [:ordered_set, :private, :named_table])

    {:ok, %{channel: channel}}
  end

  ...

  def handle_call({:publish, topic, payload}, from, state) do
    # Before we publish, get a hold of the next publish
    # sequence number `seqno`.
    seqno = AMQP.Confirm.next_publish_seqno(state.channel)

    # Store the caller reference (`from`) against
    # the `seqno` in ETS
    :ets.insert(@table_name, {seqno, from})

    # Perform the actual publishing
    ...
  end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

و اکنون سرانجام ، هنگام دریافت یک ACK یا NACK ، می توانیم جستجو کنیم
مرجع تماس گیرنده مربوطه توسط seqno و پاسخ مناسب:

  defp confirm_messages(seqno, _multiple = false, reply_fun) do
    # Lookup and remove the entries for seqno
    case :ets.take(@table_name, seqno) do
      [{^seqno, from}] -> reply_fun.(from)
      _ -> :ok
    end
  end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

موردی که کارگزار تصمیم می گیرد چندین پیام را تصدیق کند
به یکباره تا حدودی کمتر قابل خواندن است و شامل مشخصات مسابقه ای خاص ETS است ، اما می توانید تا حد زیادی از آن براق کنید:

  defp confirm_messages(seqno, true = _multiple, reply_fun) do
    :ets.select(@table_name, [
      {
        # bind `seqno` and `from` to $1 and $2
        {:"$1", :"$2"},
        # match entries where the bound seqno is =< to the incoming seqno 
        [{:"=<", :"$1", seqno}],
        # and return all the matching entries as tuples of $1 and $2
        [{{:"$1", :"$2"}}]}
    ])
    |> Enum.each(fn {key, from} ->
      reply_fun.(from)
      :ets.delete(@table_name, key)
    end)
  end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

طرح کامل

در اینجا ماژول به طور کامل است. اما به خاطر داشته باشید که ما از نظر اتصالات یا کانال های در حال مرگ ، هیچ تحمل گسل را شامل نمی شویم.

defmodule MyPublisher do
  use GenServer

  @table_name :pending_confirms

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel, name: __MODULE__)
  end

  def init(channel) do
    AMQP.Confirm.select(channel)
    AMQP.Confirm.register_handler(channel, self())

    :ets.new(@table_name, [:ordered_set, :private, :named_table])

    {:ok, %{channel: channel}}
  end

  def publish(topic, payload) do
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  def handle_call({:publish, topic, payload}, from, state) do
    seqno = AMQP.Confirm.next_publish_seqno(state.channel)

    # Store the caller reference in ETS
    :ets.insert(@table_name, {seqno, from})

    # Perform the actual publishing
    AMQP.Basic.publish(state.channel, "pub_sub", topic, payload, persistent: true)

    {:noreply, state}
  end

  def handle_info({:basic_ack, seqno, multiple}, state) do
    confirm_messages(seqno, multiple, fn from ->
      GenServer.reply(from, :ok)
    end)

    {:noreply, state}
  end

  def handle_info({:basic_nack, seqno, multiple}, state) do
    confirm_messages(seqno, multiple, fn from ->
      GenServer.reply(from, {:error, :nack})
    end)

    {:noreply, state}
  end

  defp confirm_messages(seqno, true = _multiple, reply_fun) do
    :ets.select(@table_name, [
      {{:"$1", :"$2"}, [{:"=<", :"$1", seqno}], [{{:"$1", :"$2"}}]}
    ])
    |> Enum.each(fn {key, from} ->
      reply_fun.(from)
      :ets.delete(@table_name, key)
    end)
  end

  defp confirm_messages(seqno, false, reply_fun) do
    case :ets.take(@table_name, seqno) do
      [{^seqno, from}] -> reply_fun.(from)
      _ -> :ok
    end
  end
end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

پایان

ما یک راه حل برای تأیید ناشر کارآمد را تشریح کرده ایم. این امر باعث می شود روند فراخوانی (موردی که از `MyPublisher.Publish/2) فراخوانی می کند ، به طور مشخص بدانید که آیا یک پیام منتشر شده با موفقیت آن را به کارگزار تبدیل کرده است یا خیر.

راه حل کارآمد است ، زیرا حتی اگر تماس MyPublisher.publish/2 است ، همگام، در حالی که منتظر تأیید کارگزار هستید ، تماس های یکسان را از طریق سایر فرآیندهای دیگر مسدود نمی کند. پردازش تأیید است غیر همزمانبشر

قدرت این راه حل در این واقعیت ممکن است که همزمان باشد نهفته است GenServer تماس تلفنی (مانند handle_call) نیازی به پاسخ سریع نیست. تا زمانی که پیگیری کند که چه کسی را صدا می کند ، همیشه می تواند در مرحله بعدی پاسخ دهد (در ذهن داشته باشید) ، پس از اتمام کار Async.

سرانجام ، اگر نیازی به پشتیبانی از توان بسیار بالایی ندارید ، اما هنوز هم می خواهید از ناشر Async استفاده کنید که دست زدن به آن را تأیید کنید ، می توانید به جای یک جدول ETS از یک نقشه قدیمی ساده استفاده کنید تا مطابقت داشته باشد seqno به fromبشر

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا