Speed up Snappy uncompression, new Logger interface.
- Removed one copy of an uncompressed block contents changing the signature of Snappy_Uncompress() so it uncompresses into a flat array instead of a std::string. Speeds up readrandom ~10%. - Instead of a combination of Env/WritableFile, we now have a Logger interface that can be easily overridden applications that want to supply their own logging. - Separated out the gcc and Sun Studio parts of atomic_pointer.h so we can use 'asm', 'volatile' keywords for Sun Studio. git-svn-id: https://leveldb.googlecode.com/svn/trunk@39 62dab493-f737-651d-591e-8d6aee1b9529
This commit is contained in:
parent
6872ace901
commit
60bd8015f2
@ -472,13 +472,14 @@ class Benchmark {
|
||||
std::string compressed;
|
||||
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
|
||||
int64_t bytes = 0;
|
||||
std::string uncompressed;
|
||||
char* uncompressed = new char[input.size()];
|
||||
while (ok && bytes < 1024 * 1048576) { // Compress 1G
|
||||
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
|
||||
&uncompressed);
|
||||
bytes += uncompressed.size();
|
||||
uncompressed);
|
||||
bytes += input.size();
|
||||
FinishedSingleOp();
|
||||
}
|
||||
delete[] uncompressed;
|
||||
|
||||
if (!ok) {
|
||||
message_ = "(snappy failure)";
|
||||
|
@ -68,16 +68,6 @@ struct DBImpl::CompactionState {
|
||||
}
|
||||
};
|
||||
|
||||
namespace {
|
||||
class NullWritableFile : public WritableFile {
|
||||
public:
|
||||
virtual Status Append(const Slice& data) { return Status::OK(); }
|
||||
virtual Status Close() { return Status::OK(); }
|
||||
virtual Status Flush() { return Status::OK(); }
|
||||
virtual Status Sync() { return Status::OK(); }
|
||||
};
|
||||
}
|
||||
|
||||
// Fix user-supplied options to be reasonable
|
||||
template <class T,class V>
|
||||
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
|
||||
@ -96,11 +86,10 @@ Options SanitizeOptions(const std::string& dbname,
|
||||
// Open a log file in the same directory as the db
|
||||
src.env->CreateDir(dbname); // In case it does not exist
|
||||
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
|
||||
Status s = src.env->NewWritableFile(InfoLogFileName(dbname),
|
||||
&result.info_log);
|
||||
Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
|
||||
if (!s.ok()) {
|
||||
// No place suitable for logging
|
||||
result.info_log = new NullWritableFile;
|
||||
result.info_log = NULL;
|
||||
}
|
||||
}
|
||||
if (result.block_cache == NULL) {
|
||||
@ -201,7 +190,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
|
||||
if (s->ok() || options_.paranoid_checks) {
|
||||
// No change needed
|
||||
} else {
|
||||
Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str());
|
||||
Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
|
||||
*s = Status::OK();
|
||||
}
|
||||
}
|
||||
@ -247,7 +236,7 @@ void DBImpl::DeleteObsoleteFiles() {
|
||||
if (type == kTableFile) {
|
||||
table_cache_->Evict(number);
|
||||
}
|
||||
Log(env_, options_.info_log, "Delete type=%d #%lld\n",
|
||||
Log(options_.info_log, "Delete type=%d #%lld\n",
|
||||
int(type),
|
||||
static_cast<unsigned long long>(number));
|
||||
env_->DeleteFile(dbname_ + "/" + filenames[i]);
|
||||
@ -336,11 +325,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
SequenceNumber* max_sequence) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Env* env;
|
||||
WritableFile* info_log;
|
||||
Logger* info_log;
|
||||
const char* fname;
|
||||
Status* status; // NULL if options_.paranoid_checks==false
|
||||
virtual void Corruption(size_t bytes, const Status& s) {
|
||||
Log(env, info_log, "%s%s: dropping %d bytes; %s",
|
||||
Log(info_log, "%s%s: dropping %d bytes; %s",
|
||||
(this->status == NULL ? "(ignoring error) " : ""),
|
||||
fname, static_cast<int>(bytes), s.ToString().c_str());
|
||||
if (this->status != NULL && this->status->ok()) *this->status = s;
|
||||
@ -370,7 +359,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
// large sequence numbers).
|
||||
log::Reader reader(file, &reporter, true/*checksum*/,
|
||||
0/*initial_offset*/);
|
||||
Log(env_, options_.info_log, "Recovering log #%llu",
|
||||
Log(options_.info_log, "Recovering log #%llu",
|
||||
(unsigned long long) log_number);
|
||||
|
||||
// Read all the records and add to a memtable
|
||||
@ -434,7 +423,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
|
||||
meta.number = versions_->NewFileNumber();
|
||||
pending_outputs_.insert(meta.number);
|
||||
Iterator* iter = mem->NewIterator();
|
||||
Log(env_, options_.info_log, "Level-0 table #%llu: started",
|
||||
Log(options_.info_log, "Level-0 table #%llu: started",
|
||||
(unsigned long long) meta.number);
|
||||
|
||||
Status s;
|
||||
@ -444,7 +433,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s",
|
||||
Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
|
||||
(unsigned long long) meta.number,
|
||||
(unsigned long long) meta.file_size,
|
||||
s.ToString().c_str());
|
||||
@ -613,7 +602,7 @@ void DBImpl::BackgroundCompaction() {
|
||||
f->smallest, f->largest);
|
||||
status = versions_->LogAndApply(c->edit());
|
||||
VersionSet::LevelSummaryStorage tmp;
|
||||
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
|
||||
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
|
||||
static_cast<unsigned long long>(f->number),
|
||||
c->level() + 1,
|
||||
static_cast<unsigned long long>(f->file_size),
|
||||
@ -631,7 +620,7 @@ void DBImpl::BackgroundCompaction() {
|
||||
} else if (shutting_down_.Acquire_Load()) {
|
||||
// Ignore compaction errors found during shutting down
|
||||
} else {
|
||||
Log(env_, options_.info_log,
|
||||
Log(options_.info_log,
|
||||
"Compaction error: %s", status.ToString().c_str());
|
||||
if (options_.paranoid_checks && bg_error_.ok()) {
|
||||
bg_error_ = status;
|
||||
@ -727,7 +716,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
s = iter->status();
|
||||
delete iter;
|
||||
if (s.ok()) {
|
||||
Log(env_, options_.info_log,
|
||||
Log(options_.info_log,
|
||||
"Generated table #%llu: %lld keys, %lld bytes",
|
||||
(unsigned long long) output_number,
|
||||
(unsigned long long) current_entries,
|
||||
@ -740,7 +729,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
|
||||
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
|
||||
mutex_.AssertHeld();
|
||||
Log(env_, options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
|
||||
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
|
||||
compact->compaction->num_input_files(0),
|
||||
compact->compaction->level(),
|
||||
compact->compaction->num_input_files(1),
|
||||
@ -776,7 +765,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
const uint64_t start_micros = env_->NowMicros();
|
||||
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
||||
|
||||
Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files",
|
||||
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
|
||||
compact->compaction->num_input_files(0),
|
||||
compact->compaction->level(),
|
||||
compact->compaction->num_input_files(1),
|
||||
@ -859,7 +848,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
last_sequence_for_key = ikey.sequence;
|
||||
}
|
||||
#if 0
|
||||
Log(env_, options_.info_log,
|
||||
Log(options_.info_log,
|
||||
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
|
||||
"%d smallest_snapshot: %d",
|
||||
ikey.user_key.ToString().c_str(),
|
||||
@ -925,7 +914,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
status = InstallCompactionResults(compact);
|
||||
}
|
||||
VersionSet::LevelSummaryStorage tmp;
|
||||
Log(env_, options_.info_log,
|
||||
Log(options_.info_log,
|
||||
"compacted to: %s", versions_->LevelSummary(&tmp));
|
||||
return status;
|
||||
}
|
||||
@ -1112,7 +1101,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
|
||||
bg_cv_.Wait();
|
||||
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
|
||||
// There are too many level-0 files.
|
||||
Log(env_, options_.info_log, "waiting...\n");
|
||||
Log(options_.info_log, "waiting...\n");
|
||||
bg_cv_.Wait();
|
||||
} else {
|
||||
// Attempt to switch to a new memtable and trigger compaction of old
|
||||
|
20
db/repair.cc
20
db/repair.cc
@ -78,7 +78,7 @@ class Repairer {
|
||||
for (size_t i = 0; i < tables_.size(); i++) {
|
||||
bytes += tables_[i].meta.file_size;
|
||||
}
|
||||
Log(env_, options_.info_log,
|
||||
Log(options_.info_log,
|
||||
"**** Repaired leveldb %s; "
|
||||
"recovered %d files; %llu bytes. "
|
||||
"Some data may have been lost. "
|
||||
@ -149,7 +149,7 @@ class Repairer {
|
||||
std::string logname = LogFileName(dbname_, logs_[i]);
|
||||
Status status = ConvertLogToTable(logs_[i]);
|
||||
if (!status.ok()) {
|
||||
Log(env_, options_.info_log, "Log #%llu: ignoring conversion error: %s",
|
||||
Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
|
||||
(unsigned long long) logs_[i],
|
||||
status.ToString().c_str());
|
||||
}
|
||||
@ -160,11 +160,11 @@ class Repairer {
|
||||
Status ConvertLogToTable(uint64_t log) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Env* env;
|
||||
WritableFile* info_log;
|
||||
Logger* info_log;
|
||||
uint64_t lognum;
|
||||
virtual void Corruption(size_t bytes, const Status& s) {
|
||||
// We print error messages for corruption, but continue repairing.
|
||||
Log(env, info_log, "Log #%llu: dropping %d bytes; %s",
|
||||
Log(info_log, "Log #%llu: dropping %d bytes; %s",
|
||||
(unsigned long long) lognum,
|
||||
static_cast<int>(bytes),
|
||||
s.ToString().c_str());
|
||||
@ -209,7 +209,7 @@ class Repairer {
|
||||
if (status.ok()) {
|
||||
counter += WriteBatchInternal::Count(&batch);
|
||||
} else {
|
||||
Log(env_, options_.info_log, "Log #%llu: ignoring %s",
|
||||
Log(options_.info_log, "Log #%llu: ignoring %s",
|
||||
(unsigned long long) log,
|
||||
status.ToString().c_str());
|
||||
status = Status::OK(); // Keep going with rest of file
|
||||
@ -231,7 +231,7 @@ class Repairer {
|
||||
table_numbers_.push_back(meta.number);
|
||||
}
|
||||
}
|
||||
Log(env_, options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
|
||||
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
|
||||
(unsigned long long) log,
|
||||
counter,
|
||||
(unsigned long long) meta.number,
|
||||
@ -247,7 +247,7 @@ class Repairer {
|
||||
Status status = ScanTable(&t);
|
||||
if (!status.ok()) {
|
||||
std::string fname = TableFileName(dbname_, table_numbers_[i]);
|
||||
Log(env_, options_.info_log, "Table #%llu: ignoring %s",
|
||||
Log(options_.info_log, "Table #%llu: ignoring %s",
|
||||
(unsigned long long) table_numbers_[i],
|
||||
status.ToString().c_str());
|
||||
ArchiveFile(fname);
|
||||
@ -270,7 +270,7 @@ class Repairer {
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
Slice key = iter->key();
|
||||
if (!ParseInternalKey(key, &parsed)) {
|
||||
Log(env_, options_.info_log, "Table #%llu: unparsable key %s",
|
||||
Log(options_.info_log, "Table #%llu: unparsable key %s",
|
||||
(unsigned long long) t->meta.number,
|
||||
EscapeString(key).c_str());
|
||||
continue;
|
||||
@ -291,7 +291,7 @@ class Repairer {
|
||||
}
|
||||
delete iter;
|
||||
}
|
||||
Log(env_, options_.info_log, "Table #%llu: %d entries %s",
|
||||
Log(options_.info_log, "Table #%llu: %d entries %s",
|
||||
(unsigned long long) t->meta.number,
|
||||
counter,
|
||||
status.ToString().c_str());
|
||||
@ -373,7 +373,7 @@ class Repairer {
|
||||
new_file.append("/");
|
||||
new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
|
||||
Status s = env_->RenameFile(fname, new_file);
|
||||
Log(env_, options_.info_log, "Archiving %s: %s\n",
|
||||
Log(options_.info_log, "Archiving %s: %s\n",
|
||||
fname.c_str(), s.ToString().c_str());
|
||||
}
|
||||
};
|
||||
|
@ -1124,7 +1124,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
|
||||
std::vector<FileMetaData*> expanded1;
|
||||
GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
|
||||
if (expanded1.size() == c->inputs_[1].size()) {
|
||||
Log(env_, options_->info_log,
|
||||
Log(options_->info_log,
|
||||
"Expanding@%d %d+%d to %d+%d\n",
|
||||
level,
|
||||
int(c->inputs_[0].size()),
|
||||
@ -1147,7 +1147,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
|
||||
}
|
||||
|
||||
if (false) {
|
||||
Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
|
||||
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
|
||||
level,
|
||||
EscapeString(smallest.Encode()).c_str(),
|
||||
EscapeString(largest.Encode()).c_str());
|
||||
|
@ -22,6 +22,7 @@
|
||||
namespace leveldb {
|
||||
|
||||
class FileLock;
|
||||
class Logger;
|
||||
class RandomAccessFile;
|
||||
class SequentialFile;
|
||||
class Slice;
|
||||
@ -134,8 +135,8 @@ class Env {
|
||||
// same directory.
|
||||
virtual Status GetTestDirectory(std::string* path) = 0;
|
||||
|
||||
// Write an entry to the log file with the specified format.
|
||||
virtual void Logv(WritableFile* log, const char* format, va_list ap) = 0;
|
||||
// Create and return a log file for storing informational messages.
|
||||
virtual Status NewLogger(const std::string& fname, Logger** result) = 0;
|
||||
|
||||
// Returns the number of micro-seconds since some fixed point in time. Only
|
||||
// useful for computing deltas of time.
|
||||
@ -210,6 +211,22 @@ class WritableFile {
|
||||
void operator=(const WritableFile&);
|
||||
};
|
||||
|
||||
// An interface for writing log messages.
|
||||
class Logger {
|
||||
public:
|
||||
Logger() { }
|
||||
virtual ~Logger();
|
||||
|
||||
// Write an entry to the log file with the specified format.
|
||||
virtual void Logv(const char* format, va_list ap) = 0;
|
||||
|
||||
private:
|
||||
// No copying allowed
|
||||
Logger(const Logger&);
|
||||
void operator=(const Logger&);
|
||||
};
|
||||
|
||||
|
||||
// Identifies a locked file.
|
||||
class FileLock {
|
||||
public:
|
||||
@ -222,9 +239,9 @@ class FileLock {
|
||||
};
|
||||
|
||||
// Log the specified data to *info_log if info_log is non-NULL.
|
||||
extern void Log(Env* env, WritableFile* info_log, const char* format, ...)
|
||||
extern void Log(Logger* info_log, const char* format, ...)
|
||||
# if defined(__GNUC__) || defined(__clang__)
|
||||
__attribute__((__format__ (__printf__, 3, 4)))
|
||||
__attribute__((__format__ (__printf__, 2, 3)))
|
||||
# endif
|
||||
;
|
||||
|
||||
@ -284,8 +301,8 @@ class EnvWrapper : public Env {
|
||||
virtual Status GetTestDirectory(std::string* path) {
|
||||
return target_->GetTestDirectory(path);
|
||||
}
|
||||
virtual void Logv(WritableFile* log, const char* format, va_list ap) {
|
||||
return target_->Logv(log, format, ap);
|
||||
virtual Status NewLogger(const std::string& fname, Logger** result) {
|
||||
return target_->NewLogger(fname, result);
|
||||
}
|
||||
uint64_t NowMicros() {
|
||||
return target_->NowMicros();
|
||||
|
@ -12,8 +12,8 @@ namespace leveldb {
|
||||
class Cache;
|
||||
class Comparator;
|
||||
class Env;
|
||||
class Logger;
|
||||
class Snapshot;
|
||||
class WritableFile;
|
||||
|
||||
// DB contents are stored in a set of blocks, each of which holds a
|
||||
// sequence of key,value pairs. Each block may be compressed before
|
||||
@ -61,10 +61,10 @@ struct Options {
|
||||
Env* env;
|
||||
|
||||
// Any internal progress/error information generated by the db will
|
||||
// be to written to info_log if it is non-NULL, or to a file stored
|
||||
// be written to info_log if it is non-NULL, or to a file stored
|
||||
// in the same directory as the DB contents if info_log is NULL.
|
||||
// Default: NULL
|
||||
WritableFile* info_log;
|
||||
Logger* info_log;
|
||||
|
||||
// -------------------
|
||||
// Parameters that affect performance
|
||||
|
@ -48,9 +48,8 @@ namespace port {
|
||||
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
|
||||
#define LEVELDB_HAVE_MEMORY_BARRIER
|
||||
|
||||
// Gcc and Sun Studio on x86
|
||||
#elif defined(ARCH_CPU_X86_FAMILY) && \
|
||||
(defined(__GNUC__) || defined(__SUNPRO_CC))
|
||||
// Gcc on x86
|
||||
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
|
||||
inline void MemoryBarrier() {
|
||||
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
|
||||
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
|
||||
@ -58,6 +57,15 @@ inline void MemoryBarrier() {
|
||||
}
|
||||
#define LEVELDB_HAVE_MEMORY_BARRIER
|
||||
|
||||
// Sun Studio
|
||||
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
|
||||
inline void MemoryBarrier() {
|
||||
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
|
||||
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
|
||||
asm volatile("" : : : "memory");
|
||||
}
|
||||
#define LEVELDB_HAVE_MEMORY_BARRIER
|
||||
|
||||
// Mac OS
|
||||
#elif defined(OS_MACOSX)
|
||||
inline void MemoryBarrier() {
|
||||
|
@ -125,11 +125,17 @@ inline bool Snappy_Compress(
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO(gabor): Implement uncompress
|
||||
inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
|
||||
size_t* result) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO(gabor): Implement uncompress
|
||||
inline bool Snappy_Uncompress(
|
||||
const char* input_data,
|
||||
size_t input_length,
|
||||
std::string* output) {
|
||||
char* output) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -62,15 +62,19 @@ bool Snappy_Compress(const char* input, size_t input_length,
|
||||
#endif
|
||||
}
|
||||
|
||||
bool Snappy_Uncompress(const char* input_data, size_t input_length,
|
||||
std::string* output) {
|
||||
bool Snappy_GetUncompressedLength(const char* input, size_t length,
|
||||
size_t* result) {
|
||||
#if defined(USE_SNAPPY)
|
||||
size_t ulength;
|
||||
if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) {
|
||||
return snappy::GetUncompressedLength(input_data, input_length, result);
|
||||
#else
|
||||
return false;
|
||||
}
|
||||
output->resize(ulength);
|
||||
return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool Snappy_Uncompress(const char* input_data, size_t input_length,
|
||||
char* output) {
|
||||
#if defined(USE_SNAPPY)
|
||||
return snappy::RawUncompress(input_data, input_length, output);
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
|
@ -84,8 +84,10 @@ class AtomicPointer {
|
||||
|
||||
bool Snappy_Compress(const char* input, size_t input_length,
|
||||
std::string* output);
|
||||
bool Snappy_GetUncompressedLength(const char* input, size_t length,
|
||||
size_t* result);
|
||||
bool Snappy_Uncompress(const char* input_data, size_t input_length,
|
||||
std::string* output);
|
||||
char* output);
|
||||
|
||||
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
|
||||
return false;
|
||||
|
@ -96,11 +96,21 @@ class AtomicPointer {
|
||||
extern bool Snappy_Compress(const char* input, size_t input_length,
|
||||
std::string* output);
|
||||
|
||||
// If input[0,input_length-1] looks like a valid snappy compressed
|
||||
// buffer, store the size of the uncompressed data in *result and
|
||||
// return true. Else return false.
|
||||
extern bool Snappy_GetUncompressedLength(const char* input, size_t length,
|
||||
size_t* result);
|
||||
|
||||
// Attempt to snappy uncompress input[0,input_length-1] into *output.
|
||||
// Returns true if successful, false if the input is invalid lightweight
|
||||
// compressed data.
|
||||
//
|
||||
// REQUIRES: at least the first "n" bytes of output[] must be writable
|
||||
// where "n" is the result of a successful call to
|
||||
// Snappy_GetUncompressedLength.
|
||||
extern bool Snappy_Uncompress(const char* input_data, size_t input_length,
|
||||
std::string* output);
|
||||
char* output);
|
||||
|
||||
// ------------------ Miscellaneous -------------------
|
||||
|
||||
|
@ -80,12 +80,12 @@ class CondVar {
|
||||
Mutex* mu_;
|
||||
};
|
||||
|
||||
inline bool Snappy_Compress(const char* input, size_t input_length,
|
||||
inline bool Snappy_Compress(const char* input, size_t length,
|
||||
::std::string* output) {
|
||||
#ifdef SNAPPY
|
||||
output->resize(snappy::MaxCompressedLength(input_length));
|
||||
output->resize(snappy::MaxCompressedLength(length));
|
||||
size_t outlen;
|
||||
snappy::RawCompress(input, input_length, &(*output)[0], &outlen);
|
||||
snappy::RawCompress(input, length, &(*output)[0], &outlen);
|
||||
output->resize(outlen);
|
||||
return true;
|
||||
#endif
|
||||
@ -93,18 +93,22 @@ inline bool Snappy_Compress(const char* input, size_t input_length,
|
||||
return false;
|
||||
}
|
||||
|
||||
inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
|
||||
::std::string* output) {
|
||||
inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
|
||||
size_t* result) {
|
||||
#ifdef SNAPPY
|
||||
size_t ulength;
|
||||
if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) {
|
||||
return snappy::GetUncompressedLength(input, length, result);
|
||||
#else
|
||||
return false;
|
||||
}
|
||||
output->resize(ulength);
|
||||
return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
|
||||
#endif
|
||||
}
|
||||
|
||||
inline bool Snappy_Uncompress(const char* input, size_t length,
|
||||
char* output) {
|
||||
#ifdef SNAPPY
|
||||
return snappy::RawUncompress(input, length, output);
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
|
||||
|
@ -107,16 +107,20 @@ Status ReadBlock(RandomAccessFile* file,
|
||||
// Ok
|
||||
break;
|
||||
case kSnappyCompression: {
|
||||
std::string decompressed;
|
||||
if (!port::Snappy_Uncompress(data, n, &decompressed)) {
|
||||
size_t ulength = 0;
|
||||
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
|
||||
delete[] buf;
|
||||
s = Status::Corruption("corrupted compressed block contents");
|
||||
return s;
|
||||
return Status::Corruption("corrupted compressed block contents");
|
||||
}
|
||||
delete[] buf; // Done with uncompressed data
|
||||
buf = new char[decompressed.size()];
|
||||
memcpy(buf, decompressed.data(), decompressed.size());
|
||||
n = decompressed.size();
|
||||
char* ubuf = new char[ulength];
|
||||
if (!port::Snappy_Uncompress(data, n, ubuf)) {
|
||||
delete[] buf;
|
||||
delete[] ubuf;
|
||||
return Status::Corruption("corrupted compressed block contents");
|
||||
}
|
||||
delete[] buf;
|
||||
buf = ubuf;
|
||||
n = ulength;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -18,14 +18,19 @@ RandomAccessFile::~RandomAccessFile() {
|
||||
WritableFile::~WritableFile() {
|
||||
}
|
||||
|
||||
Logger::~Logger() {
|
||||
}
|
||||
|
||||
FileLock::~FileLock() {
|
||||
}
|
||||
|
||||
void Log(Env* env, WritableFile* info_log, const char* format, ...) {
|
||||
void Log(Logger* info_log, const char* format, ...) {
|
||||
if (info_log != NULL) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
env->Logv(info_log, format, ap);
|
||||
info_log->Logv(format, ap);
|
||||
va_end(ap);
|
||||
}
|
||||
}
|
||||
|
||||
Status WriteStringToFile(Env* env, const Slice& data,
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "leveldb/slice.h"
|
||||
#include "port/port.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/posix_logger.h"
|
||||
|
||||
#if defined(OS_WIN)
|
||||
#include <io.h>
|
||||
@ -406,9 +407,8 @@ class ChromiumEnv : public Env {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
|
||||
// TODO(jorlow): We may want to just use Chromium's built in logging.
|
||||
|
||||
// TODO(user,user): Use Chromium's built-in logging?
|
||||
static uint64_t gettid() {
|
||||
uint64_t thread_id = 0;
|
||||
// Coppied from base/logging.cc.
|
||||
#if defined(OS_WIN)
|
||||
@ -422,65 +422,17 @@ class ChromiumEnv : public Env {
|
||||
pthread_t tid = pthread_self();
|
||||
memcpy(&thread_id, &tid, min(sizeof(r), sizeof(tid)));
|
||||
#endif
|
||||
return thread_id;
|
||||
}
|
||||
|
||||
// We try twice: the first time with a fixed-size stack allocated buffer,
|
||||
// and the second time with a much larger dynamically allocated buffer.
|
||||
char buffer[500];
|
||||
for (int iter = 0; iter < 2; iter++) {
|
||||
char* base;
|
||||
int bufsize;
|
||||
if (iter == 0) {
|
||||
bufsize = sizeof(buffer);
|
||||
base = buffer;
|
||||
virtual Status NewLogger(const std::string& fname, Logger** result) {
|
||||
FILE* f = fopen(fname.c_str(), "w");
|
||||
if (f == NULL) {
|
||||
*result = NULL;
|
||||
return Status::IOError(fname, strerror(errno));
|
||||
} else {
|
||||
bufsize = 30000;
|
||||
base = new char[bufsize];
|
||||
}
|
||||
char* p = base;
|
||||
char* limit = base + bufsize;
|
||||
|
||||
::base::Time::Exploded t;
|
||||
::base::Time::Now().LocalExplode(&t);
|
||||
p += snprintf(p, limit - p,
|
||||
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
|
||||
t.year,
|
||||
t.month,
|
||||
t.day_of_month,
|
||||
t.hour,
|
||||
t.minute,
|
||||
t.second,
|
||||
static_cast<int>(t.millisecond) * 1000,
|
||||
static_cast<long long unsigned int>(thread_id));
|
||||
|
||||
// Print the message
|
||||
if (p < limit) {
|
||||
va_list backup_ap;
|
||||
va_copy(backup_ap, ap);
|
||||
p += vsnprintf(p, limit - p, format, backup_ap);
|
||||
va_end(backup_ap);
|
||||
}
|
||||
|
||||
// Truncate to available space if necessary
|
||||
if (p >= limit) {
|
||||
if (iter == 0) {
|
||||
continue; // Try again with larger buffer
|
||||
} else {
|
||||
p = limit - 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Add newline if necessary
|
||||
if (p == base || p[-1] != '\n') {
|
||||
*p++ = '\n';
|
||||
}
|
||||
|
||||
assert(p <= limit);
|
||||
info_log->Append(Slice(base, p - base));
|
||||
info_log->Flush();
|
||||
if (base != buffer) {
|
||||
delete[] base;
|
||||
}
|
||||
break;
|
||||
*result = new PosixLogger(f, &ChromiumEnv::gettid);
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "leveldb/slice.h"
|
||||
#include "port/port.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/posix_logger.h"
|
||||
|
||||
namespace leveldb {
|
||||
|
||||
@ -427,72 +428,21 @@ class PosixEnv : public Env {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
|
||||
static uint64_t gettid() {
|
||||
pthread_t tid = pthread_self();
|
||||
uint64_t thread_id = 0;
|
||||
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
|
||||
return thread_id;
|
||||
}
|
||||
|
||||
// We try twice: the first time with a fixed-size stack allocated buffer,
|
||||
// and the second time with a much larger dynamically allocated buffer.
|
||||
char buffer[500];
|
||||
for (int iter = 0; iter < 2; iter++) {
|
||||
char* base;
|
||||
int bufsize;
|
||||
if (iter == 0) {
|
||||
bufsize = sizeof(buffer);
|
||||
base = buffer;
|
||||
virtual Status NewLogger(const std::string& fname, Logger** result) {
|
||||
FILE* f = fopen(fname.c_str(), "w");
|
||||
if (f == NULL) {
|
||||
*result = NULL;
|
||||
return IOError(fname, errno);
|
||||
} else {
|
||||
bufsize = 30000;
|
||||
base = new char[bufsize];
|
||||
}
|
||||
char* p = base;
|
||||
char* limit = base + bufsize;
|
||||
|
||||
struct timeval now_tv;
|
||||
gettimeofday(&now_tv, NULL);
|
||||
const time_t seconds = now_tv.tv_sec;
|
||||
struct tm t;
|
||||
localtime_r(&seconds, &t);
|
||||
p += snprintf(p, limit - p,
|
||||
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
|
||||
t.tm_year + 1900,
|
||||
t.tm_mon + 1,
|
||||
t.tm_mday,
|
||||
t.tm_hour,
|
||||
t.tm_min,
|
||||
t.tm_sec,
|
||||
static_cast<int>(now_tv.tv_usec),
|
||||
static_cast<long long unsigned int>(thread_id));
|
||||
|
||||
// Print the message
|
||||
if (p < limit) {
|
||||
va_list backup_ap;
|
||||
va_copy(backup_ap, ap);
|
||||
p += vsnprintf(p, limit - p, format, backup_ap);
|
||||
va_end(backup_ap);
|
||||
}
|
||||
|
||||
// Truncate to available space if necessary
|
||||
if (p >= limit) {
|
||||
if (iter == 0) {
|
||||
continue; // Try again with larger buffer
|
||||
} else {
|
||||
p = limit - 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Add newline if necessary
|
||||
if (p == base || p[-1] != '\n') {
|
||||
*p++ = '\n';
|
||||
}
|
||||
|
||||
assert(p <= limit);
|
||||
info_log->Append(Slice(base, p - base));
|
||||
info_log->Flush();
|
||||
if (base != buffer) {
|
||||
delete[] base;
|
||||
}
|
||||
break;
|
||||
*result = new PosixLogger(f, &PosixEnv::gettid);
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
|
97
util/posix_logger.h
Normal file
97
util/posix_logger.h
Normal file
@ -0,0 +1,97 @@
|
||||
// Copyright 2011 Google Inc. All Rights Reserved.
|
||||
// Author: sanjay@google.com (Sanjay Ghemawat)
|
||||
//
|
||||
// Logger implementation that can be shared by all environments
|
||||
// where enough posix functionality is available.
|
||||
|
||||
#ifndef STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_
|
||||
#define STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_
|
||||
|
||||
#include <algorithm>
|
||||
#include <stdio.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#include "leveldb/env.h"
|
||||
|
||||
namespace leveldb {
|
||||
|
||||
class PosixLogger : public Logger {
|
||||
private:
|
||||
FILE* file_;
|
||||
uint64_t (*gettid_)(); // Return the thread id for the current thread
|
||||
public:
|
||||
PosixLogger(FILE* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { }
|
||||
virtual ~PosixLogger() {
|
||||
fclose(file_);
|
||||
}
|
||||
virtual void Logv(const char* format, va_list ap) {
|
||||
const uint64_t thread_id = (*gettid_)();
|
||||
|
||||
// We try twice: the first time with a fixed-size stack allocated buffer,
|
||||
// and the second time with a much larger dynamically allocated buffer.
|
||||
char buffer[500];
|
||||
for (int iter = 0; iter < 2; iter++) {
|
||||
char* base;
|
||||
int bufsize;
|
||||
if (iter == 0) {
|
||||
bufsize = sizeof(buffer);
|
||||
base = buffer;
|
||||
} else {
|
||||
bufsize = 30000;
|
||||
base = new char[bufsize];
|
||||
}
|
||||
char* p = base;
|
||||
char* limit = base + bufsize;
|
||||
|
||||
struct timeval now_tv;
|
||||
gettimeofday(&now_tv, NULL);
|
||||
const time_t seconds = now_tv.tv_sec;
|
||||
struct tm t;
|
||||
localtime_r(&seconds, &t);
|
||||
p += snprintf(p, limit - p,
|
||||
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
|
||||
t.tm_year + 1900,
|
||||
t.tm_mon + 1,
|
||||
t.tm_mday,
|
||||
t.tm_hour,
|
||||
t.tm_min,
|
||||
t.tm_sec,
|
||||
static_cast<int>(now_tv.tv_usec),
|
||||
static_cast<long long unsigned int>(thread_id));
|
||||
|
||||
// Print the message
|
||||
if (p < limit) {
|
||||
va_list backup_ap;
|
||||
va_copy(backup_ap, ap);
|
||||
p += vsnprintf(p, limit - p, format, backup_ap);
|
||||
va_end(backup_ap);
|
||||
}
|
||||
|
||||
// Truncate to available space if necessary
|
||||
if (p >= limit) {
|
||||
if (iter == 0) {
|
||||
continue; // Try again with larger buffer
|
||||
} else {
|
||||
p = limit - 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Add newline if necessary
|
||||
if (p == base || p[-1] != '\n') {
|
||||
*p++ = '\n';
|
||||
}
|
||||
|
||||
assert(p <= limit);
|
||||
fwrite(base, 1, p - base, file_);
|
||||
fflush(file_);
|
||||
if (base != buffer) {
|
||||
delete[] base;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif // STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_
|
Loading…
Reference in New Issue
Block a user