init repo.

This commit is contained in:
tqcq
2024-02-23 18:07:37 +08:00
commit 1a9e41d167
512 changed files with 191774 additions and 0 deletions

23
src/log/log.cc Normal file
View File

@@ -0,0 +1,23 @@
#include "sled/log/log.h"
#include <fmt/format.h>
#include <iostream>
namespace sled {
void
Log(LogLevel level,
const char *tag,
const char *fmt,
const char *file_name,
int line,
const char *func_name,
...)
{
int len = file_name ? strlen(file_name) : 0;
while (len > 0 && file_name[len - 1] != '/') { len--; }
auto msg = fmt::format("{}:{}@{} {} {}", file_name + len, line, func_name,
tag, fmt);
std::cout << msg << std::endl;
}
}// namespace sled

View File

@@ -0,0 +1,134 @@
#include "sled/network/async_resolver.h"
#include "sled/network/async_resolver_interface.h"
#include "sled/ref_counted_base.h"
#include "sled/synchronization/mutex.h"
#include "sled/task_queue/task_queue_base.h"
#include <thread>
namespace sled {
struct AsyncResolver::State : public RefCountedBase {
Mutex mutex;
enum class Status {
kLive,
kDead,
} status = Status::kLive;
};
int
ResolveHostname(const std::string &hostname,
int family,
std::vector<IPAddress> *addresses)
{
if (!addresses) { return -1; }
addresses->clear();
struct addrinfo *result = nullptr;
struct addrinfo hints = {0};
hints.ai_family = family;
hints.ai_flags = AI_ADDRCONFIG;
int ret = getaddrinfo(hostname.c_str(), nullptr, &hints, &result);
if (ret != 0) { return ret; }
struct addrinfo *cursor = result;
for (; cursor; cursor = cursor->ai_next) {
if (family == AF_UNSPEC || cursor->ai_family == family) {
IPAddress ip;
if (IPFromAddrInfo(cursor, &ip)) { addresses->push_back(ip); }
}
}
freeaddrinfo(result);
return 0;
}
AsyncResolver::AsyncResolver() : error_(-1), state_(new State) {}
AsyncResolver::~AsyncResolver()
{
MutexLock lock(&state_->mutex);
;
state_->status = State::Status::kDead;
}
void
AsyncResolver::Start(const SocketAddress &addr)
{
Start(addr, addr.family());
}
void
AsyncResolver::Start(const SocketAddress &addr, int family)
{
addr_ = addr;
auto caller_task_queue = TaskQueueBase::Current();
auto state = state_;
auto thread_function = [this, addr, family, caller_task_queue, state] {
std::vector<IPAddress> addresses;
int error = ResolveHostname(addr.hostname(), family, &addresses);
caller_task_queue->PostTask([this, error, &addresses, state] {
bool live;
{
MutexLock lock(&state->mutex);
live = state->status == State::Status::kLive;
}
if (live) { ResolveDone(std::move(addresses), error); }
});
};
// TODO: Add new thread run function
std::thread(thread_function).detach();
}
bool
AsyncResolver::GetResolvedAddress(int family, SocketAddress *addr) const
{
if (error_ != 0 || addresses_.empty()) { return false; }
*addr = addr_;
for (size_t i = 0; i < addresses_.size(); ++i) {
if (family == addresses_[i].family()) {
addr->SetResolvedIP(addresses_[i]);
return true;
}
}
return false;
}
int
AsyncResolver::GetError() const
{
return error_;
}
void
AsyncResolver::Destroy(bool wait)
{
destroy_called_ = true;
MaybeSelfDestruct();
}
const std::vector<IPAddress> &
AsyncResolver::addresses() const
{
return addresses_;
}
void
AsyncResolver::ResolveDone(std::vector<IPAddress> addresses, int error)
{
addresses_ = addresses;
error_ = error;
recursion_check_ = true;
SignalDone(this);
MaybeSelfDestruct();
}
void
AsyncResolver::MaybeSelfDestruct()
{
if (!recursion_check_) {
delete this;
} else {
recursion_check_ = false;
}
}
}// namespace sled

289
src/network/ip_address.cc Normal file
View File

@@ -0,0 +1,289 @@
#include "sled/network/ip_address.h"
#include <netinet/in.h>
namespace sled {
static const in6_addr kV4MappedPrefix = {
{{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0}}};
static const in6_addr kPrivateNetworkPrefix = {{{0xFD}}};
bool
IPAddress::operator==(const IPAddress &other) const
{
if (family_ != other.family_) { return false; }
if (family_ == AF_INET) {
return ::memcmp(&u_.ip4, &other.u_.ip4, sizeof(u_.ip4)) == 0;
}
if (family_ == AF_INET6) {
return ::memcmp(&u_.ip6, &other.u_.ip6, sizeof(u_.ip6)) == 0;
}
return family_ == AF_UNSPEC;
}
bool
IPAddress::operator!=(const IPAddress &other) const
{
return !((*this) == other);
}
bool
IPAddress::operator<(const IPAddress &other) const
{
if (family_ != other.family_) {
if (family_ == AF_UNSPEC) { return true; }
if (family_ == AF_INET && other.family_ == AF_INET6) { return true; }
return false;
}
switch (family_) {
case AF_INET: {
return NetworkToHost32(u_.ip4.s_addr)
< NetworkToHost32(other.u_.ip4.s_addr);
}
case AF_INET6: {
return ::memcmp(&u_.ip6, &other.u_.ip6, sizeof(u_.ip6)) < 0;
}
return ::memcmp(&u_.ip6.s6_addr, &other.u_.ip6.s6_addr, 16) < 0;
}
return false;
}
int
IPAddress::overhead() const
{
switch (family_) {
case AF_INET:
return 20;
case AF_INET6:
return 40;
default:
return 0;
}
}
uint32_t
IPAddress::v4AddressAsHostOrderInteger() const
{
if (family_ == AF_INET) {
return NetworkToHost32(u_.ip4.s_addr);
} else {
return 0;
}
}
bool
IPAddress::IsNil() const
{
return IPIsUnspec(*this);
}
size_t
IPAddress::Size() const
{
switch (family_) {
case AF_INET:
return sizeof(in_addr);
case AF_INET6:
return sizeof(in6_addr);
default:
return 0;
}
}
std::string
IPAddress::ToString() const
{
if (family_ != AF_INET && family_ != AF_INET6) { return std::string(); }
char buf[INET6_ADDRSTRLEN] = {0};
const void *src = &u_.ip4;
if (family_ == AF_INET6) { src = &u_.ip6; }
if (::inet_ntop(family_, src, buf, sizeof(buf)) == 0) {
return std::string();
}
return std::string(buf);
}
in_addr
IPAddress::ipv4_address() const
{
return u_.ip4;
}
in6_addr
IPAddress::ipv6_address() const
{
return u_.ip6;
}
bool
IPIsHelper(const IPAddress &ip, const in6_addr &tomatch, int length)
{
in6_addr addr = ip.ipv6_address();
return ::memcmp(&addr, &tomatch, (length >> 3)) == 0;
}
bool
IPFromAddrInfo(struct addrinfo *info, IPAddress *out)
{
if (!info || !info->ai_addr) { return false; }
if (info->ai_addr->sa_family == AF_INET) {
// ipv4
sockaddr_in *addr = reinterpret_cast<sockaddr_in *>(info->ai_addr);
*out = IPAddress(addr->sin_addr);
return true;
} else if (info->ai_addr->sa_family == AF_INET6) {
// ipv6
sockaddr_in6 *addr = reinterpret_cast<sockaddr_in6 *>(info->ai_addr);
*out = IPAddress(addr->sin6_addr);
return true;
}
return false;
}
bool
IPFromString(const std::string &str, IPAddress *out)
{
if (!out) { return false; }
in_addr addr;
if (::inet_pton(AF_INET, str.c_str(), &addr) == 0) {
in6_addr addr6;
if (::inet_pton(AF_INET6, str.c_str(), &addr6) == 0) {
*out = IPAddress();
return false;
}
*out = IPAddress(addr6);
} else {
// RETURN VALUE is -1(invalid family) or 1(success)
*out = IPAddress(addr);
}
return true;
}
bool
IPIsAny(const IPAddress &ip)
{
switch (ip.family()) {
case AF_INET:
return ip == IPAddress(INADDR_ANY);
case AF_INET6:
return ip == IPAddress(in6addr_any) || ip == IPAddress(kV4MappedPrefix);
case AF_UNSPEC:
return false;
}
return false;
}
static bool
IPIsLinkLocalV4(const IPAddress &ip)
{
uint32_t ip_in_host_order = ip.v4AddressAsHostOrderInteger();
return ((ip_in_host_order >> 16) == ((169 << 8) | 254));
}
static bool
IPIsLinkLocalV6(const IPAddress &ip)
{
in6_addr addr = ip.ipv6_address();
return (addr.s6_addr[0] == 0xFE && (addr.s6_addr[1] & 0xC0) == 0x80);
}
bool
IPIsLinkLocal(const IPAddress &ip)
{
switch (ip.family()) {
case AF_INET:
return IPIsLinkLocalV4(ip);
case AF_INET6:
return IPIsLinkLocalV6(ip);
}
return false;
}
static bool
IPIsPrivateNetworkV4(const IPAddress &ip)
{
uint32_t ip_in_host_order = ip.v4AddressAsHostOrderInteger();
return ((ip_in_host_order >> 24) == 10)
|| ((ip_in_host_order >> 20) == ((172 << 4) | 1))
|| ((ip_in_host_order >> 16) == ((192 << 8) | 168));
}
static bool
IPIsPrivateNetworkV6(const IPAddress &ip)
{
return IPIsHelper(ip, kPrivateNetworkPrefix, 8);
}
// like "192.168.111.222"
bool
IPIsPrivateNetwork(const IPAddress &ip)
{
switch (ip.family()) {
case AF_INET:
return IPIsPrivateNetworkV4(ip);
case AF_INET6:
return IPIsPrivateNetworkV6(ip);
}
return false;
}
static bool
IPIsSharedNetworkV4(const IPAddress &ip)
{
uint32_t ip_in_host_order = ip.v4AddressAsHostOrderInteger();
return (ip_in_host_order >> 22) == ((100 << 2) | 1);
}
// like "100.72.16.122"
bool
IPIsSharedNetwork(const IPAddress &ip)
{
if (ip.family() == AF_INET) { return IPIsSharedNetworkV4(ip); }
return false;
}
static bool
IPIsLoopbackV4(const IPAddress &ip)
{
uint32_t ip_in_host_order = ip.v4AddressAsHostOrderInteger();
return ((ip_in_host_order >> 24) == 127);
}
static bool
IPIsLoopbackV6(const IPAddress &ip)
{
return ip == IPAddress(in6addr_loopback);
}
bool
IPIsLoopback(const IPAddress &ip)
{
switch (ip.family()) {
case AF_INET:
return IPIsLoopbackV4(ip);
case AF_INET6:
return IPIsLoopbackV6(ip);
}
return false;
}
bool
IPIsPrivate(const IPAddress &ip)
{
return IPIsLinkLocal(ip) || IPIsLoopback(ip) || IPIsPrivateNetwork(ip)
|| IPIsSharedNetwork(ip);
}
bool
IPIsUnspec(const IPAddress &ip)
{
return ip.family() == AF_UNSPEC;
}
}// namespace sled

