feat update tsa

This commit is contained in:
tqcq
2024-03-23 08:53:11 +08:00
parent a81eb0f8de
commit 03a62901f2
29 changed files with 338 additions and 315 deletions

View File

@@ -1,5 +1,4 @@
#include "sled/network/async_resolver.h"
#include "sled/network/async_resolver_interface.h"
#include "sled/ref_counted_base.h"
#include "sled/synchronization/mutex.h"
#include "sled/task_queue/task_queue_base.h"
@@ -15,9 +14,7 @@ struct AsyncResolver::State : public RefCountedBase {
};
int
ResolveHostname(const std::string &hostname,
int family,
std::vector<IPAddress> *addresses)
ResolveHostname(const std::string &hostname, int family, std::vector<IPAddress> *addresses)
{
if (!addresses) { return -1; }

View File

@@ -85,10 +85,7 @@ private:
bool &flag_to_clear_;
};
PhysicalSocketServer::PhysicalSocketServer() : fWait_(false)
{
signal_wakeup_ = new Signaler(this, fWait_);
}
PhysicalSocketServer::PhysicalSocketServer() : fWait_(false) { signal_wakeup_ = new Signaler(this, fWait_); }
PhysicalSocketServer::~PhysicalSocketServer() { delete signal_wakeup_; }
@@ -158,9 +155,7 @@ PhysicalSocketServer::Update(Dispatcher *pdispatcher)
int
PhysicalSocketServer::ToCusWait(TimeDelta max_wait_duration)
{
return max_wait_duration == Event::kForever
? kForeverMs
: max_wait_duration.RoundUpTo(TimeDelta::Micros(1)).us();
return max_wait_duration == Event::kForever ? kForeverMs : max_wait_duration.RoundUpTo(TimeDelta::Micros(1)).us();
}
bool
@@ -174,17 +169,12 @@ PhysicalSocketServer::Wait(TimeDelta max_wait_duration, bool process_io)
}
static void
ProcessEvents(Dispatcher *pdispatcher,
bool readable,
bool writable,
bool error_event,
bool check_error)
ProcessEvents(Dispatcher *pdispatcher, bool readable, bool writable, bool error_event, bool check_error)
{
int errcode = 0;
if (check_error) {
socklen_t len = sizeof(errcode);
int res = ::getsockopt(pdispatcher->GetDescriptor(), SOL_SOCKET,
SO_ERROR, &errcode, &len);
int res = ::getsockopt(pdispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode, &len);
if (res < 0) {
if (error_event || errno != ENOTSOCK) { errcode = EBADF; }
}
@@ -243,9 +233,7 @@ PhysicalSocketServer::WaitSelect(int64_t cusWait, bool process_io)
for (auto const &kv : dispatcher_by_key_) {
uint64_t key = kv.first;
Dispatcher *pdispatcher = kv.second;
if (!process_io && (pdispatcher != signal_wakeup_)) {
continue;
}
if (!process_io && (pdispatcher != signal_wakeup_)) { continue; }
current_dispatcher_keys_.push_back(key);
int fd = pdispatcher->GetDescriptor();
if (fd > fdmax) { fdmax = fd; }
@@ -278,8 +266,7 @@ PhysicalSocketServer::WaitSelect(int64_t cusWait, bool process_io)
bool writable = FD_ISSET(fd, &fdsWrite);
if (writable) { FD_CLR(fd, &fdsWrite); }
ProcessEvents(pdispatcher, readable, writable, false,
readable || writable);
ProcessEvents(pdispatcher, readable, writable, false, readable || writable);
}
}
@@ -309,8 +296,7 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer *ss, SOCKET s)
SetEnabledEvents(DE_READ | DE_WRITE);
int type = SOCK_STREAM;
socklen_t len = sizeof(type);
const int res =
getsockopt(s_, SOL_SOCKET, SO_TYPE, (void *) &type, &len);
const int res = getsockopt(s_, SOL_SOCKET, SO_TYPE, (void *) &type, &len);
udp_ = (SOCK_DGRAM == type);
}
}
@@ -399,8 +385,7 @@ void
PhysicalSocket::SetError(int error)
{
// MutexLock lock(&mutex_);
MutexGuard lock(&mutex_);
MutexLock lock(&mutex_);
error_ = error;
}
@@ -431,31 +416,24 @@ PhysicalSocket::SetOption(Option opt, int value)
int
PhysicalSocket::Send(const void *pv, size_t cb)
{
int sent = DoSend(s_, reinterpret_cast<const char *>(pv),
static_cast<int>(cb), MSG_NOSIGNAL);
int sent = DoSend(s_, reinterpret_cast<const char *>(pv), static_cast<int>(cb), MSG_NOSIGNAL);
UpdateLastError();
if ((sent > 0 && sent < static_cast<int>(cb))
|| (sent < 0 && IsBlockingError(GetError()))) {
if ((sent > 0 && sent < static_cast<int>(cb)) || (sent < 0 && IsBlockingError(GetError()))) {
EnableEvents(DE_WRITE);
}
return sent;
}
int
PhysicalSocket::SendTo(const void *buffer,
size_t length,
const SocketAddress &addr)
PhysicalSocket::SendTo(const void *buffer, size_t length, const SocketAddress &addr)
{
sockaddr_storage saddr;
size_t len = addr.ToSockAddrStorage(&saddr);
int sent =
DoSendTo(s_, static_cast<const char *>(buffer),
static_cast<int>(length), MSG_NOSIGNAL,
reinterpret_cast<sockaddr *>(&saddr), static_cast<int>(len));
int sent = DoSendTo(s_, static_cast<const char *>(buffer), static_cast<int>(length), MSG_NOSIGNAL,
reinterpret_cast<sockaddr *>(&saddr), static_cast<int>(len));
UpdateLastError();
if ((sent > 0 && sent < static_cast<int>(length))
|| (sent < 0 && IsBlockingError(GetError()))) {
if ((sent > 0 && sent < static_cast<int>(length)) || (sent < 0 && IsBlockingError(GetError()))) {
EnableEvents(DE_WRITE);
}
return sent;
@@ -483,10 +461,7 @@ PhysicalSocket::Recv(void *buffer, size_t length, int64_t *timestamp)
}
int
PhysicalSocket::RecvFrom(void *buffer,
size_t length,
SocketAddress *out_addr,
int64_t *timestamp)
PhysicalSocket::RecvFrom(void *buffer, size_t length, SocketAddress *out_addr, int64_t *timestamp)
{
int received = DoReadFromSocket(buffer, length, out_addr, timestamp);
UpdateLastError();
@@ -516,17 +491,12 @@ GetSocketRecvTimestamp(int socket)
if (ret != 0) { return -1; }
int64_t timestamp =
static_cast<int64_t>(tv_ioctl.tv_sec) * kNumMicrosecsPerSec
+ tv_ioctl.tv_usec;
int64_t timestamp = static_cast<int64_t>(tv_ioctl.tv_sec) * kNumMicrosecsPerSec + tv_ioctl.tv_usec;
return timestamp;
}
int
PhysicalSocket::DoReadFromSocket(void *buffer,
size_t length,
SocketAddress *out_addr,
int64_t *timestamp)
PhysicalSocket::DoReadFromSocket(void *buffer, size_t length, SocketAddress *out_addr, int64_t *timestamp)
{
sockaddr_storage addr_storage;
socklen_t addr_len = sizeof(addr_storage);
@@ -534,12 +504,10 @@ PhysicalSocket::DoReadFromSocket(void *buffer,
int received = 0;
if (out_addr) {
received = ::recvfrom(s_, static_cast<char *>(buffer),
static_cast<int>(length), 0, addr, &addr_len);
received = ::recvfrom(s_, static_cast<char *>(buffer), static_cast<int>(length), 0, addr, &addr_len);
SocketAddressFromSockAddrStorage(addr_storage, out_addr);
} else {
received = ::recv(s_, static_cast<char *>(buffer),
static_cast<int>(length), 0);
received = ::recv(s_, static_cast<char *>(buffer), static_cast<int>(length), 0);
}
if (timestamp) { *timestamp = GetSocketRecvTimestamp(s_); }
@@ -592,10 +560,7 @@ PhysicalSocket::Close()
}
SOCKET
PhysicalSocket::DoAccept(SOCKET socket, sockaddr *addr, socklen_t *addrlen)
{
return ::accept(socket, addr, addrlen);
}
PhysicalSocket::DoAccept(SOCKET socket, sockaddr *addr, socklen_t *addrlen) { return ::accept(socket, addr, addrlen); }
int
PhysicalSocket::DoSend(SOCKET socket, const char *buf, int len, int flags)
@@ -617,9 +582,7 @@ PhysicalSocket::DoSendTo(SOCKET socket,
int
PhysicalSocket::DoConnect(const SocketAddress &connect_addr)
{
if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
return SOCKET_ERROR;
}
if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) { return SOCKET_ERROR; }
sockaddr_storage addr_storage;
size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
@@ -702,13 +665,9 @@ PhysicalSocket::TranslateOption(Option opt, int *slevel, int *sopt)
return 0;
}
SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss)
: PhysicalSocket(ss)
{}
SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {}
SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss)
: PhysicalSocket(ss, s)
{}
SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {}
SocketDispatcher::~SocketDispatcher() { Close(); }

