rpc : send hash when tensor data is above some fixed threshold (#12496)

* rpc : send hash when tensor data is above some fixed threshold

ref #10095

* rpc : put cache under $HOME/.cache/llama.cpp

* try to fix win32 build

* another try to fix win32 build

* remove llama as dependency
This commit is contained in:
Radoslav Gerganov
2025-03-28 08:18:04 +02:00
committed by GitHub
parent 2099a9d5db
commit ab6ab8f809
4 changed files with 290 additions and 16 deletions

View File

@ -1,2 +1,4 @@
add_executable(rpc-server rpc-server.cpp) set(TARGET rpc-server)
target_link_libraries(rpc-server PRIVATE ggml llama) add_executable(${TARGET} rpc-server.cpp)
target_link_libraries(${TARGET} PRIVATE ggml)
target_compile_features(${TARGET} PRIVATE cxx_std_17)

View File

@ -1,3 +1,7 @@
#if defined(_MSC_VER)
#define _SILENCE_CXX17_CODECVT_HEADER_DEPRECATION_WARNING
#endif
#include "ggml-cpu.h" #include "ggml-cpu.h"
#ifdef GGML_USE_CUDA #ifdef GGML_USE_CUDA
@ -18,17 +22,132 @@
#include "ggml-rpc.h" #include "ggml-rpc.h"
#ifdef _WIN32 #ifdef _WIN32
# define DIRECTORY_SEPARATOR '\\'
# include <locale>
# include <windows.h> # include <windows.h>
# include <fcntl.h>
# include <io.h>
#else #else
# define DIRECTORY_SEPARATOR '/'
# include <unistd.h> # include <unistd.h>
# include <sys/stat.h>
#endif #endif
#include <codecvt>
#include <string> #include <string>
#include <stdio.h> #include <stdio.h>
#include <vector>
#include <filesystem>
namespace fs = std::filesystem;
// NOTE: this is copied from common.cpp to avoid linking with libcommon
// returns true if successful, false otherwise
static bool fs_create_directory_with_parents(const std::string & path) {
#ifdef _WIN32
std::wstring_convert<std::codecvt_utf8<wchar_t>> converter;
std::wstring wpath = converter.from_bytes(path);
// if the path already exists, check whether it's a directory
const DWORD attributes = GetFileAttributesW(wpath.c_str());
if ((attributes != INVALID_FILE_ATTRIBUTES) && (attributes & FILE_ATTRIBUTE_DIRECTORY)) {
return true;
}
size_t pos_slash = 0;
// process path from front to back, procedurally creating directories
while ((pos_slash = path.find('\\', pos_slash)) != std::string::npos) {
const std::wstring subpath = wpath.substr(0, pos_slash);
const wchar_t * test = subpath.c_str();
const bool success = CreateDirectoryW(test, NULL);
if (!success) {
const DWORD error = GetLastError();
// if the path already exists, ensure that it's a directory
if (error == ERROR_ALREADY_EXISTS) {
const DWORD attributes = GetFileAttributesW(subpath.c_str());
if (attributes == INVALID_FILE_ATTRIBUTES || !(attributes & FILE_ATTRIBUTE_DIRECTORY)) {
return false;
}
} else {
return false;
}
}
pos_slash += 1;
}
return true;
#else
// if the path already exists, check whether it's a directory
struct stat info;
if (stat(path.c_str(), &info) == 0) {
return S_ISDIR(info.st_mode);
}
size_t pos_slash = 1; // skip leading slashes for directory creation
// process path from front to back, procedurally creating directories
while ((pos_slash = path.find('/', pos_slash)) != std::string::npos) {
const std::string subpath = path.substr(0, pos_slash);
struct stat info;
// if the path already exists, ensure that it's a directory
if (stat(subpath.c_str(), &info) == 0) {
if (!S_ISDIR(info.st_mode)) {
return false;
}
} else {
// create parent directories
const int ret = mkdir(subpath.c_str(), 0755);
if (ret != 0) {
return false;
}
}
pos_slash += 1;
}
return true;
#endif // _WIN32
}
// NOTE: this is copied from common.cpp to avoid linking with libcommon
static std::string fs_get_cache_directory() {
std::string cache_directory = "";
auto ensure_trailing_slash = [](std::string p) {
// Make sure to add trailing slash
if (p.back() != DIRECTORY_SEPARATOR) {
p += DIRECTORY_SEPARATOR;
}
return p;
};
if (getenv("LLAMA_CACHE")) {
cache_directory = std::getenv("LLAMA_CACHE");
} else {
#ifdef __linux__
if (std::getenv("XDG_CACHE_HOME")) {
cache_directory = std::getenv("XDG_CACHE_HOME");
} else {
cache_directory = std::getenv("HOME") + std::string("/.cache/");
}
#elif defined(__APPLE__)
cache_directory = std::getenv("HOME") + std::string("/Library/Caches/");
#elif defined(_WIN32)
cache_directory = std::getenv("LOCALAPPDATA");
#endif // __linux__
cache_directory = ensure_trailing_slash(cache_directory);
cache_directory += "llama.cpp";
}
return ensure_trailing_slash(cache_directory);
}
struct rpc_server_params { struct rpc_server_params {
std::string host = "127.0.0.1"; std::string host = "127.0.0.1";
int port = 50052; int port = 50052;
size_t backend_mem = 0; size_t backend_mem = 0;
bool use_cache = false;
}; };
static void print_usage(int /*argc*/, char ** argv, rpc_server_params params) { static void print_usage(int /*argc*/, char ** argv, rpc_server_params params) {
@ -38,6 +157,7 @@ static void print_usage(int /*argc*/, char ** argv, rpc_server_params params) {
fprintf(stderr, " -H HOST, --host HOST host to bind to (default: %s)\n", params.host.c_str()); fprintf(stderr, " -H HOST, --host HOST host to bind to (default: %s)\n", params.host.c_str());
fprintf(stderr, " -p PORT, --port PORT port to bind to (default: %d)\n", params.port); fprintf(stderr, " -p PORT, --port PORT port to bind to (default: %d)\n", params.port);
fprintf(stderr, " -m MEM, --mem MEM backend memory size (in MB)\n"); fprintf(stderr, " -m MEM, --mem MEM backend memory size (in MB)\n");
fprintf(stderr, " -c, --cache enable local file cache\n");
fprintf(stderr, "\n"); fprintf(stderr, "\n");
} }
@ -58,6 +178,8 @@ static bool rpc_server_params_parse(int argc, char ** argv, rpc_server_params &
if (params.port <= 0 || params.port > 65535) { if (params.port <= 0 || params.port > 65535) {
return false; return false;
} }
} else if (arg == "-c" || arg == "--cache") {
params.use_cache = true;
} else if (arg == "-m" || arg == "--mem") { } else if (arg == "-m" || arg == "--mem") {
if (++i >= argc) { if (++i >= argc) {
return false; return false;
@ -164,8 +286,20 @@ int main(int argc, char * argv[]) {
} else { } else {
get_backend_memory(&free_mem, &total_mem); get_backend_memory(&free_mem, &total_mem);
} }
printf("Starting RPC server on %s, backend memory: %zu MB\n", endpoint.c_str(), free_mem / (1024 * 1024)); const char * cache_dir = nullptr;
ggml_backend_rpc_start_server(backend, endpoint.c_str(), free_mem, total_mem); std::string cache_dir_str = fs_get_cache_directory() + "rpc/";
if (params.use_cache) {
if (!fs_create_directory_with_parents(cache_dir_str)) {
fprintf(stderr, "Failed to create cache directory: %s\n", cache_dir_str.c_str());
return 1;
}
cache_dir = cache_dir_str.c_str();
}
printf("Starting RPC server\n");
printf(" endpoint : %s\n", endpoint.c_str());
printf(" local cache : %s\n", cache_dir ? cache_dir : "n/a");
printf(" backend memory : %zu MB\n", free_mem / (1024 * 1024));
ggml_backend_rpc_start_server(backend, endpoint.c_str(), cache_dir, free_mem, total_mem);
ggml_backend_free(backend); ggml_backend_free(backend);
return 0; return 0;
} }

View File

@ -17,7 +17,9 @@ GGML_BACKEND_API ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const c
GGML_BACKEND_API void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, size_t * total); GGML_BACKEND_API void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, size_t * total);
GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem); GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
const char * cache_dir,
size_t free_mem, size_t total_mem);
GGML_BACKEND_API ggml_backend_reg_t ggml_backend_rpc_reg(void); GGML_BACKEND_API ggml_backend_reg_t ggml_backend_rpc_reg(void);