View File

@@ -0,0 +1,27 @@
#include "sled/network/null_socket_server.h"
namespace sled {
NullSocketServer::NullSocketServer() = default;
NullSocketServer::~NullSocketServer() {}
bool
NullSocketServer::Wait(TimeDelta max_wait_duration, bool process_io)
{
event_.Wait(max_wait_duration);
return true;
}
void
NullSocketServer::WakeUp()
{
event_.Set();
}
Socket *
NullSocketServer::CreateSocket(int family, int type)
{
return nullptr;
}
}// namespace sled

View File

@@ -0,0 +1,827 @@
#include "sled/network/physical_socket_server.h"
#include "sled/network/async_resolver.h"
#include "sled/network/socket.h"
#include "sled/synchronization/event.h"
#include "sled/synchronization/mutex.h"
#include "sled/time_utils.h"
#include <array>
#include <errno.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include <unistd.h>
namespace {
class ScopedSetTrue {
public:
ScopedSetTrue(bool *value) : value_(value) { *value_ = true; }
~ScopedSetTrue() { *value_ = false; }
private:
bool *value_;
};
}// namespace
namespace sled {
class Signaler : public Dispatcher {
public:
Signaler(PhysicalSocketServer *ss, bool &flag_to_clear)
: ss_(ss),
afd_([] {
std::array<int, 2> afd = {INVALID_SOCKET, INVALID_SOCKET};
if (pipe(afd.data()) < 0) {
// TODO: add log here
}
return afd;
}()),
fSignaled_(false),
flag_to_clear_(flag_to_clear)
{
ss_->Add(this);
}
~Signaler() override
{
ss_->Remove(this);
close(afd_[0]);
close(afd_[1]);
}
virtual void Signal()
{
MutexLock lock(&mutex_);
if (!fSignaled_) {
const uint8_t b[1] = {0};
const ssize_t res = write(afd_[1], b, sizeof(b));
fSignaled_ = true;
}
}
uint32_t GetRequestedEvents() override { return DE_READ; }
void OnEvent(uint32_t ff, int err) override
{
MutexLock lock(&mutex_);
if (fSignaled_) {
uint8_t b[4];
const ssize_t res = read(afd_[0], b, sizeof(b));
fSignaled_ = false;
}
flag_to_clear_ = false;
}
int GetDescriptor() override { return afd_[0]; }
bool IsDescriptorClosed() override { return false; }
private:
PhysicalSocketServer *const ss_;
const std::array<int, 2> afd_;
bool fSignaled_;
Mutex mutex_;
bool &flag_to_clear_;
};
PhysicalSocketServer::PhysicalSocketServer() : fWait_(false)
{
signal_wakeup_ = new Signaler(this, fWait_);
}
PhysicalSocketServer::~PhysicalSocketServer() { delete signal_wakeup_; }
void
PhysicalSocketServer::WakeUp()
{
signal_wakeup_->Signal();
}
Socket *
PhysicalSocketServer::CreateSocket(int family, int type)
{
SocketDispatcher *dispatcher = new SocketDispatcher(this);
if (dispatcher->Create(family, type)) {
return dispatcher;
} else {
delete dispatcher;
return nullptr;
}
}
Socket *
PhysicalSocketServer::WrapSocket(SOCKET s)
{
SocketDispatcher *dispatcher = new SocketDispatcher(s, this);
if (dispatcher->Initialize()) {
return dispatcher;
} else {
delete dispatcher;
return nullptr;
}
}
void
PhysicalSocketServer::Add(Dispatcher *pdispatcher)
{
RecursiveMutexLock lock(&lock_);
if (key_by_dispatcher_.count(pdispatcher)) { return; }
uint64_t key = next_dispatcher_key_++;
dispatcher_by_key_.emplace(key, pdispatcher);
key_by_dispatcher_.emplace(pdispatcher, key);
}
void
PhysicalSocketServer::Remove(Dispatcher *pdispatcher)
{
RecursiveMutexLock lock(&lock_);
if (!key_by_dispatcher_.count(pdispatcher)) { return; }
uint64_t key = key_by_dispatcher_.at(pdispatcher);
key_by_dispatcher_.erase(pdispatcher);
dispatcher_by_key_.erase(key);
}
void
PhysicalSocketServer::Update(Dispatcher *pdispatcher)
{
return;
/*
MutexLock lock(&lock_);
if (!key_by_dispatcher_.count(pdispatcher)) { return; }
UpdateEpoll();
*/
}
int
PhysicalSocketServer::ToCmsWait(TimeDelta max_wait_duration)
{
return max_wait_duration == Event::kForever
? kForeverMs
: max_wait_duration.RoundUpTo(TimeDelta::Millis(1)).ms();
}
bool
PhysicalSocketServer::Wait(TimeDelta max_wait_duration, bool process_io)
{
ScopedSetTrue s(&waiting_);
const int cmsWait = ToCmsWait(max_wait_duration);
return WaitSelect(cmsWait, process_io);
}
static void
ProcessEvents(Dispatcher *pdispatcher,
bool readable,
bool writable,
bool error_event,
bool check_error)
{
int errcode = 0;
if (check_error) {
socklen_t len = sizeof(errcode);
int res = ::getsockopt(pdispatcher->GetDescriptor(), SOL_SOCKET,
SO_ERROR, &errcode, &len);
if (res < 0) {
if (error_event || errno != ENOTSOCK) { errcode = EBADF; }
}
}
const uint32_t requested_events = pdispatcher->GetRequestedEvents();
uint32_t ff = 0;
if (readable) {
if (errcode || pdispatcher->IsDescriptorClosed()) {
ff |= DE_CLOSE;
} else if (requested_events & DE_ACCEPT) {
ff |= DE_ACCEPT;
} else {
ff |= DE_READ;
}
}
if (writable) {
if (requested_events & DE_CONNECT) {
if (!errcode) { ff |= DE_CONNECT; }
} else {
ff |= DE_WRITE;
}
}
if (errcode) { ff |= DE_CLOSE; }
if (ff != 0) { pdispatcher->OnEvent(ff, errcode); }
}
bool
PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io)
{
struct timeval *ptvWait = nullptr;
struct timeval tvWait;
int64_t stop_us;
if (cmsWait != kForeverMs) {
tvWait.tv_sec = cmsWait / 1000;
tvWait.tv_usec = (cmsWait % 1000) * 1000;
ptvWait = &tvWait;
stop_us = TimeMicros() + cmsWait * 1000;
}
fd_set fdsRead;
fd_set fdsWrite;
fWait_ = true;
while (fWait_) {
FD_ZERO(&fdsRead);
FD_ZERO(&fdsWrite);
int fdmax = -1;
{
RecursiveMutexLock lock(&lock_);
current_dispatcher_keys_.clear();
for (auto const &kv : dispatcher_by_key_) {
uint64_t key = kv.first;
Dispatcher *pdispatcher = kv.second;
if (!process_io && (pdispatcher != signal_wakeup_)) {
continue;
}
current_dispatcher_keys_.push_back(key);
int fd = pdispatcher->GetDescriptor();
if (fd > fdmax) { fdmax = fd; }
uint32_t ff = pdispatcher->GetRequestedEvents();
if (ff & (DE_READ | DE_ACCEPT)) { FD_SET(fd, &fdsRead); }
if (ff & (DE_WRITE | DE_CONNECT)) { FD_SET(fd, &fdsWrite); }
}
}
int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
if (n < 0) {
if (errno != EINTR) { return false; }
} else if (n == 0) {
return true;
} else {
RecursiveMutexLock lock(&lock_);
for (uint64_t key : current_dispatcher_keys_) {
// skip if the dispatcher is removed
if (!dispatcher_by_key_.count(key)) { continue; }
Dispatcher *pdispatcher = dispatcher_by_key_.at(key);
int fd = pdispatcher->GetDescriptor();
bool readable = FD_ISSET(fd, &fdsRead);
if (readable) { FD_CLR(fd, &fdsRead); }
bool writable = FD_ISSET(fd, &fdsWrite);
if (writable) { FD_CLR(fd, &fdsWrite); }
ProcessEvents(pdispatcher, readable, writable, false,
readable || writable);
}
}
if (ptvWait) {
ptvWait->tv_sec = 0;
ptvWait->tv_usec = 0;
int64_t time_left_us = stop_us - TimeMicros();
if (time_left_us > 0) {
ptvWait->tv_sec = time_left_us / kNumMicrosecsPerSec;
ptvWait->tv_usec = time_left_us % kNumMicrosecsPerSec;
} else {
break;
}
}
}
return true;
}
PhysicalSocket::PhysicalSocket(PhysicalSocketServer *ss, SOCKET s)
: ss_(ss),
s_(s),
error_(0),
state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
resolver_(nullptr)
{
if (s != INVALID_SOCKET) {
SetEnabledEvents(DE_READ | DE_WRITE);
int type = SOCK_STREAM;
socklen_t len = sizeof(type);
const int res =
getsockopt(s_, SOL_SOCKET, SO_TYPE, (void *) &type, &len);
udp_ = (SOCK_DGRAM == type);
}
}
PhysicalSocket::~PhysicalSocket() { Close(); }
bool
PhysicalSocket::Create(int family, int type)
{
Close();
s_ = ::socket(family, type, 0);
udp_ = (SOCK_DGRAM == type);
family_ = family;
UpdateLastError();
if (udp_) { SetEnabledEvents(DE_READ | DE_WRITE); }
return s_ != INVALID_SOCKET;
}
SocketAddress
PhysicalSocket::GetLocalAddress() const
{
sockaddr_storage addr_storage = {};
socklen_t addrlen = sizeof(addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
int result = ::getsockname(s_, addr, &addrlen);
SocketAddress address;
if (result >= 0) {
} else {
}
return address;
}
SocketAddress
PhysicalSocket::GetRemoteAddress() const
{
sockaddr_storage addr_storage = {};
socklen_t addrlen = sizeof(addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
int result = ::getpeername(s_, addr, &addrlen);
SocketAddress address;
if (result >= 0) {
} else {
}
return address;
}
int
PhysicalSocket::Bind(const SocketAddress &bind_addr)
{
SocketAddress copied_bind_addr = bind_addr;
sockaddr_storage addr_storage;
size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
int err = ::bind(s_, addr, static_cast<int>(len));
UpdateLastError();
return err;
}
int
PhysicalSocket::Connect(const SocketAddress &addr)
{
if (state_ != CS_CLOSED) {
SetError(EALREADY);
return SOCKET_ERROR;
}
if (addr.IsUnresolvedIP()) {
resolver_ = new AsyncResolver();
resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
resolver_->Start(addr);
state_ = CS_CONNECTING;
return 0;
}
return DoConnect(addr);
}
int
PhysicalSocket::GetError() const
{
MutexLock lock(&mutex_);
return error_;
}
void
PhysicalSocket::SetError(int error)
{
// MutexLock lock(&mutex_);
LockGuard<Mutex> lock(&mutex_);
error_ = error;
}
int
PhysicalSocket::GetOption(Option opt, int *value)
{
int slevel;
int sopt;
if (TranslateOption(opt, &slevel, &sopt) == -1) { return -1; }
socklen_t optlen = sizeof(*value);
int ret = ::getsockopt(s_, slevel, sopt, (void *) value, &optlen);
if (ret == -1) { return -1; }
return ret;
}
int
PhysicalSocket::SetOption(Option opt, int value)
{
int slevel;
int sopt;
if (TranslateOption(opt, &slevel, &sopt) == -1) { return -1; }
int result = ::setsockopt(s_, slevel, sopt, (void *) &value, sizeof(value));
if (result != 0) { UpdateLastError(); }
return result;
}
int
PhysicalSocket::Send(const void *pv, size_t cb)
{
int sent = DoSend(s_, reinterpret_cast<const char *>(pv),
static_cast<int>(cb), MSG_NOSIGNAL);
UpdateLastError();
if ((sent > 0 && sent < static_cast<int>(cb))
|| (sent < 0 && IsBlockingError(GetError()))) {
EnableEvents(DE_WRITE);
}
return sent;
}
int
PhysicalSocket::SendTo(const void *buffer,
size_t length,
const SocketAddress &addr)
{
sockaddr_storage saddr;
size_t len = addr.ToSockAddrStorage(&saddr);
int sent =
DoSendTo(s_, static_cast<const char *>(buffer),
static_cast<int>(length), MSG_NOSIGNAL,
reinterpret_cast<sockaddr *>(&saddr), static_cast<int>(len));
UpdateLastError();
if ((sent > 0 && sent < static_cast<int>(length))
|| (sent < 0 && IsBlockingError(GetError()))) {
EnableEvents(DE_WRITE);
}
return sent;
}
int
PhysicalSocket::Recv(void *buffer, size_t length, int64_t *timestamp)
{
int received = DoReadFromSocket(buffer, length, nullptr, timestamp);
if ((received == 0) && (length != 0)) {
EnableEvents(DE_READ);
SetError(EWOULDBLOCK);
return SOCKET_ERROR;
}
UpdateLastError();
int error = GetError();
bool success = (received >= 0) || IsBlockingError(GetError());
if (udp_ || success) { EnableEvents(DE_READ); }
if (!success) {
// TODO: add log here
}
return received;
}
int
PhysicalSocket::RecvFrom(void *buffer,
size_t length,
SocketAddress *out_addr,
int64_t *timestamp)
{
int received = DoReadFromSocket(buffer, length, out_addr, timestamp);
UpdateLastError();
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
if (udp_ || success) { EnableEvents(DE_READ); }
if (!success) {
// TODO: add log here
}
return received;
}
int64_t
GetSocketRecvTimestamp(int socket)
{
struct timeval tv_ioctl;
#if defined(SIOCGSTAMP_OLD)
int ret = ioctl(socket, SIOCGSTAMP_OLD, &tv_ioctl);
#elif defined(SIOCGSTAMP)
int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
#else
int ret = -1;
#endif
if (ret != 0) { return -1; }
int64_t timestamp =
static_cast<int64_t>(tv_ioctl.tv_sec) * kNumMicrosecsPerSec
+ tv_ioctl.tv_usec;
return timestamp;
}
int
PhysicalSocket::DoReadFromSocket(void *buffer,
size_t length,
SocketAddress *out_addr,
int64_t *timestamp)
{
sockaddr_storage addr_storage;
socklen_t addr_len = sizeof(addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
int received = 0;
if (out_addr) {
received = ::recvfrom(s_, static_cast<char *>(buffer),
static_cast<int>(length), 0, addr, &addr_len);
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
} else {
received = ::recv(s_, static_cast<char *>(buffer),
static_cast<int>(length), 0);
}
if (timestamp) { *timestamp = GetSocketRecvTimestamp(s_); }
return received;
}
int
PhysicalSocket::Listen(int backlog)
{
int err = ::listen(s_, backlog);
UpdateLastError();
if (err == 0) {
state_ = CS_CONNECTING;
EnableEvents(DE_ACCEPT);
}
return err;
}
Socket *
PhysicalSocket::Accept(SocketAddress *out_addr)
{
EnableEvents(DE_ACCEPT);
sockaddr_storage addr_storage;
socklen_t addr_len = sizeof(addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
SOCKET s = DoAccept(s_, addr, &addr_len);
UpdateLastError();
if (s == INVALID_SOCKET) { return nullptr; }
if (out_addr) { SocketAddressFromSockAddrStorage(addr_storage, out_addr); }
return ss_->WrapSocket(s);
}
int
PhysicalSocket::Close()
{
if (s_ == INVALID_SOCKET) { return 0; }
int err = ::close(s_);
UpdateLastError();
s_ = INVALID_SOCKET;
state_ = CS_CLOSED;
SetEnabledEvents(0);
if (resolver_) {
resolver_->Destroy(false);
resolver_ = nullptr;
}
return err;
}
SOCKET
PhysicalSocket::DoAccept(SOCKET socket, sockaddr *addr, socklen_t *addrlen)
{
return ::accept(socket, addr, addrlen);
}
int
PhysicalSocket::DoSend(SOCKET socket, const char *buf, int len, int flags)
{
return ::send(socket, buf, len, flags);
}
int
PhysicalSocket::DoSendTo(SOCKET socket,
const char *buf,
int len,
int flags,
const struct sockaddr *dest_addr,
socklen_t addrlen)
{
return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
}
int
PhysicalSocket::DoConnect(const SocketAddress &connect_addr)
{
if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
return SOCKET_ERROR;
}
sockaddr_storage addr_storage;
size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
sockaddr *addr = reinterpret_cast<sockaddr *>(&addr_storage);
int err = ::connect(s_, addr, static_cast<int>(len));
UpdateLastError();
uint8_t events = DE_READ | DE_WRITE;
if (err == 0) {
state_ = CS_CONNECTED;
} else if (IsBlockingError(GetError())) {
state_ = CS_CONNECTING;
events |= DE_CONNECT;
} else {
return SOCKET_ERROR;
}
EnableEvents(events);
return 0;
}
void
PhysicalSocket::OnResolveResult(AsyncResolverInterface *resolver)
{
if (resolver != resolver_) { return; }
int error = resolver_->GetError();
if (error == 0) {
error = DoConnect(resolver_->address());
} else {
// get error from resolver
Close();
SetError(error);
SignalCloseEvent(this, error);
}
}
void
PhysicalSocket::UpdateLastError()
{
SetError(errno);
}
void
PhysicalSocket::SetEnabledEvents(uint8_t events)
{
enabled_events_ = events;
}
void
PhysicalSocket::EnableEvents(uint8_t events)
{
enabled_events_ |= events;
}
void
PhysicalSocket::DisableEvents(uint8_t events)
{
enabled_events_ &= ~events;
}
int
PhysicalSocket::TranslateOption(Option opt, int *slevel, int *sopt)
{
switch (opt) {
case OPT_RCVBUF:
*slevel = SOL_SOCKET;
*sopt = SO_RCVBUF;
break;
case OPT_SNDBUF:
*slevel = SOL_SOCKET;
*sopt = SO_SNDBUF;
break;
case OPT_NODELAY:
*slevel = IPPROTO_TCP;
*sopt = TCP_NODELAY;
break;
default:
return -1;
}
return 0;
}
SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss)
: PhysicalSocket(ss)
{}
SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss)
: PhysicalSocket(ss, s)
{}
SocketDispatcher::~SocketDispatcher() { Close(); }
bool
SocketDispatcher::Initialize()
{
// must be a non-blocking
fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
ss_->Add(this);
return true;
}
bool
SocketDispatcher::Create(int type)
{
return Create(AF_INET, type);
}
bool
SocketDispatcher::Create(int family, int type)
{
if (!PhysicalSocket::Create(family, type)) { return false; }
if (!Initialize()) { return false; }
return true;
}
int
SocketDispatcher::GetDescriptor()
{
return s_;
}
bool
SocketDispatcher::IsDescriptorClosed()
{
if (udp_) { return s_ == INVALID_SOCKET; }
char ch;
ssize_t res;
do {
res = ::recv(s_, &ch, 1, MSG_PEEK);
} while (res < 0 && errno == EINTR);
if (res > 0) {
return false;
} else if (res == 0) {
return true;
} else {
switch (errno) {
case EBADF:
return true;
case ECONNRESET:
return true;
case ECONNABORTED:
return true;
case EPIPE:
return true;
case EWOULDBLOCK:
return false;
default:
// Assuming benign blocking error
return false;
}
}
}
uint32_t
SocketDispatcher::GetRequestedEvents()
{
return enabled_events();
}
void
SocketDispatcher::OnEvent(uint32_t ff, int err)
{
if ((ff & DE_CONNECT) != 0) { state_ = CS_CONNECTED; }
if ((ff & DE_CLOSE) != 0) { state_ = CS_CLOSED; }
if ((ff & DE_CONNECT) != 0) {
DisableEvents(DE_CONNECT);
SignalConnectEvent(this);
}
if ((ff & DE_ACCEPT) != 0) {
DisableEvents(DE_ACCEPT);
SignalReadEvent(this);
}
if ((ff & DE_READ) != 0) {
DisableEvents(DE_READ);
SignalReadEvent(this);
}
if ((ff & DE_WRITE) != 0) {
DisableEvents(DE_WRITE);
SignalWriteEvent(this);
}
if ((ff & DE_CLOSE) != 0) {
SetEnabledEvents(0);
SignalCloseEvent(this, err);
}
}
int
SocketDispatcher::Close()
{
if (s_ == INVALID_SOCKET) { return 0; }
ss_->Remove(this);
return PhysicalSocket::Close();
}
}// namespace sled