View File

@@ -0,0 +1,56 @@
#include "sled/synchronization/sequence_checker.h"
namespace sled {
SequenceChecker::SequenceChecker(InitialState initial_state)
: attached_(initial_state),
valid_thread_(pthread_self()),
valid_queue_(TaskQueueBase::Current())
{}
bool
SequenceChecker::IsCurrent() const
{
const TaskQueueBase *const current_queue = TaskQueueBase::Current();
const pthread_t current_thread = pthread_self();
MutexLock lock(&mutex_);
if (!attached_) {
attached_ = true;
valid_thread_ = current_thread;
valid_queue_ = current_queue;
return true;
}
if (valid_queue_) { return valid_queue_ == current_queue; }
return pthread_equal(valid_thread_, current_thread);
}
void
SequenceChecker::Detach()
{
MutexLock lock(&mutex_);
attached_ = false;
}
std::string
SequenceChecker::ExpectationToString() const
{
const TaskQueueBase *const current_queue = TaskQueueBase::Current();
const pthread_t current_thread = pthread_self();
MutexLock lock(&mutex_);
if (!attached_) { return "Checker currently not attached."; }
std::stringstream ss;
ss << "# Expected: TQ: " << valid_queue_ << " Thread: " << reinterpret_cast<const void *>(valid_thread_)
<< std::endl;
ss << "# Current: TQ: " << current_queue << " Thread: " << reinterpret_cast<const void *>(current_thread)
<< std::endl;
if ((valid_queue_ || current_queue) && valid_queue_ != current_queue) {
ss << "# Mismatched TaskQueue" << std::endl;
} else if (valid_thread_ != current_thread) {
ss << "# Mismatched Thread" << std::endl;
}
return ss.str();
}
}// namespace sled