View File

@ -26,6 +26,10 @@
# include <unistd.h> # include <unistd.h>
#endif #endif
#include <cstring> #include <cstring>
#include <fstream>
#include <filesystem>
namespace fs = std::filesystem;
#ifdef _WIN32 #ifdef _WIN32
typedef SOCKET sockfd_t; typedef SOCKET sockfd_t;
@ -80,6 +84,7 @@ enum rpc_cmd {
RPC_CMD_FREE_BUFFER, RPC_CMD_FREE_BUFFER,
RPC_CMD_BUFFER_CLEAR, RPC_CMD_BUFFER_CLEAR,
RPC_CMD_SET_TENSOR, RPC_CMD_SET_TENSOR,
RPC_CMD_SET_TENSOR_HASH,
RPC_CMD_GET_TENSOR, RPC_CMD_GET_TENSOR,
RPC_CMD_COPY_TENSOR, RPC_CMD_COPY_TENSOR,
RPC_CMD_GRAPH_COMPUTE, RPC_CMD_GRAPH_COMPUTE,
@ -89,6 +94,9 @@ enum rpc_cmd {
RPC_CMD_COUNT, RPC_CMD_COUNT,
}; };
// Try RPC_CMD_SET_TENSOR_HASH first when data size is larger than this threshold
const size_t HASH_THRESHOLD = 10 * 1024 * 1024;
struct rpc_msg_get_alloc_size_req { struct rpc_msg_get_alloc_size_req {
rpc_tensor tensor; rpc_tensor tensor;
}; };
@ -135,6 +143,10 @@ struct rpc_msg_buffer_clear_req {
uint8_t value; uint8_t value;
}; };
struct rpc_msg_set_tensor_hash_rsp {
uint8_t result;
};
struct rpc_msg_get_tensor_req { struct rpc_msg_get_tensor_req {
rpc_tensor tensor; rpc_tensor tensor;
uint64_t offset; uint64_t offset;
@ -187,6 +199,18 @@ struct ggml_backend_rpc_buffer_context {
// RPC helper functions // RPC helper functions
// Computes FNV-1a hash of the data
static uint64_t fnv_hash(const uint8_t * data, size_t len) {
const uint64_t fnv_prime = 0x100000001b3ULL;
uint64_t hash = 0xcbf29ce484222325ULL;
for (size_t i = 0; i < len; ++i) {
hash ^= data[i];
hash *= fnv_prime;
}
return hash;
}
static std::shared_ptr<socket_t> make_socket(sockfd_t fd) { static std::shared_ptr<socket_t> make_socket(sockfd_t fd) {
#ifdef _WIN32 #ifdef _WIN32
if (fd == INVALID_SOCKET) { if (fd == INVALID_SOCKET) {
@ -483,10 +507,26 @@ static enum ggml_status ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_
static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) { static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
// input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) | rpc_tensor rpc_tensor = serialize_tensor(tensor);
if (size > HASH_THRESHOLD) {
// input serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes)
size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + sizeof(uint64_t);
std::vector<uint8_t> input(input_size, 0);
uint64_t hash = fnv_hash((const uint8_t*)data, size);
memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor));
memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), &hash, sizeof(hash));
rpc_msg_set_tensor_hash_rsp response;
bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR_HASH, input.data(), input.size(), &response, sizeof(response));
GGML_ASSERT(status);
if (response.result) {
// the server has the same data, no need to send it
return;
}
}
// input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes)
size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + size; size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + size;
std::vector<uint8_t> input(input_size, 0); std::vector<uint8_t> input(input_size, 0);
rpc_tensor rpc_tensor = serialize_tensor(tensor);
memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor)); memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor));
memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset)); memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size); memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size);
@ -772,7 +812,9 @@ void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, si
class rpc_server { class rpc_server {
public: public:
rpc_server(ggml_backend_t backend) : backend(backend) {} rpc_server(ggml_backend_t backend, const char * cache_dir)
: backend(backend), cache_dir(cache_dir) {
}
~rpc_server(); ~rpc_server();
void alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_alloc_buffer_rsp & response); void alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_alloc_buffer_rsp & response);
@ -782,6 +824,7 @@ public:
bool free_buffer(const rpc_msg_free_buffer_req & request); bool free_buffer(const rpc_msg_free_buffer_req & request);
bool buffer_clear(const rpc_msg_buffer_clear_req & request); bool buffer_clear(const rpc_msg_buffer_clear_req & request);
bool set_tensor(const std::vector<uint8_t> & input); bool set_tensor(const std::vector<uint8_t> & input);
bool set_tensor_hash(const std::vector<uint8_t> & input, rpc_msg_set_tensor_hash_rsp & response);
bool get_tensor(const rpc_msg_get_tensor_req & request, std::vector<uint8_t> & response); bool get_tensor(const rpc_msg_get_tensor_req & request, std::vector<uint8_t> & response);
bool copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_copy_tensor_rsp & response); bool copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_copy_tensor_rsp & response);
bool graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph_compute_rsp & response); bool graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph_compute_rsp & response);
@ -789,6 +832,7 @@ public:
bool get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response); bool get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response);
private: private:
bool get_cached_file(uint64_t hash, std::vector<uint8_t> & data);
ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor); ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor);
ggml_tensor * create_node(uint64_t id, ggml_tensor * create_node(uint64_t id,
struct ggml_context * ctx, struct ggml_context * ctx,
@ -797,6 +841,7 @@ private:
ggml_backend_t backend; ggml_backend_t backend;
const char * cache_dir;
std::unordered_set<ggml_backend_buffer_t> buffers; std::unordered_set<ggml_backend_buffer_t> buffers;
}; };
@ -960,11 +1005,85 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
} }
const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset); const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset);
if (cache_dir && size > HASH_THRESHOLD) {
uint64_t hash = fnv_hash((const uint8_t*)data, size);
char hash_str[17];
snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash);
// save to cache_dir/hash_str
fs::path cache_file = fs::path(cache_dir) / hash_str;
std::ofstream ofs(cache_file, std::ios::binary);
ofs.write((const char *)data, size);
printf("[%s] saved to '%s'\n", __func__, cache_file.c_str());
}
ggml_backend_tensor_set(tensor, data, offset, size); ggml_backend_tensor_set(tensor, data, offset, size);
ggml_free(ctx); ggml_free(ctx);
return true; return true;
} }
bool rpc_server::get_cached_file(uint64_t hash, std::vector<uint8_t> & data) {
if (!cache_dir) {
return false;
}
char hash_str[17];
snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash);
fs::path cache_file = fs::path(cache_dir) / hash_str;
if (!fs::exists(cache_file)) {
return false;
}
std::ifstream ifs(cache_file, std::ios::binary);
ifs.seekg(0, std::ios::end);
size_t size = ifs.tellg();
ifs.seekg(0, std::ios::beg);
data.resize(size);
ifs.read((char *)data.data(), size);
return true;
}
bool rpc_server::set_tensor_hash(const std::vector<uint8_t> & input, rpc_msg_set_tensor_hash_rsp & response)
{
// serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes) |
if (input.size() != sizeof(rpc_tensor) + 16) {
return false;
}
const rpc_tensor * in_tensor = (const rpc_tensor *)input.data();
uint64_t offset;
memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset));
const uint64_t * hash = (const uint64_t *)(input.data() + sizeof(rpc_tensor) + sizeof(offset));
std::vector<uint8_t> cached_file;
if (!get_cached_file(*hash, cached_file)) {
response.result = 0;
return true;
}
size_t size = cached_file.size();
struct ggml_init_params params {
/*.mem_size =*/ ggml_tensor_overhead(),
/*.mem_buffer =*/ NULL,
/*.no_alloc =*/ true,
};
struct ggml_context * ctx = ggml_init(params);
ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor);
if (tensor == nullptr) {
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
ggml_free(ctx);
return false;
}
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n", __func__, (void*)tensor->buffer, tensor->data, offset, size, *hash);
// sanitize tensor->data
{
const size_t p0 = (size_t) ggml_backend_buffer_get_base(tensor->buffer);
const size_t p1 = p0 + ggml_backend_buffer_get_size(tensor->buffer);
if (in_tensor->data + offset < p0 || in_tensor->data + offset >= p1 || size > (p1 - in_tensor->data - offset)) {
GGML_ABORT("[%s] tensor->data out of bounds\n", __func__);
}
}
ggml_backend_tensor_set(tensor, cached_file.data(), offset, size);
response.result = 1;
ggml_free(ctx);
return true;
}
bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) { bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) {
struct ggml_init_params params { struct ggml_init_params params {
/*.mem_size =*/ ggml_tensor_overhead(), /*.mem_size =*/ ggml_tensor_overhead(),
@ -1148,8 +1267,9 @@ rpc_server::~rpc_server() {
} }
} }
static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t free_mem, size_t total_mem) { static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
rpc_server server(backend); sockfd_t sockfd, size_t free_mem, size_t total_mem) {
rpc_server server(backend, cache_dir);
while (true) { while (true) {
uint8_t cmd; uint8_t cmd;
if (!recv_data(sockfd, &cmd, 1)) { if (!recv_data(sockfd, &cmd, 1)) {
@ -1260,6 +1380,20 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre
} }
break; break;
} }
case RPC_CMD_SET_TENSOR_HASH: {
std::vector<uint8_t> input;
if (!recv_msg(sockfd, input)) {
return;
}
rpc_msg_set_tensor_hash_rsp response;
if (!server.set_tensor_hash(input, response)) {
return;
}
if (!send_msg(sockfd, &response, sizeof(response))) {
return;
}
break;
}
case RPC_CMD_INIT_TENSOR: { case RPC_CMD_INIT_TENSOR: {
rpc_msg_init_tensor_req request; rpc_msg_init_tensor_req request;
if (!recv_msg(sockfd, &request,sizeof(request))) { if (!recv_msg(sockfd, &request,sizeof(request))) {
@ -1335,7 +1469,9 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre
} }
} }
void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem) { void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
const char * cache_dir,
size_t free_mem, size_t total_mem) {
std::string host; std::string host;
int port; int port;
if (!parse_endpoint(endpoint, host, port)) { if (!parse_endpoint(endpoint, host, port)) {
@ -1364,7 +1500,7 @@ void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint
} }
printf("Accepted client connection, free_mem=%zu, total_mem=%zu\n", free_mem, total_mem); printf("Accepted client connection, free_mem=%zu, total_mem=%zu\n", free_mem, total_mem);
fflush(stdout); fflush(stdout);
rpc_serve_client(backend, client_socket->fd, free_mem, total_mem); rpc_serve_client(backend, cache_dir, client_socket->fd, free_mem, total_mem);
printf("Client connection closed\n"); printf("Client connection closed\n");
fflush(stdout); fflush(stdout);
} }