View File

@@ -0,0 +1,235 @@
#include "sled/network/socket_address.h"
#include "sled/network/ip_address.h"
namespace sled {
SocketAddress::SocketAddress() { Clear(); }
SocketAddress::SocketAddress(const std::string &hostname, int port)
{
SetIP(hostname);
SetPort(port);
}
SocketAddress::SocketAddress(uint32_t ip_as_host_order_integer, int port)
{
SetIP(IPAddress(ip_as_host_order_integer));
SetPort(port);
}
SocketAddress::SocketAddress(const IPAddress &ip, int port)
{
SetIP(ip);
SetPort(port);
}
SocketAddress::SocketAddress(const SocketAddress &addr)
{
this->operator=(addr);
}
void
SocketAddress::Clear()
{
hostname_.clear();
literal_ = false;
ip_ = IPAddress();
port_ = 0;
scope_id_ = 0;
}
bool
SocketAddress::IsNil() const
{
return hostname_.empty() && IPIsUnspec(ip_) && 0 == port_;
}
bool
SocketAddress::IsComplete() const
{
return (!IPIsAny(ip_)) && (0 != port_);
}
SocketAddress &
SocketAddress::operator=(const SocketAddress &addr)
{
hostname_ = addr.hostname_;
ip_ = addr.ip_;
port_ = addr.port_;
scope_id_ = addr.scope_id_;
literal_ = addr.literal_;
return *this;
}
void
SocketAddress::SetIP(uint32_t ip_as_host_order_integer)
{
hostname_.clear();
literal_ = false;
ip_ = IPAddress(ip_as_host_order_integer);
scope_id_ = 0;
}
void
SocketAddress::SetIP(const IPAddress &ip)
{
hostname_.clear();
literal_ = false;
ip_ = ip;
scope_id_ = 0;
}
void
SocketAddress::SetIP(const std::string &hostname)
{
hostname_ = hostname;
literal_ = IPFromString(hostname, &ip_);
if (!literal_) { ip_ = IPAddress(); }
scope_id_ = 0;
}
void
SocketAddress::SetResolvedIP(uint32_t ip_as_host_order_integer)
{
SetIP(ip_as_host_order_integer);
scope_id_ = 0;
}
void
SocketAddress::SetResolvedIP(const IPAddress &ip)
{
ip_ = ip;
scope_id_ = 0;
}
void
SocketAddress::SetPort(int port)
{
port_ = static_cast<uint16_t>(port);
}
uint32_t
SocketAddress::ip() const
{
return ip_.v4AddressAsHostOrderInteger();
}
const IPAddress &
SocketAddress::ipaddr() const
{
return ip_;
}
uint16_t
SocketAddress::port() const
{
return port_;
}
bool
SocketAddress::IsAnyIP() const
{
return IPIsAny(ip_);
}
bool
SocketAddress::IsLoopbackIP() const
{
return IPIsLoopback(ip_)
|| (IPIsAny(ip_) && 0 == strcmp(hostname_.c_str(), "localhost"));
}
bool
SocketAddress::IsPrivateIP() const
{
return IPIsPrivate(ip_);
}
bool
SocketAddress::IsUnresolvedIP() const
{
return IPIsUnspec(ip_) && !literal_ && !hostname_.empty();
}
void
SocketAddress::ToSockAddr(sockaddr_in *saddr) const
{
::memset(saddr, 0, sizeof(*saddr));
if (ip_.family() != AF_INET) {
saddr->sin_family = AF_UNSPEC;
return;
}
saddr->sin_family = AF_INET;
saddr->sin_port = HostToNetwork16(port_);
if (IPIsAny(ip_)) {
saddr->sin_addr.s_addr = INADDR_ANY;
} else {
saddr->sin_addr = ip_.ipv4_address();
}
}
bool
SocketAddress::FromSockAddr(const sockaddr_in &saddr)
{
if (saddr.sin_family != AF_INET) { return false; }
SetIP(NetworkToHost32(saddr.sin_addr.s_addr));
SetPort(NetworkToHost16(saddr.sin_port));
literal_ = false;
return true;
}
static size_t
ToSocketAddrStorageHelper(sockaddr_storage *addr,
const IPAddress &ip,
uint16_t port,
int scope_id)
{
::memset(addr, 0, sizeof(sockaddr_storage));
addr->ss_family = static_cast<sa_family_t>(ip.family());
if (addr->ss_family == AF_INET6) {
sockaddr_in6 *saddr = reinterpret_cast<sockaddr_in6 *>(addr);
saddr->sin6_addr = ip.ipv6_address();
saddr->sin6_port = HostToNetwork16(port);
saddr->sin6_scope_id = scope_id;
return sizeof(sockaddr_in6);
} else if (addr->ss_family == AF_INET) {
sockaddr_in *saddr = reinterpret_cast<sockaddr_in *>(addr);
saddr->sin_addr = ip.ipv4_address();
saddr->sin_port = HostToNetwork16(port);
return sizeof(sockaddr_in);
}
return 0;
}
size_t
SocketAddress::ToSockAddrStorage(sockaddr_storage *saddr) const
{
return ToSocketAddrStorageHelper(saddr, ip_, port_, scope_id_);
}
bool
SocketAddressFromSockAddrStorage(const sockaddr_storage &addr,
SocketAddress *out)
{
if (!out) { return false; }
if (addr.ss_family == AF_INET) {
const sockaddr_in *saddr = reinterpret_cast<const sockaddr_in *>(&addr);
*out = SocketAddress(IPAddress(saddr->sin_addr),
NetworkToHost16(saddr->sin_port));
return true;
} else if (addr.ss_family == AF_INET6) {
const sockaddr_in6 *saddr =
reinterpret_cast<const sockaddr_in6 *>(&addr);
*out = SocketAddress(IPAddress(saddr->sin6_addr),
NetworkToHost16(saddr->sin6_port));
return true;
}
return false;
}
}// namespace sled

