Back off on reconnect rate

This commit is contained in:
Chris Marsh 2017-07-18 11:10:39 -07:00
parent 7082a13d49
commit b947e6afe5
2 changed files with 59 additions and 1 deletions

41
src/backoff.h Normal file
View file

@ -0,0 +1,41 @@
#pragma once
#include <stdint.h>
#include <algorithm>
#include <random>
struct Backoff
{
int64_t minAmount;
int64_t maxAmount;
int64_t current;
int fails;
std::mt19937_64 randGenerator;
std::uniform_real_distribution<> randDistribution;
double rand01() {
return randDistribution(randGenerator);
}
Backoff(int64_t min, int64_t max)
: minAmount(min)
, maxAmount(max)
, current(min)
, fails(0)
{}
void reset()
{
fails = 0;
current = minAmount;
}
int64_t nextDelay()
{
++fails;
int64_t delay = (int64_t)((double)current * 2.0 * rand01());
current = std::min(current + delay, maxAmount);
return current;
}
};

View file

@ -2,13 +2,18 @@
#include "rpc_connection.h" #include "rpc_connection.h"
#include "yolojson.h" #include "yolojson.h"
#include "backoff.h"
#include "rapidjson/document.h" #include "rapidjson/document.h"
#include <stdio.h> #include <stdio.h>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#ifndef DISCORD_DISABLE_IO_THREAD
#include <condition_variable> #include <condition_variable>
#include <thread> #include <thread>
#endif
constexpr size_t MaxMessageSize = 16 * 1024; constexpr size_t MaxMessageSize = 16 * 1024;
constexpr size_t MessageQueueSize = 8; constexpr size_t MessageQueueSize = 8;
@ -29,6 +34,8 @@ static QueuedMessage SendQueue[MessageQueueSize]{};
static std::atomic_uint SendQueueNextAdd{0}; static std::atomic_uint SendQueueNextAdd{0};
static std::atomic_uint SendQueueNextSend{0}; static std::atomic_uint SendQueueNextSend{0};
static std::atomic_uint SendQueuePendingSends{0}; static std::atomic_uint SendQueuePendingSends{0};
static Backoff ReconnectTimeMs(500, 60 * 1000);
static auto NextConnect{std::chrono::system_clock::now()};
#ifndef DISCORD_DISABLE_IO_THREAD #ifndef DISCORD_DISABLE_IO_THREAD
static std::atomic_bool KeepRunning{ true }; static std::atomic_bool KeepRunning{ true };
@ -37,6 +44,11 @@ static std::condition_variable WaitForIOActivity;
static std::thread IoThread; static std::thread IoThread;
#endif // DISCORD_DISABLE_IO_THREAD #endif // DISCORD_DISABLE_IO_THREAD
static void UpdateReconnectTime()
{
NextConnect = std::chrono::system_clock::now() + std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
}
static QueuedMessage* SendQueueGetNextAddMessage() { static QueuedMessage* SendQueueGetNextAddMessage() {
// if we are falling behind, bail // if we are falling behind, bail
if (SendQueuePendingSends.load() >= MessageQueueSize) { if (SendQueuePendingSends.load() >= MessageQueueSize) {
@ -56,8 +68,11 @@ static void SendQueueCommitMessage() {
extern "C" void Discord_UpdateConnection() extern "C" void Discord_UpdateConnection()
{ {
if (!Connection->IsOpen()) { if (!Connection->IsOpen()) {
if (std::chrono::system_clock::now() >= NextConnect) {
UpdateReconnectTime();
Connection->Open(); Connection->Open();
} }
}
else { else {
// reads // reads
rapidjson::Document message; rapidjson::Document message;
@ -108,11 +123,13 @@ extern "C" void Discord_Initialize(const char* applicationId, DiscordEventHandle
Connection = RpcConnection::Create(applicationId); Connection = RpcConnection::Create(applicationId);
Connection->onConnect = []() { Connection->onConnect = []() {
WasJustConnected.exchange(true); WasJustConnected.exchange(true);
ReconnectTimeMs.reset();
}; };
Connection->onDisconnect = [](int err, const char* message) { Connection->onDisconnect = [](int err, const char* message) {
LastErrorCode = err; LastErrorCode = err;
StringCopy(LastErrorMessage, message, sizeof(LastErrorMessage)); StringCopy(LastErrorMessage, message, sizeof(LastErrorMessage));
WasJustDisconnected.exchange(true); WasJustDisconnected.exchange(true);
UpdateReconnectTime();
}; };
#ifndef DISCORD_DISABLE_IO_THREAD #ifndef DISCORD_DISABLE_IO_THREAD