View File

@@ -1,13 +0,0 @@
#include "sled/synchronization/sequence_checker_internal.h"
namespace sled {
SequenceCheckerImpl::SequenceCheckerImpl(bool attach_to_current_thread)
: attached_(attach_to_current_thread)
{}
bool
SequenceCheckerImpl::IsCurrent() const
{
return false;
}
}// namespace sled

View File

@@ -0,0 +1,68 @@
#include <gtest/gtest.h>
#include <sled/log/log.h>
#include <sled/synchronization/event.h>
#include <sled/synchronization/sequence_checker.h>
#include <sled/system/thread.h>
void
RunOnDifferentThread(std::function<void()> func)
{
sled::Event thread_has_run_event;
std::thread thread([&] {
func();
thread_has_run_event.Set();
});
thread.detach();
EXPECT_TRUE(thread_has_run_event.Wait(sled::TimeDelta::Seconds(1)));
}
TEST(SequenceChecker, CallsAllowedOnSameThread)
{
sled::SequenceChecker sequence_checker;
EXPECT_TRUE(sequence_checker.IsCurrent());
}
TEST(SequenceChecker, DestructorAllowedOnDifferentThread)
{
auto sequence_checker = std::make_unique<sled::SequenceChecker>();
RunOnDifferentThread([&] { sequence_checker.reset(); });
}
TEST(SequenceChecker, Detach)
{
sled::SequenceChecker sequence_checker;
sequence_checker.Detach();
RunOnDifferentThread([&] { EXPECT_TRUE(sequence_checker.IsCurrent()); });
}
TEST(SequenceChecker, OnlyCurrentOnOneThread)
{
sled::SequenceChecker sequence_checker(sled::SequenceChecker::kDetached);
RunOnDifferentThread([&] {
EXPECT_TRUE(sequence_checker.IsCurrent());
RunOnDifferentThread([&] { EXPECT_FALSE(sequence_checker.IsCurrent()); });
EXPECT_TRUE(sequence_checker.IsCurrent());
});
}
TEST(SequenceChecker, DeatchFromThreadAndUseOnTaskQueue)
{
auto queue = sled::Thread::Create();
ASSERT_TRUE(queue->Start());
sled::SequenceChecker sequence_checker;
sequence_checker.Detach();
queue->BlockingCall([&] { EXPECT_TRUE(sequence_checker.IsCurrent()); });
}
TEST(SequenceChecker, DetachFromTaskQueueAndUseOnThread)
{
auto queue = sled::Thread::Create();
ASSERT_TRUE(queue->Start());
queue->BlockingCall([&] {
sled::SequenceChecker sequence_checker;
sequence_checker.Detach();
RunOnDifferentThread([&] { EXPECT_TRUE(sequence_checker.IsCurrent()); });
});
queue->Stop();
}