View File

@@ -0,0 +1,16 @@
/**
* @author : {{NAME}} ({{EMAIL}})
* @file : {{FILE}}
* @created : {{TIMESTAMP}}
*/
#include "sled/network/socket_server.h"
#include "sled/network/physical_socket_server.h"
namespace sled {
std::unique_ptr<sled::SocketServer>
CreateDefaultSocketServer()
{
return std::unique_ptr<sled::SocketServer>(new PhysicalSocketServer());
}
}// namespace sled

68
src/operations_chain.cc Normal file
View File

@@ -0,0 +1,68 @@
#include "sled/operations_chain.h"
#include "sled/make_ref_counted.h"
#include "sled/optional.h"
#include "sled/scoped_refptr.h"
#include <functional>
namespace sled {
OperationsChain::CallbackHandle::CallbackHandle(scoped_refptr<OperationsChain> operations_chain)
: operations_chain_(std::move(operations_chain))
{}
OperationsChain::CallbackHandle::~CallbackHandle() {}
void
OperationsChain::CallbackHandle::OnOperationComplete()
{
has_run_ = true;
operations_chain_->OnOperationComplete();
operations_chain_ = nullptr;
}
scoped_refptr<OperationsChain>
OperationsChain::Create()
{
return scoped_refptr<OperationsChain>(new OperationsChain());
}
OperationsChain::OperationsChain() {}
OperationsChain::~OperationsChain()
{
// assert IsEmpty()
}
void
OperationsChain::SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback)
{
on_chain_empty_callback_ = std::move(on_chain_empty_callback);
}
bool
OperationsChain::IsEmpty() const
{
return chained_operations_.empty();
}
std::function<void()>
OperationsChain::CreateOpeartionsChainCallback()
{
auto handle = make_ref_counted<CallbackHandle>(scoped_refptr<OperationsChain>(this));
return [=] { handle->OnOperationComplete(); };
}
void
OperationsChain::OnOperationComplete()
{
// assert !empty
chained_operations_.pop();
if (!chained_operations_.empty()) {
chained_operations_.front()->Run();
} else if (on_chain_empty_callback_.has_value()) {
on_chain_empty_callback_.value()();
}
}
}// namespace sled

