Rate-limiting external API requests

mix new batch_job --sup

Add the req library to the mix.exs dependencies:

{:req, git: "https://github.com/wojtekmach/req.git"}

and run mix deps.get to install.

Set up the rate-limiting module in lib/batch_job/rate_limiter.ex:

defmodule BatchJob.RateLimiter do
  @moduledoc """
  Queue-backed rate limiter for outbound API requests.

  Requests are held in a queue and one is released every
  `request_queue_poll_rate` milliseconds. That interval is derived from the
  `:timeframe_max_requests`, `:timeframe` and `:timeframe_units` start options.
  Every released request runs in its own task under `BatchJob.TaskSupervisor`,
  so a slow or crashing request neither blocks nor takes down the limiter.
  """

  use GenServer

  require Logger

  @typedoc "`{module, function, args}` invoked to perform the request."
  @type request_handler :: {module(), atom(), [term()]}

  @typedoc "`{module, function}` invoked with the request's return value."
  @type response_handler :: {module(), atom()}

  @config_keys [:timeframe_max_requests, :timeframe, :timeframe_units]

  # ---- Client facing functions ----

  @doc """
  Starts the rate limiter, registered under the module name.

  `opts` is a map with the keys `:timeframe_max_requests`, `:timeframe` and
  `:timeframe_units` (`:hours`, `:minutes`, `:seconds` or `:milliseconds`).
  """
  @spec start_link(map()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Appends a request to the back of the queue.
  """
  @spec queue_request(request_handler(), response_handler()) :: :ok
  def queue_request(request_handler, response_handler) do
    GenServer.cast(__MODULE__, {:enqueue_request, request_handler, response_handler})
  end

  @doc """
  Puts a request at the front of the queue so it is the next one released.
  """
  @spec next_request(request_handler(), response_handler()) :: :ok
  def next_request(request_handler, response_handler) do
    GenServer.cast(__MODULE__, {:enqueue_front_request, request_handler, response_handler})
  end

  @doc "Returns the configured maximum number of requests per timeframe."
  def get_requests_per_timeframe, do: get_rate_limiter_config(:timeframe_max_requests)

  @doc "Returns the configured timeframe unit."
  def get_timeframe_unit, do: get_rate_limiter_config(:timeframe_units)

  @doc "Returns the configured timeframe length, expressed in the timeframe unit."
  def get_timeframe, do: get_rate_limiter_config(:timeframe)

  # ---- Server Callbacks ----

  @impl true
  def init(opts) do
    state = %{
      # Kept so the `get_*` accessors can report the values the limiter runs with.
      config: Map.take(opts, @config_keys),
      request_queue: :queue.new(),
      request_queue_size: 0,
      request_queue_poll_rate:
        calculate_refresh_rate(opts.timeframe_max_requests, opts.timeframe, opts.timeframe_units),
      send_after_ref: nil
    }

    {:ok, state, {:continue, :initial_timer}}
  end

  @impl true
  def handle_continue(:initial_timer, state) do
    {:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
  end

  @impl true
  def handle_call({:get_config, key}, _from, state) do
    {:reply, Map.fetch!(state.config, key), state}
  end

  @impl true
  def handle_cast({:enqueue_request, request_handler, response_handler}, state) do
    updated_queue = :queue.in({request_handler, response_handler}, state.request_queue)

    {:noreply,
     %{state | request_queue: updated_queue, request_queue_size: state.request_queue_size + 1}}
  end

  def handle_cast({:enqueue_front_request, request_handler, response_handler}, state) do
    updated_queue = :queue.in_r({request_handler, response_handler}, state.request_queue)

    {:noreply,
     %{state | request_queue: updated_queue, request_queue_size: state.request_queue_size + 1}}
  end

  @impl true
  def handle_info(:pop_from_request_queue, %{request_queue_size: 0} = state) do
    # Nothing queued: just arm the next tick.
    {:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
  end

  def handle_info(:pop_from_request_queue, state) do
    {{:value, {request_handler, response_handler}}, remaining_queue} =
      :queue.out(state.request_queue)

    start_message = "Request started #{NaiveDateTime.utc_now()}"

    # async_nolink: a failing request must not crash the limiter. The task's
    # reply and :DOWN messages are consumed by the clauses below.
    Task.Supervisor.async_nolink(BatchJob.TaskSupervisor, fn ->
      {req_module, req_function, req_args} = request_handler
      {resp_module, resp_function} = response_handler

      response = apply(req_module, req_function, req_args)
      apply(resp_module, resp_function, [response])

      Logger.info("#{start_message}\nRequest completed #{NaiveDateTime.utc_now()}")
    end)

    {:noreply,
     %{
       state
       | request_queue: remaining_queue,
         send_after_ref: schedule_timer(state.request_queue_poll_rate),
         request_queue_size: state.request_queue_size - 1
     }}
  end

  # Task finished normally: drop the monitor and flush its pending :DOWN message.
  def handle_info({ref, _result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])

    {:noreply, state}
  end

  # Task crashed: nothing to clean up, the request is simply dropped.
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    {:noreply, state}
  end

  # Any other message is ignored so a stray send cannot crash the limiter and
  # lose the queued requests.
  def handle_info(message, state) do
    Logger.warning("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}")

    {:noreply, state}
  end

  # ---- Helpers ----

  # Reads one start option back from the running limiter. Exits if the
  # limiter process is not running.
  defp get_rate_limiter_config(key) do
    GenServer.call(__MODULE__, {:get_config, key})
  end

  # Milliseconds to wait between two released requests. The interval is rounded
  # UP so the effective rate never exceeds the configured limit, and clamped to
  # at least 1 ms so the timer cannot turn into a busy loop.
  defp calculate_refresh_rate(num_requests, time, timeframe_units) do
    timeframe_ms = convert_time_to_milliseconds(timeframe_units, time)

    max(1, ceil(timeframe_ms / num_requests))
  end

  defp convert_time_to_milliseconds(:hours, time), do: :timer.hours(time)
  defp convert_time_to_milliseconds(:minutes, time), do: :timer.minutes(time)
  defp convert_time_to_milliseconds(:seconds, time), do: :timer.seconds(time)
  defp convert_time_to_milliseconds(:milliseconds, milliseconds), do: milliseconds

  defp schedule_timer(queue_poll_rate) do
    Process.send_after(self(), :pop_from_request_queue, queue_poll_rate)
  end
end

Set up the request module that will handle making the request to the external API:

defmodule BatchJob.Request do
  @moduledoc """
  Thin wrapper around the external API called by the batch job.
  """

  @base_url "https://external-api-mock.com"

  @doc """
  Sends a GET request to `/api/endpoint/<arg>` and returns the response.

  The request is sent through the `BatchJob.Finch` pool. Raises if the
  request fails.
  """
  def make_api_request(arg) do
    # `Req.get!/2` replaces the build/add_default_steps/run! pipeline, which is
    # no longer part of Req's public API. No `:retry` option is passed, so Req's
    # default retry behaviour applies.
    Req.get!("#{@base_url}/api/endpoint/#{arg}",
      headers: [{"X-ApiKey", "api-key"}, {"Accept", "application/json"}],
      finch: BatchJob.Finch
    )
  end
end

The configuration for both the rate limiter and the Finch request pools is supplied when launching the processes in application.ex:

# Starts the application's supervision tree.
#
# Children start in list order and are shut down in reverse order, so the
# Finch pool comes first: it is available before anything can issue a request
# and is torn down only after the request tasks and the rate limiter.
def start(_type, _args) do
  # 600 requests per 60 seconds.
  rate_limiter_config = %{
    timeframe_max_requests: 600,
    timeframe_units: :seconds,
    timeframe: 60
  }

  children = [
    {Finch, name: BatchJob.Finch, pools: %{:default => [size: 30, count: 1]}},
    {Task.Supervisor, name: BatchJob.TaskSupervisor},
    {BatchJob.RateLimiter, rate_limiter_config}
  ]

  opts = [strategy: :one_for_one, name: BatchJob.Supervisor]
  Supervisor.start_link(children, opts)
end

Finally, the client-facing API is created to allow queueing all the requests for the job, in lib/batch_job.ex:

defmodule BatchJob do
  @moduledoc """
  Client-facing API of the batch job.

  Reads the job inputs from `batch_inputs.txt`, queues one rate-limited API
  request per input and appends every response to `results.log`.
  """

  alias BatchJob.{RateLimiter, Request}

  @doc """
  Queues one request for every input listed in `batch_inputs.txt`.
  """
  @spec enqueue_batch_tasks() :: :ok
  def enqueue_batch_tasks() do
    Enum.each(read_task_inputs(), &enqueue_task/1)
  end

  @doc """
  Queues a single request for `input`; its response is passed to `log_result/1`.
  """
  def enqueue_task(input) do
    RateLimiter.queue_request({Request, :make_api_request, [input]}, {BatchJob, :log_result})
  end

  @doc """
  Appends a `"<status> - <JSON body>"` line for the response to `results.log`.

  Raises if the body cannot be encoded or the file cannot be written.
  """
  def log_result(%{status: status, body: body}) do
    # Build the line before touching the file, and let File.write!/3 open,
    # write and close in one step so an encoding error cannot leak a handle.
    line = "#{status} - #{Jason.encode!(body)}\n"

    File.write!("results.log", line, [:append])
  end

  @doc """
  Returns the non-empty lines of `batch_inputs.txt`.

  Both `\\n` and `\\r\\n` line endings are accepted. Blank lines are skipped,
  so an empty file yields `[]`.
  """
  @spec read_task_inputs() :: [String.t()]
  def read_task_inputs() do
    "batch_inputs.txt"
    |> File.read!()
    |> String.split(["\r\n", "\n"], trim: true)
  end
end

With this setup, the app can be started with iex (iex -S mix) and then all the requests can be queued up with BatchJob.enqueue_batch_tasks().