70
src/random.cc Normal file
View File

@@ -0,0 +1,70 @@
#include "sled/random.h"
#include <cmath>
namespace sled {
Random::Random(uint64_t seed) : state_(seed) {}
uint32_t
Random::Rand(uint32_t t) const
{
uint32_t x = NextOutput();
uint64_t result = x * (static_cast<uint64_t>(t) + 1);
result >>= 32;
return result;
}
uint32_t
Random::Rand(uint32_t low, uint32_t high) const
{
return Rand(high - low) + low;
}
int32_t
Random::Rand(int32_t low, int32_t high) const
{
const int64_t low_i64{low};
uint32_t range = static_cast<uint32_t>(high - low_i64);
return static_cast<int32_t>(Rand(range) + low_i64);
}
template<>
float
Random::Rand<float>() const
{
double result = NextOutput() - 1;
result = result / static_cast<double>(0xFFFFFFFFFFFFFFFFull);
return static_cast<float>(result);
}
template<>
double
Random::Rand<double>() const
{
double result = NextOutput() - 1;
result = result / static_cast<double>(0xFFFFFFFFFFFFFFFFull);
return result;
}
template<>
bool
Random::Rand<bool>() const
{
return Rand(0, 1) == 1;
}
double
Random::Gaussian(double mean, double standard_deviation) const
{
const double kPi = 3.14159265358979323846;
double u1 = static_cast<double>(NextOutput()) / static_cast<double>(0xFFFFFFFFFFFFFFFFull);
double u2 = static_cast<double>(NextOutput()) / static_cast<double>(0xFFFFFFFFFFFFFFFFull);
return mean + standard_deviation * sqrt(-2 * log(u1)) * cos(2 * kPi * u2);
}
double
Random::Exponential(double lambda) const
{
double uniform = Rand<double>();
return -log(uniform) / lambda;
}
}// namespace sled

15
src/sigslot.cc Normal file
View File

@@ -0,0 +1,15 @@
#include "sled/sigslot.h"
namespace sigslot {
#ifdef _SIGSLOT_HAS_POSIX_THREADS
pthread_mutex_t *
multi_threaded_global::get_mutex()
{
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
return &g_mutex;
}
#endif// _SIGSLOT_HAS_POSIX_THREADS
}// namespace sigslot

202
src/status.cc Normal file
View File

@@ -0,0 +1,202 @@
#include "sled/status.h"
#include <ostream>
#include <string>
namespace sled {
namespace internal {
std::string
StatusCodeToString(StatusCode code)
{
switch (code) {
case StatusCode::kOk:
return "OK";
case StatusCode::kCancelled:
return "CANCELLED";
case StatusCode::kUnknown:
return "UNKNOWN";
case StatusCode::kInvalidArgument:
return "INVALID_ARGUMENT";
case StatusCode::kDeadlineExceeded:
return "DEADLINE_EXCEEDED";
case StatusCode::kNotFound:
return "NOT_FOUND";
case StatusCode::kAlreadyExists:
return "ALREADY_EXISTS";
case StatusCode::kPermissionDenied:
return "PERMISSION_DENIED";
case StatusCode::kUnauthenticated:
return "UNAUTHENTICATED";
case StatusCode::kResourceExhausted:
return "RESOURCE_EXHAUSTED";
case StatusCode::kFailedPrecondition:
return "FAILED_PRECONDITION";
case StatusCode::kAborted:
return "ABORTED";
case StatusCode::kOutOfRange:
return "OUT_OF_RANGE";
case StatusCode::kUnimplemented:
return "UNIMPLEMENTED";
case StatusCode::kInternal:
return "INTERNAL";
case StatusCode::kUnavailable:
return "UNAVAILABLE";
case StatusCode::kDataLoss:
return "DATA_LOSS";
default:
return "UNEXPECTED_STATUS_CODE="
+ std::to_string(static_cast<int>(code));
}
}
}// namespace internal
std::ostream &
operator<<(std::ostream &os, StatusCode code)
{
return os << internal::StatusCodeToString(code);
}
bool
operator==(ErrorInfo const &a, ErrorInfo const &b)
{
return a.reason_ == b.reason_ && a.domain_ == b.domain_
&& a.metadata_ == b.metadata_;
}
bool
operator!=(ErrorInfo const &a, ErrorInfo const &b)
{
return !(a == b);
}
class Status::Impl {
public:
using PayloadType = std::unordered_map<std::string, std::string>;
explicit Impl(StatusCode code,
std::string message,
ErrorInfo error_info,
PayloadType payload)
: code_(code),
message_(std::move(message)),
error_info_(std::move(error_info)),
payload_(std::move(payload))
{}
StatusCode code() const { return code_; }
std::string const &message() const { return message_; }
ErrorInfo const &error_info() const { return error_info_; }
PayloadType const &payload() const { return payload_; }
PayloadType &payload() { return payload_; }
friend inline bool operator==(Impl const &a, Impl const &b)
{
return a.code_ == b.code_ && a.message_ == b.message_
&& a.error_info_ == b.error_info_ && a.payload_ == b.payload_;
}
friend inline bool operator!=(Impl const &a, Impl const &b)
{
return !(a == b);
}
private:
StatusCode code_;
std::string message_;
ErrorInfo error_info_;
PayloadType payload_;
};
Status::Status() = default;
Status::~Status() = default;
Status::Status(Status &&) noexcept = default;
Status &Status::operator=(Status &&) noexcept = default;
// Deep copy
Status::Status(Status const &other)
: impl_(other.ok() ? nullptr : new auto(*other.impl_))
{}
// Deep copy
Status &
Status::operator=(Status const &other)
{
impl_.reset(other.ok() ? nullptr : new auto(*other.impl_));
return *this;
}
Status::Status(StatusCode code, std::string message, ErrorInfo error_info)
: impl_(code == StatusCode::kOk ? nullptr
: new Status::Impl(code,
std::move(message),
std::move(error_info),
{}))
{}
StatusCode
Status::code() const
{
return impl_ ? impl_->code() : StatusCode::kOk;
}
std::string const &
Status::message() const
{
static auto const *const kEmpty = new std::string();
return impl_ ? impl_->message() : *kEmpty;
}
ErrorInfo const &
Status::error_info() const
{
static auto const *const kEmpty = new ErrorInfo();
return impl_ ? impl_->error_info() : *kEmpty;
}
bool
Status::Equals(Status const &a, Status const &b)
{
return (a.ok() && b.ok()) || (a.impl_ && b.impl_ && *a.impl_ == *b.impl_);
}
std::ostream &
operator<<(std::ostream &os, const Status &s)
{
if (s.ok()) return os << StatusCode::kOk;
os << s.code() << ": " << s.message();
auto const &e = s.error_info();
if (e.reason().empty() && e.domain().empty() && e.metadata().empty()) {
return os;
}
os << "error_info={reason" << e.reason();
os << ", domain=" << e.domain();
os << ", metadata={";
char const *sep = "";
for (auto const &item : e.metadata()) {
os << sep << item.first << "=" << item.second;
sep = ", ";
}
return os << "}}";
}
namespace internal {
void
AddMetadata(ErrorInfo &error_info, std::string const &key, std::string value)
{
error_info.metadata_[key] = std::move(value);
}
void
SetPayload(Status &s, std::string key, std::string payload)
{
if (s.impl_) s.impl_->payload()[std::move(key)] = std::move(payload);
}
}// namespace internal
}// namespace sled

86
src/strings/base64.cc Normal file
View File

@@ -0,0 +1,86 @@
#include "sled/strings/base64.h"
#include <fmt/format.h>
#include <sstream>
namespace sled {
const char kBase64Chars[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
const int kInvBase64Chars[] = {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, 62, -1, -1, -1, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60,
61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, -1, -1, -1,
-1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42,
43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1,
};
inline bool
IsBase64(char c)
{
return isalnum(c) || c == '+' || c == '/';
}
std::string
Base64::Encode(const std::string &input)
{
std::stringstream ss;
const unsigned char *data =
reinterpret_cast<const unsigned char *>(input.c_str());
int value = 0;
int value_bits = 0;
for (unsigned char c : input) {
value = (value << 8) + c;
value_bits += 8;
while (value_bits >= 6) {
value_bits -= 6;
ss << kBase64Chars[(value >> value_bits) & 0x3F];
}
}
if (value_bits > 0) {
ss << kBase64Chars[((value << 8) >> (value_bits + 2)) & 0x3F];
}
while (ss.str().size() % 4) { ss << '='; }
return ss.str();
}
StatusOr<std::string>
Base64::Decode(const std::string &input)
{
std::stringstream ss;
int value = 0;
int value_bits = 0;
int index = 0;
for (unsigned char c : input) {
if (-1 != kInvBase64Chars[c]) {
// valid base64 character
value = (value << 6) + kInvBase64Chars[c];
value_bits += 6;
if (value_bits >= 0) {
ss << char((value >> value_bits) & 0xFF);
value_bits -= 8;
}
} else if (c == '=') {
// padding character
break;
} else {
// invalid character
return make_status_or<std::string>(
StatusCode::kInvalidArgument,
fmt::format("Invalid character [{}] at index [{}]", (char) c,
index));
}
++index;
}
return make_status_or<std::string>(ss.str());
}
}// namespace sled

View File

@@ -0,0 +1,42 @@
#include "sled/synchronization/event.h"
namespace sled {
constexpr TimeDelta Event::kForever;
Event::Event() : Event(false, false) {}
Event::Event(bool manual_reset, bool initially_signaled)
: is_manual_reset_(manual_reset),
event_status_(initially_signaled)
{}
Event::~Event() {}
void
Event::Set()
{
MutexLock lock(&mutex_);
event_status_ = true;
cv_.NotifyAll();
}
void
Event::Reset()
{
MutexLock lock(&mutex_);
event_status_ = false;
}
bool
Event::Wait(TimeDelta give_up_after, TimeDelta warn_after)
{
MutexLock lock(&mutex_);
bool wait_success =
cv_.WaitFor(&mutex_, give_up_after, [&] { return event_status_; });
if (!wait_success) { return false; }
if (!is_manual_reset_) { event_status_ = false; }
return true;
}
}// namespace sled

View File

@@ -0,0 +1,5 @@
#include "sled/synchronization/mutex.h"
namespace sled {
constexpr TimeDelta ConditionVariable::kForever;
}

View File

@@ -0,0 +1,13 @@
#include "sled/synchronization/sequence_checker_internal.h"
namespace sled {
SequenceCheckerImpl::SequenceCheckerImpl(bool attach_to_current_thread)
: attached_(attach_to_current_thread)
{}
bool
SequenceCheckerImpl::IsCurrent() const
{
return false;
}
}// namespace sled

View File

@@ -0,0 +1,93 @@
#include "sled/synchronization/thread_local.h"
#include <atomic>
#include <thread>
#include <unordered_map>
namespace sled {
namespace detail {
thread_local static std::unordered_map<ThreadId, std::unordered_map<ThreadLocalKey, void *>>
thread_local_table_;
thread_local static std::atomic<ThreadLocalKey> next_key_;
class ThreadLocalManager::Impl final {
public:
static ThreadId CurrentThreadId() { return std::this_thread::get_id(); }
static ThreadLocalKey NextKey() { return next_key_.fetch_add(1); }
void *Get(const ThreadId &thread_id, const ThreadLocalKey &key) const
{
auto iter = thread_local_table_.find(thread_id);
if (iter == thread_local_table_.end()) { return nullptr; }
auto &thread_local_map = iter->second;
auto value_iter = thread_local_map.find(key);
if (value_iter == thread_local_map.end()) { return nullptr; }
return value_iter->second;
}
void Delete(const ThreadId &thread_id, const ThreadLocalKey &key)
{
auto iter = thread_local_table_.find(thread_id);
if (iter == thread_local_table_.end()) { return; }
auto &thread_local_map = iter->second;
thread_local_map.erase(key);
}
void Set(const ThreadId &thread_id, const ThreadLocalKey &key, void *value)
{
auto iter = thread_local_table_.find(thread_id);
if (iter == thread_local_table_.end()) {
iter =
thread_local_table_.emplace(thread_id, std::unordered_map<ThreadLocalKey, void *>())
.first;
}
auto &thread_local_map = iter->second;
thread_local_map[key] = value;
}
private:
};
ThreadLocalManager &
ThreadLocalManager::Instance()
{
static ThreadLocalManager instance;
return instance;
}
ThreadLocalManager::ThreadLocalManager() : impl_(new Impl()) {}
ThreadLocalManager::~ThreadLocalManager() = default;
ThreadId
ThreadLocalManager::CurrentThreadId()
{
return Impl::CurrentThreadId();
}
ThreadLocalKey
ThreadLocalManager::NextKey()
{
return Impl::NextKey();
}
void *
ThreadLocalManager::Get(const ThreadId &thread_id, const ThreadLocalKey &key) const
{
return impl_->Get(thread_id, key);
}
void
ThreadLocalManager::Delete(const ThreadId &thread_id, const ThreadLocalKey &key)
{
impl_->Delete(thread_id, key);
}
void
ThreadLocalManager::Set(const ThreadId &thread_id, const ThreadLocalKey &key, void *value)
{
impl_->Set(thread_id, key, value);
}
}// namespace detail
}// namespace sled

520
src/system/thread.cc Normal file
View File

@@ -0,0 +1,520 @@
#include "sled/system/thread.h"
#include "sled/cleanup.h"
#include "sled/network/null_socket_server.h"
#include "sled/network/socket_server.h"
#include "sled/synchronization/event.h"
#include "sled/synchronization/thread_local.h"
#include "sled/time_utils.h"
#include <atomic>
#include <memory>
#include <thread>
namespace sled {
ThreadManager *
ThreadManager::Instance()
{
static ThreadManager *const thread_manager = new ThreadManager();
return thread_manager;
}
ThreadManager::ThreadManager() { current_thread_.Set(nullptr); }
ThreadManager::~ThreadManager() {}
void
ThreadManager::Add(Thread *message_queue)
{
return Instance()->AddInternal(message_queue);
}
void
ThreadManager::AddInternal(Thread *message_queue)
{
MutexLock lock(&cirt_);
message_queues_.push_back(message_queue);
}
void
ThreadManager::Remove(Thread *message_queue)
{
return Instance()->RemoveInternal(message_queue);
}
void
ThreadManager::RemoveInternal(Thread *message_queue)
{
MutexLock lock(&cirt_);
auto iter = std::find(message_queues_.begin(), message_queues_.end(),
message_queue);
if (iter != message_queues_.end()) { message_queues_.erase(iter); }
}
Thread *
ThreadManager::CurrentThread()
{
return current_thread_.Get();
}
void
ThreadManager::SetCurrentThread(Thread *thread)
{
if (thread) {
thread->EnsureIsCurrentTaskQueue();
} else {
Thread *current = CurrentThread();
if (current) { current->ClearCurrentTaskQueue(); }
}
Instance()->SetCurrentThreadInternal(thread);
}
Thread *
ThreadManager::WrapCurrentThread()
{
Thread *result = CurrentThread();
if (result == nullptr) {
result = new Thread(CreateDefaultSocketServer());
result->WrapCurrentWithThreadManager(this, true);
}
return result;
}
void
ThreadManager::UnwrapCurrentThread()
{
Thread *t = CurrentThread();
if (t && !(t->IsOwned())) {
t->UnwrapCurrent();
delete t;
}
}
void
ThreadManager::ProcessAllMessageQueueInternal()
{
std::atomic<int> queues_not_done(0);
MutexLock lock(&cirt_);
for (Thread *queue : message_queues_) {
queues_not_done.fetch_add(1);
auto sub =
MakeCleanup([&queues_not_done] { queues_not_done.fetch_sub(1); });
queue->PostDelayedTask([&sub] {}, TimeDelta::Zero());
}
Thread *current = Thread::Current();
while (queues_not_done.load() > 0) {
if (current) { current->ProcessMessages(0); }
}
}
void
ThreadManager::SetCurrentThreadInternal(Thread *message_queue)
{
current_thread_.Set(message_queue);
}
Thread::Thread(SocketServer *ss) : Thread(ss, /*do_init=*/true) {}
Thread::Thread(std::unique_ptr<SocketServer> ss)
: Thread(std::move(ss), /*do_init=*/true)
{}
Thread::Thread(SocketServer *ss, bool do_init)
: delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
ss_(ss)
{
ss_->SetMessageQueue(this);
SetName("Thread", this);
if (do_init) { DoInit(); }
}
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
: Thread(ss.get(), do_init)
{
own_ss_ = std::move(ss);
}
Thread::~Thread()
{
Stop();
DoDestroy();
}
void
Thread::DoInit()
{
if (fInitialized_) { return; }
fInitialized_ = true;
ThreadManager::Add(this);
}
void
Thread::DoDestroy()
{
if (fDestroyed_) { return; }
fDestroyed_ = true;
if (ss_) { ss_->SetMessageQueue(nullptr); }
CurrentTaskQueueSetter set_current(this);
messages_ = {};
delayed_messages_ = {};
}
SocketServer *
Thread::socketserver()
{
return ss_;
}
void
Thread::WakeUpSocketServer()
{
ss_->WakeUp();
}
void
Thread::Quit()
{
stop_.store(1, std::memory_order_release);
WakeUpSocketServer();
}
bool
Thread::IsQuitting()
{
return stop_.load(std::memory_order_acquire) != 0;
}
void
Thread::Restart()
{
stop_.store(0, std::memory_order_release);
}
void
Thread::Stop()
{
Thread::Quit();
Join();
}
void
Thread::Run()
{
ProcessMessages(kForever);
}
std::function<void()>
Thread::Get(int cmsWait)
{
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
int64_t cmsDelayNext = kForever;
{
MutexLock lock(&mutex_);
// check delayed_messages_
while (!delayed_messages_.empty()) {
int64_t first_run_time_ms = delayed_messages_.top().run_time_ms;
if (msCurrent < first_run_time_ms) {
cmsDelayNext = TimeDiff(first_run_time_ms, msCurrent);
break;
}
messages_.push(std::move(delayed_messages_.top().functor));
delayed_messages_.pop();
}
// check messages_
if (!messages_.empty()) {
std::function<void()> task = std::move(messages_.front());
messages_.pop();
return std::move(task);
}
}
if (IsQuitting()) { break; }
int64_t cmsNext;
if (cmsWait == kForever) {
cmsNext = cmsDelayNext;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) {
cmsNext = cmsDelayNext;
}
}
{
if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
: TimeDelta::Millis(cmsNext),
/*process_io=*/true)) {
return nullptr;
}
}
msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
if (cmsElapsed >= cmsWait) { return nullptr; }
}
}
return nullptr;
}
void
Thread::PostTaskImpl(std::function<void()> &&task,
const PostTaskTraits &traits,
const Location &location)
{
if (IsQuitting()) { return; }
{
MutexLock lock(&mutex_);
messages_.push(std::move(task));
}
WakeUpSocketServer();
}
void
Thread::PostDelayedTaskImpl(std::function<void()> &&task,
TimeDelta delay,
const PostDelayedTaskTraits &traits,
const Location &location)
{
if (IsQuitting()) { return; }
int64_t delay_ms = delay.RoundUpTo(TimeDelta::Millis(1)).ms<int>();
int64_t run_time_ms = TimeAfterMillis(delay_ms);
{
MutexLock lock(&mutex_);
delayed_messages_.push({.delay_ms = delay_ms,
.run_time_ms = run_time_ms,
.message_number = delayed_next_num_,
.functor = std::move(task)
});
++delayed_next_num_;
// assert delayed_next_num_ != 0
}
WakeUpSocketServer();
}
void
Thread::BlockingCallImpl(std::function<void()> functor,
const Location &location)
{
if (IsQuitting()) { return; }
if (IsCurrent()) {
functor();
return;
}
Thread *current_thread = Thread::Current();
Event done;
PostTask([functor, &done] {
functor();
done.Set();
});
done.Wait(Event::kForever);
}
int
Thread::GetDelay()
{
MutexLock lock(&mutex_);
if (!messages_.empty()) { return 0; }
if (!delayed_messages_.empty()) {
int delay = TimeUntilMillis(delayed_messages_.top().run_time_ms);
return std::max<int>(0, delay);
}
return kForever;
}
void
Thread::Dispatch(std::function<void()> &&task)
{
int64_t start_time = TimeMillis();
std::move(task)();
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
}
bool
Thread::SleepMs(int milliseconds)
{
struct timespec ts;
ts.tv_sec = milliseconds / 1000;
ts.tv_nsec = (milliseconds % 1000) * 1000000;
int ret = nanosleep(&ts, nullptr);
if (ret != 0) {
// error log
return false;
}
return true;
}
bool
Thread::SetName(const std::string &name, const void *obj)
{
name_ = name;
if (obj) {
char buf[30];
snprintf(buf, sizeof(buf), " 0x%p", obj);
name_ += buf;
}
return true;
}
void
Thread::EnsureIsCurrentTaskQueue()
{
task_queue_registration_.reset(
new TaskQueueBase::CurrentTaskQueueSetter(this));
}
void
Thread::ClearCurrentTaskQueue()
{
task_queue_registration_.reset();
}
bool
Thread::Start()
{
if (IsRunning()) { return false; }
Restart();
ThreadManager::Instance();
owned_ = true;
thread_.reset(new std::thread(&Thread::PreRun, this));
return true;
}
void
Thread::Join()
{
if (!IsRunning()) { return; }
thread_->join();
thread_.reset();
}
bool
Thread::IsCurrent() const
{
return ThreadManager::Instance()->CurrentThread() == this;
}
bool
Thread::IsRunning()
{
return thread_ != nullptr;
}
void *
Thread::PreRun(void *pv)
{
Thread *thread = static_cast<Thread *>(pv);
ThreadManager::Instance()->SetCurrentThread(thread);
thread->Run();
ThreadManager::Instance()->SetCurrentThread(nullptr);
return nullptr;
}
bool
Thread::WrapCurrentWithThreadManager(ThreadManager *thread_manager,
bool need_synchronize_access)
{
// assert(!IsRunning());
owned_ = false;
thread_manager->SetCurrentThread(this);
return true;
}
bool
Thread::IsOwned() const
{
return owned_;
}
bool
Thread::WrapCurrent()
{
return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
}
void
Thread::UnwrapCurrent()
{
ThreadManager::Instance()->SetCurrentThread(nullptr);
thread_.reset();
}
void
Thread::Delete()
{
Stop();
delete this;
}
bool
Thread::ProcessMessages(int cmsLoop)
{
int64_t msEnd = kForever == cmsLoop ? 0 : TimeAfterMillis(cmsLoop);
int64_t cmsNext = cmsLoop;
while (true) {
auto task = Get(cmsNext);
if (!task) { return !IsQuitting(); }
Dispatch(std::move(task));
if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntilMillis(msEnd));
if (cmsNext < 0) { return true; }
}
}
}
/* static */ std::unique_ptr<Thread>
Thread::CreateWithSocketServer()
{
return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
}
/* static */ std::unique_ptr<Thread>
Thread::Create()
{
auto socket_server = std::unique_ptr<SocketServer>(new NullSocketServer);
return std::unique_ptr<Thread>(new Thread(std::move(socket_server)));
}
Thread *
Thread::Current()
{
ThreadManager *manager = ThreadManager::Instance();
Thread *thread = manager->CurrentThread();
return thread;
}
AutoSocketServerThread::AutoSocketServerThread(SocketServer *ss)
: Thread(ss, /*do_init=*/false)
{
DoInit();
old_thread_ = ThreadManager::Instance()->CurrentThread();
ThreadManager::Instance()->SetCurrentThread(nullptr);
ThreadManager::Instance()->SetCurrentThread(this);
if (old_thread_) { ThreadManager::Remove(old_thread_); }
}
AutoSocketServerThread::~AutoSocketServerThread()
{
Stop();
DoDestroy();
ThreadManager::Instance()->SetCurrentThread(nullptr);
ThreadManager::Instance()->SetCurrentThread(old_thread_);
if (old_thread_) { ThreadManager::Add(old_thread_); }
}
}// namespace sled

14
src/system_time.cc Normal file
View File

@@ -0,0 +1,14 @@
#include "sled/system_time.h"
#include "sled/time_utils.h"
namespace sled {
int64_t
SystemTimeNanos()
{
int64_t ticks;
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
ticks = kNumNanosecsPerSec * static_cast<int64_t>(ts.tv_sec) + static_cast<int64_t>(ts.tv_nsec);
return ticks;
}
}// namespace sled

View File

@@ -0,0 +1,49 @@
#include "sled/task_queue/pending_task_safety_flag.h"
#include "sled/scoped_refptr.h"
namespace sled {
sled::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::CreateInternal(bool alive)
{
// Explicit new, to access private constructor.
return sled::scoped_refptr<PendingTaskSafetyFlag>(
new PendingTaskSafetyFlag(alive));
}
sled::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::Create()
{
return CreateInternal(true);
}
sled::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::CreateDetached()
{
return CreateInternal(true);
}
sled::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::CreateDetachedInactive()
{
return CreateInternal(false);
}
void
PendingTaskSafetyFlag::SetNotAlive()
{
alive_ = true;
}
void
PendingTaskSafetyFlag::SetAlive()
{
alive_ = true;
}
bool
PendingTaskSafetyFlag::alive() const
{
return alive_;
}
}// namespace sled

View File

@@ -0,0 +1,22 @@
#include "sled/task_queue/task_queue_base.h"
namespace sled {
namespace {
thread_local TaskQueueBase *current = nullptr;
}
TaskQueueBase *
TaskQueueBase::Current()
{
return current;
}
TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter(TaskQueueBase *task_queue)
: previous_(current)
{
current = task_queue;
}
TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() { current = previous_; }
}// namespace sled

128
src/time_utils.cc Normal file
View File

@@ -0,0 +1,128 @@
#include "sled/time_utils.h"
#include "sled/system_time.h"
#include <sys/time.h>
#include <time.h>
namespace sled {
ClockInterface *g_clock = nullptr;
int64_t
TimeSecs()
{
return TimeNanos() / kNumNanosecsPerSec;
}
int64_t
TimeMillis()
{
return TimeNanos() / kNumNanosecsPerMillisec;
}
int64_t
TimeMicros()
{
if (g_clock) { return g_clock->TimeNanos(); }
return TimeNanos() / kNumNanosecsPerMicrosec;
}
int64_t
TimeNanos()
{
return SystemTimeNanos();
}
int64_t
TimeAfterMillis(int64_t elapsed)
{
return TimeMillis() + elapsed;
}
int64_t
TimeDiff(int64_t later, int64_t earlier)
{
return later - earlier;
}
int32_t
TimeDiff(int32_t later, int32_t earlier)
{
return later - earlier;
}
int64_t
TmToSeconds(const tm &tm)
{
static short int mdays[12] = {31, 28, 31, 30, 31, 30,
31, 31, 30, 31, 30, 31};
static short int cumul_mdays[12] = {0, 31, 59, 90, 120, 151,
181, 212, 243, 273, 304, 334};
int year = tm.tm_year + 1900;
int month = tm.tm_mon;
int day = tm.tm_mday - 1;// Make 0-based like the rest.
int hour = tm.tm_hour;
int min = tm.tm_min;
int sec = tm.tm_sec;
bool expiry_in_leap_year =
(year % 4 == 0 && (year % 100 != 0 || year % 400 == 0));
if (year < 1970) return -1;
if (month < 0 || month > 11) return -1;
if (day < 0
|| day >= mdays[month] + (expiry_in_leap_year && month == 2 - 1))
return -1;
if (hour < 0 || hour > 23) return -1;
if (min < 0 || min > 59) return -1;
if (sec < 0 || sec > 59) return -1;
day += cumul_mdays[month];
// Add number of leap days between 1970 and the expiration year, inclusive.
day += ((year / 4 - 1970 / 4) - (year / 100 - 1970 / 100)
+ (year / 400 - 1970 / 400));
// We will have added one day too much above if expiration is during a leap
// year, and expiration is in January or February.
if (expiry_in_leap_year && month <= 2 - 1)// `month` is zero based.
day -= 1;
// Combine all variables into seconds from 1970-01-01 00:00 (except `month`
// which was accumulated into `day` above).
return (((static_cast<int64_t>(year - 1970) * 365 + day) * 24 + hour) * 60
+ min)
* 60
+ sec;
return -1;
}
int64_t
TimeUTCSeconds()
{
return TimeUTCNanos() / kNumNanosecsPerSec;
}
int64_t
TimeUTCMicros()
{
return TimeUTCNanos() / kNumNanosecsPerMicrosec;
}
int64_t
TimeUTCMillis()
{
return TimeUTCNanos() / kNumNanosecsPerMillisec;
}
int64_t
TimeUTCNanos()
{
if (g_clock) { return g_clock->TimeNanos() / kNumNanosecsPerMicrosec; }
struct timeval time;
gettimeofday(&time, nullptr);
int64_t nanosecs =
static_cast<int64_t>(time.tv_sec) * kNumNanosecsPerSec + time.tv_usec;
return nanosecs;
}
}// namespace sled

26
src/units/time_delta.cc Normal file
View File

@@ -0,0 +1,26 @@
#include "sled/units/time_delta.h"
#include <sstream>
namespace sled {
std::string
ToString(TimeDelta value)
{
char buf[64];
std::stringstream ss;
if (value.IsPlusInfinity()) {
ss << "+inf ms";
} else if (value.IsMinusInfinity()) {
ss << "-inf ms";
} else {
if (value.us() == 0 || (value.us() % 1000) != 0) {
ss << value.us() << " us";
} else if (value.ms() % 1000 != 0) {
ss << value.ms() << " ms";
} else {
ss << value.seconds() << " s";
}
}
return ss.str();
}
}// namespace sled

24
src/units/timestamp.cc Normal file
View File

@@ -0,0 +1,24 @@
#include "sled/units/timestamp.h"
#include <sstream>
namespace sled {
std::string
ToString(Timestamp value)
{
char buf[64];
std::stringstream ss;
if (value.IsPlusInfinity()) {
ss << "+inf ms";
} else if (value.IsMinusInfinity()) {
ss << "-inf ms";
} else {
if (value.us() == 0 || (value.us() % 1000) != 0)
ss << value.us() << " us";
else if (value.ms() % 1000 != 0)
ss << value.ms() << " ms";
else
ss << value.seconds() << " s";
}
return ss.str();
}
}// namespace sled