webrtc/talk/base/stream.cc

1353 lines
37 KiB
C++
Raw Normal View History

/*
* libjingle
* Copyright 2004 Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#if defined(POSIX)
#include <sys/file.h>
#endif // POSIX
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <string>
#include "talk/base/basictypes.h"
#include "talk/base/common.h"
#include "talk/base/logging.h"
#include "talk/base/messagequeue.h"
#include "talk/base/stream.h"
#include "talk/base/stringencode.h"
#include "talk/base/stringutils.h"
#include "talk/base/thread.h"
#include "talk/base/timeutils.h"
#ifdef WIN32
#include "talk/base/win32.h"
#define fileno _fileno
#endif
namespace talk_base {
///////////////////////////////////////////////////////////////////////////////
// StreamInterface
///////////////////////////////////////////////////////////////////////////////
StreamInterface::~StreamInterface() {
}
StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
size_t* written, int* error) {
StreamResult result = SR_SUCCESS;
size_t total_written = 0, current_written;
while (total_written < data_len) {
result = Write(static_cast<const char*>(data) + total_written,
data_len - total_written, &current_written, error);
if (result != SR_SUCCESS)
break;
total_written += current_written;
}
if (written)
*written = total_written;
return result;
}
StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
size_t* read, int* error) {
StreamResult result = SR_SUCCESS;
size_t total_read = 0, current_read;
while (total_read < buffer_len) {
result = Read(static_cast<char*>(buffer) + total_read,
buffer_len - total_read, &current_read, error);
if (result != SR_SUCCESS)
break;
total_read += current_read;
}
if (read)
*read = total_read;
return result;
}
StreamResult StreamInterface::ReadLine(std::string* line) {
line->clear();
StreamResult result = SR_SUCCESS;
while (true) {
char ch;
result = Read(&ch, sizeof(ch), NULL, NULL);
if (result != SR_SUCCESS) {
break;
}
if (ch == '\n') {
break;
}
line->push_back(ch);
}
if (!line->empty()) { // give back the line we've collected so far with
result = SR_SUCCESS; // a success code. Otherwise return the last code
}
return result;
}
void StreamInterface::PostEvent(Thread* t, int events, int err) {
t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
}
void StreamInterface::PostEvent(int events, int err) {
PostEvent(Thread::Current(), events, err);
}
StreamInterface::StreamInterface() {
}
void StreamInterface::OnMessage(Message* msg) {
if (MSG_POST_EVENT == msg->message_id) {
StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
SignalEvent(this, pe->events, pe->error);
delete msg->pdata;
}
}
///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface
///////////////////////////////////////////////////////////////////////////////
StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
bool owned)
: stream_(stream), owned_(owned) {
if (NULL != stream_)
stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
}
void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
if (NULL != stream_)
stream_->SignalEvent.disconnect(this);
if (owned_)
delete stream_;
stream_ = stream;
owned_ = owned;
if (NULL != stream_)
stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
}
StreamInterface* StreamAdapterInterface::Detach() {
if (NULL != stream_)
stream_->SignalEvent.disconnect(this);
StreamInterface* stream = stream_;
stream_ = NULL;
return stream;
}
StreamAdapterInterface::~StreamAdapterInterface() {
if (owned_)
delete stream_;
}
///////////////////////////////////////////////////////////////////////////////
// StreamTap
///////////////////////////////////////////////////////////////////////////////
StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
: StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
tap_error_(0) {
AttachTap(tap);
}
void StreamTap::AttachTap(StreamInterface* tap) {
tap_.reset(tap);
}
StreamInterface* StreamTap::DetachTap() {
return tap_.release();
}
StreamResult StreamTap::GetTapResult(int* error) {
if (error) {
*error = tap_error_;
}
return tap_result_;
}
StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
size_t backup_read;
if (!read) {
read = &backup_read;
}
StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
read, error);
if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
}
return res;
}
StreamResult StreamTap::Write(const void* data, size_t data_len,
size_t* written, int* error) {
size_t backup_written;
if (!written) {
written = &backup_written;
}
StreamResult res = StreamAdapterInterface::Write(data, data_len,
written, error);
if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
}
return res;
}
///////////////////////////////////////////////////////////////////////////////
// StreamSegment
///////////////////////////////////////////////////////////////////////////////
StreamSegment::StreamSegment(StreamInterface* stream)
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
length_(SIZE_UNKNOWN) {
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
stream->GetPosition(&start_);
}
StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
length_(length) {
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
stream->GetPosition(&start_);
}
StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (SIZE_UNKNOWN != length_) {
if (pos_ >= length_)
return SR_EOS;
buffer_len = _min(buffer_len, length_ - pos_);
}
size_t backup_read;
if (!read) {
read = &backup_read;
}
StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
read, error);
if (SR_SUCCESS == result) {
pos_ += *read;
}
return result;
}
bool StreamSegment::SetPosition(size_t position) {
if (SIZE_UNKNOWN == start_)
return false; // Not seekable
if ((SIZE_UNKNOWN != length_) && (position > length_))
return false; // Seek past end of segment
if (!StreamAdapterInterface::SetPosition(start_ + position))
return false;
pos_ = position;
return true;
}
bool StreamSegment::GetPosition(size_t* position) const {
if (SIZE_UNKNOWN == start_)
return false; // Not seekable
if (!StreamAdapterInterface::GetPosition(position))
return false;
if (position) {
ASSERT(*position >= start_);
*position -= start_;
}
return true;
}
bool StreamSegment::GetSize(size_t* size) const {
if (!StreamAdapterInterface::GetSize(size))
return false;
if (size) {
if (SIZE_UNKNOWN != start_) {
ASSERT(*size >= start_);
*size -= start_;
}
if (SIZE_UNKNOWN != length_) {
*size = _min(*size, length_);
}
}
return true;
}
bool StreamSegment::GetAvailable(size_t* size) const {
if (!StreamAdapterInterface::GetAvailable(size))
return false;
if (size && (SIZE_UNKNOWN != length_))
*size = _min(*size, length_ - pos_);
return true;
}
///////////////////////////////////////////////////////////////////////////////
// NullStream
///////////////////////////////////////////////////////////////////////////////
NullStream::NullStream() {
}
NullStream::~NullStream() {
}
StreamState NullStream::GetState() const {
return SS_OPEN;
}
StreamResult NullStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (error) *error = -1;
return SR_ERROR;
}
StreamResult NullStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (written) *written = data_len;
return SR_SUCCESS;
}
void NullStream::Close() {
}
///////////////////////////////////////////////////////////////////////////////
// FileStream
///////////////////////////////////////////////////////////////////////////////
FileStream::FileStream() : file_(NULL) {
}
FileStream::~FileStream() {
FileStream::Close();
}
bool FileStream::Open(const std::string& filename, const char* mode,
int* error) {
Close();
#ifdef WIN32
std::wstring wfilename;
if (Utf8ToWindowsFilename(filename, &wfilename)) {
file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
} else {
if (error) {
*error = -1;
return false;
}
}
#else
file_ = fopen(filename.c_str(), mode);
#endif
if (!file_ && error) {
*error = errno;
}
return (file_ != NULL);
}
bool FileStream::OpenShare(const std::string& filename, const char* mode,
int shflag, int* error) {
Close();
#ifdef WIN32
std::wstring wfilename;
if (Utf8ToWindowsFilename(filename, &wfilename)) {
file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
if (!file_ && error) {
*error = errno;
return false;
}
return file_ != NULL;
} else {
if (error) {
*error = -1;
}
return false;
}
#else
return Open(filename, mode, error);
#endif
}
bool FileStream::DisableBuffering() {
if (!file_)
return false;
return (setvbuf(file_, NULL, _IONBF, 0) == 0);
}
StreamState FileStream::GetState() const {
return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
}
StreamResult FileStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (!file_)
return SR_EOS;
size_t result = fread(buffer, 1, buffer_len, file_);
if ((result == 0) && (buffer_len > 0)) {
if (feof(file_))
return SR_EOS;
if (error)
*error = errno;
return SR_ERROR;
}
if (read)
*read = result;
return SR_SUCCESS;
}
StreamResult FileStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (!file_)
return SR_EOS;
size_t result = fwrite(data, 1, data_len, file_);
if ((result == 0) && (data_len > 0)) {
if (error)
*error = errno;
return SR_ERROR;
}
if (written)
*written = result;
return SR_SUCCESS;
}
void FileStream::Close() {
if (file_) {
DoClose();
file_ = NULL;
}
}
bool FileStream::SetPosition(size_t position) {
if (!file_)
return false;
return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
}
bool FileStream::GetPosition(size_t* position) const {
ASSERT(NULL != position);
if (!file_)
return false;
long result = ftell(file_);
if (result < 0)
return false;
if (position)
*position = result;
return true;
}
bool FileStream::GetSize(size_t* size) const {
ASSERT(NULL != size);
if (!file_)
return false;
struct stat file_stats;
if (fstat(fileno(file_), &file_stats) != 0)
return false;
if (size)
*size = file_stats.st_size;
return true;
}
bool FileStream::GetAvailable(size_t* size) const {
ASSERT(NULL != size);
if (!GetSize(size))
return false;
long result = ftell(file_);
if (result < 0)
return false;
if (size)
*size -= result;
return true;
}
bool FileStream::ReserveSize(size_t size) {
// TODO: extend the file to the proper length
return true;
}
bool FileStream::GetSize(const std::string& filename, size_t* size) {
struct stat file_stats;
if (stat(filename.c_str(), &file_stats) != 0)
return false;
*size = file_stats.st_size;
return true;
}
bool FileStream::Flush() {
if (file_) {
return (0 == fflush(file_));
}
// try to flush empty file?
ASSERT(false);
return false;
}
#if defined(POSIX)
bool FileStream::TryLock() {
if (file_ == NULL) {
// Stream not open.
ASSERT(false);
return false;
}
return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
}
bool FileStream::Unlock() {
if (file_ == NULL) {
// Stream not open.
ASSERT(false);
return false;
}
return flock(fileno(file_), LOCK_UN) == 0;
}
#endif
void FileStream::DoClose() {
fclose(file_);
}
CircularFileStream::CircularFileStream(size_t max_size)
: max_write_size_(max_size),
position_(0),
marked_position_(max_size / 2),
last_write_position_(0),
read_segment_(READ_LATEST),
read_segment_available_(0) {
}
bool CircularFileStream::Open(
const std::string& filename, const char* mode, int* error) {
if (!FileStream::Open(filename.c_str(), mode, error))
return false;
if (strchr(mode, "r") != NULL) { // Opened in read mode.
// Check if the buffer has been overwritten and determine how to read the
// log in time sequence.
size_t file_size;
GetSize(&file_size);
if (file_size == position_) {
// The buffer has not been overwritten yet. Read 0 .. file_size
read_segment_ = READ_LATEST;
read_segment_available_ = file_size;
} else {
// The buffer has been over written. There are three segments: The first
// one is 0 .. marked_position_, which is the marked earliest log. The
// second one is position_ .. file_size, which is the middle log. The
// last one is marked_position_ .. position_, which is the latest log.
read_segment_ = READ_MARKED;
read_segment_available_ = marked_position_;
last_write_position_ = position_;
}
// Read from the beginning.
position_ = 0;
SetPosition(position_);
}
return true;
}
StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (read_segment_available_ == 0) {
size_t file_size;
switch (read_segment_) {
case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
read_segment_ = READ_MIDDLE;
position_ = last_write_position_;
SetPosition(position_);
GetSize(&file_size);
read_segment_available_ = file_size - position_;
break;
case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
read_segment_ = READ_LATEST;
position_ = marked_position_;
SetPosition(position_);
read_segment_available_ = last_write_position_ - position_;
break;
default: // Finished READ_LATEST and return EOS.
return talk_base::SR_EOS;
}
}
size_t local_read;
if (!read) read = &local_read;
size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
talk_base::StreamResult result
= talk_base::FileStream::Read(buffer, to_read, read, error);
if (result == talk_base::SR_SUCCESS) {
read_segment_available_ -= *read;
position_ += *read;
}
return result;
}
StreamResult CircularFileStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (position_ >= max_write_size_) {
ASSERT(position_ == max_write_size_);
position_ = marked_position_;
SetPosition(position_);
}
size_t local_written;
if (!written) written = &local_written;
size_t to_eof = max_write_size_ - position_;
size_t to_write = talk_base::_min(data_len, to_eof);
talk_base::StreamResult result
= talk_base::FileStream::Write(data, to_write, written, error);
if (result == talk_base::SR_SUCCESS) {
position_ += *written;
}
return result;
}
AsyncWriteStream::~AsyncWriteStream() {
write_thread_->Clear(this, 0, NULL);
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
stream_.reset();
}
// This is needed by some stream writers, such as RtpDumpWriter.
bool AsyncWriteStream::GetPosition(size_t* position) const {
CritScope cs(&crit_stream_);
return stream_->GetPosition(position);
}
// This is needed by some stream writers, such as the plugin log writers.
StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
CritScope cs(&crit_stream_);
return stream_->Read(buffer, buffer_len, read, error);
}
void AsyncWriteStream::Close() {
if (state_ == SS_CLOSED) {
return;
}
write_thread_->Clear(this, 0, NULL);
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
stream_->Close();
state_ = SS_CLOSED;
}
StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (state_ == SS_CLOSED) {
return SR_ERROR;
}
size_t previous_buffer_length = 0;
{
CritScope cs(&crit_buffer_);
previous_buffer_length = buffer_.length();
buffer_.AppendData(data, data_len);
}
if (previous_buffer_length == 0) {
// If there's stuff already in the buffer, then we already called
// Post and the write_thread_ hasn't pulled it out yet, so we
// don't need to re-Post.
write_thread_->Post(this, 0, NULL);
}
// Return immediately, assuming that it works.
if (written) {
*written = data_len;
}
return SR_SUCCESS;
}
void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
ClearBufferAndWrite();
}
bool AsyncWriteStream::Flush() {
if (state_ == SS_CLOSED) {
return false;
}
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
return stream_->Flush();
}
void AsyncWriteStream::ClearBufferAndWrite() {
Buffer to_write;
{
CritScope cs_buffer(&crit_buffer_);
buffer_.TransferTo(&to_write);
}
if (to_write.length() > 0) {
CritScope cs(&crit_stream_);
stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
}
}
#ifdef POSIX
// Have to identically rewrite the FileStream destructor or else it would call
// the base class's Close() instead of the sub-class's.
POpenStream::~POpenStream() {
POpenStream::Close();
}
bool POpenStream::Open(const std::string& subcommand,
const char* mode,
int* error) {
Close();
file_ = popen(subcommand.c_str(), mode);
if (file_ == NULL) {
if (error)
*error = errno;
return false;
}
return true;
}
bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
int shflag, int* error) {
return Open(subcommand, mode, error);
}
void POpenStream::DoClose() {
wait_status_ = pclose(file_);
}
#endif
///////////////////////////////////////////////////////////////////////////////
// MemoryStream
///////////////////////////////////////////////////////////////////////////////
MemoryStreamBase::MemoryStreamBase()
: buffer_(NULL), buffer_length_(0), data_length_(0),
seek_position_(0) {
}
StreamState MemoryStreamBase::GetState() const {
return SS_OPEN;
}
StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
size_t* bytes_read, int* error) {
if (seek_position_ >= data_length_) {
return SR_EOS;
}
size_t available = data_length_ - seek_position_;
if (bytes > available) {
// Read partial buffer
bytes = available;
}
memcpy(buffer, &buffer_[seek_position_], bytes);
seek_position_ += bytes;
if (bytes_read) {
*bytes_read = bytes;
}
return SR_SUCCESS;
}
StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
size_t* bytes_written, int* error) {
size_t available = buffer_length_ - seek_position_;
if (0 == available) {
// Increase buffer size to the larger of:
// a) new position rounded up to next 256 bytes
// b) double the previous length
size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
buffer_length_ * 2);
StreamResult result = DoReserve(new_buffer_length, error);
if (SR_SUCCESS != result) {
return result;
}
ASSERT(buffer_length_ >= new_buffer_length);
available = buffer_length_ - seek_position_;
}
if (bytes > available) {
bytes = available;
}
memcpy(&buffer_[seek_position_], buffer, bytes);
seek_position_ += bytes;
if (data_length_ < seek_position_) {
data_length_ = seek_position_;
}
if (bytes_written) {
*bytes_written = bytes;
}
return SR_SUCCESS;
}
void MemoryStreamBase::Close() {
// nothing to do
}
bool MemoryStreamBase::SetPosition(size_t position) {
if (position > data_length_)
return false;
seek_position_ = position;
return true;
}
bool MemoryStreamBase::GetPosition(size_t* position) const {
if (position)
*position = seek_position_;
return true;
}
bool MemoryStreamBase::GetSize(size_t* size) const {
if (size)
*size = data_length_;
return true;
}
bool MemoryStreamBase::GetAvailable(size_t* size) const {
if (size)
*size = data_length_ - seek_position_;
return true;
}
bool MemoryStreamBase::ReserveSize(size_t size) {
return (SR_SUCCESS == DoReserve(size, NULL));
}
StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
}
///////////////////////////////////////////////////////////////////////////////
MemoryStream::MemoryStream()
: buffer_alloc_(NULL) {
}
MemoryStream::MemoryStream(const char* data)
: buffer_alloc_(NULL) {
SetData(data, strlen(data));
}
MemoryStream::MemoryStream(const void* data, size_t length)
: buffer_alloc_(NULL) {
SetData(data, length);
}
MemoryStream::~MemoryStream() {
delete [] buffer_alloc_;
}
void MemoryStream::SetData(const void* data, size_t length) {
data_length_ = buffer_length_ = length;
delete [] buffer_alloc_;
buffer_alloc_ = new char[buffer_length_ + kAlignment];
buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
memcpy(buffer_, data, data_length_);
seek_position_ = 0;
}
StreamResult MemoryStream::DoReserve(size_t size, int* error) {
if (buffer_length_ >= size)
return SR_SUCCESS;
if (char* new_buffer_alloc = new char[size + kAlignment]) {
char* new_buffer = reinterpret_cast<char*>(
ALIGNP(new_buffer_alloc, kAlignment));
memcpy(new_buffer, buffer_, data_length_);
delete [] buffer_alloc_;
buffer_alloc_ = new_buffer_alloc;
buffer_ = new_buffer;
buffer_length_ = size;
return SR_SUCCESS;
}
if (error) {
*error = ENOMEM;
}
return SR_ERROR;
}
///////////////////////////////////////////////////////////////////////////////
ExternalMemoryStream::ExternalMemoryStream() {
}
ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
SetData(data, length);
}
ExternalMemoryStream::~ExternalMemoryStream() {
}
void ExternalMemoryStream::SetData(void* data, size_t length) {
data_length_ = buffer_length_ = length;
buffer_ = static_cast<char*>(data);
seek_position_ = 0;
}
///////////////////////////////////////////////////////////////////////////////
// FifoBuffer
///////////////////////////////////////////////////////////////////////////////
FifoBuffer::FifoBuffer(size_t size)
: state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
data_length_(0), read_position_(0), owner_(Thread::Current()) {
// all events are done on the owner_ thread
}
FifoBuffer::FifoBuffer(size_t size, Thread* owner)
: state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
data_length_(0), read_position_(0), owner_(owner) {
// all events are done on the owner_ thread
}
FifoBuffer::~FifoBuffer() {
}
bool FifoBuffer::GetBuffered(size_t* size) const {
CritScope cs(&crit_);
*size = data_length_;
return true;
}
bool FifoBuffer::SetCapacity(size_t size) {
CritScope cs(&crit_);
if (data_length_ > size) {
return false;
}
if (size != buffer_length_) {
char* buffer = new char[size];
const size_t copy = data_length_;
const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
memcpy(buffer, &buffer_[read_position_], tail_copy);
memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
buffer_.reset(buffer);
read_position_ = 0;
buffer_length_ = size;
}
return true;
}
StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
size_t offset, size_t* bytes_read) {
CritScope cs(&crit_);
return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
}
StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
size_t offset, size_t* bytes_written) {
CritScope cs(&crit_);
return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
}
StreamState FifoBuffer::GetState() const {
return state_;
}
StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
size_t* bytes_read, int* error) {
CritScope cs(&crit_);
const bool was_writable = data_length_ < buffer_length_;
size_t copy = 0;
StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
if (result == SR_SUCCESS) {
// If read was successful then adjust the read position and number of
// bytes buffered.
read_position_ = (read_position_ + copy) % buffer_length_;
data_length_ -= copy;
if (bytes_read) {
*bytes_read = copy;
}
// if we were full before, and now we're not, post an event
if (!was_writable && copy > 0) {
PostEvent(owner_, SE_WRITE, 0);
}
}
return result;
}
StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
size_t* bytes_written, int* error) {
CritScope cs(&crit_);
const bool was_readable = (data_length_ > 0);
size_t copy = 0;
StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
if (result == SR_SUCCESS) {
// If write was successful then adjust the number of readable bytes.
data_length_ += copy;
if (bytes_written) {
*bytes_written = copy;
}
// if we didn't have any data to read before, and now we do, post an event
if (!was_readable && copy > 0) {
PostEvent(owner_, SE_READ, 0);
}
}
return result;
}
void FifoBuffer::Close() {
CritScope cs(&crit_);
state_ = SS_CLOSED;
}
const void* FifoBuffer::GetReadData(size_t* size) {
CritScope cs(&crit_);
*size = (read_position_ + data_length_ <= buffer_length_) ?
data_length_ : buffer_length_ - read_position_;
return &buffer_[read_position_];
}
void FifoBuffer::ConsumeReadData(size_t size) {
CritScope cs(&crit_);
ASSERT(size <= data_length_);
const bool was_writable = data_length_ < buffer_length_;
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
if (!was_writable && size > 0) {
PostEvent(owner_, SE_WRITE, 0);
}
}
void* FifoBuffer::GetWriteBuffer(size_t* size) {
CritScope cs(&crit_);
if (state_ == SS_CLOSED) {
return NULL;
}
// if empty, reset the write position to the beginning, so we can get
// the biggest possible block
if (data_length_ == 0) {
read_position_ = 0;
}
const size_t write_position = (read_position_ + data_length_)
% buffer_length_;
*size = (write_position > read_position_ || data_length_ == 0) ?
buffer_length_ - write_position : read_position_ - write_position;
return &buffer_[write_position];
}
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
CritScope cs(&crit_);
ASSERT(size <= buffer_length_ - data_length_);
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {
PostEvent(owner_, SE_READ, 0);
}
}
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
CritScope cs(&crit_);
*size = buffer_length_ - data_length_;
return true;
}
StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_read) {
if (offset >= data_length_) {
return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
}
const size_t available = data_length_ - offset;
const size_t read_position = (read_position_ + offset) % buffer_length_;
const size_t copy = _min(bytes, available);
const size_t tail_copy = _min(copy, buffer_length_ - read_position);
char* const p = static_cast<char*>(buffer);
memcpy(p, &buffer_[read_position], tail_copy);
memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
if (bytes_read) {
*bytes_read = copy;
}
return SR_SUCCESS;
}
StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
size_t bytes,
size_t offset,
size_t* bytes_written) {
if (state_ == SS_CLOSED) {
return SR_EOS;
}
if (data_length_ + offset >= buffer_length_) {
return SR_BLOCK;
}
const size_t available = buffer_length_ - data_length_ - offset;
const size_t write_position = (read_position_ + data_length_ + offset)
% buffer_length_;
const size_t copy = _min(bytes, available);
const size_t tail_copy = _min(copy, buffer_length_ - write_position);
const char* const p = static_cast<const char*>(buffer);
memcpy(&buffer_[write_position], p, tail_copy);
memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
if (bytes_written) {
*bytes_written = copy;
}
return SR_SUCCESS;
}
///////////////////////////////////////////////////////////////////////////////
// LoggingAdapter
///////////////////////////////////////////////////////////////////////////////
LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
const std::string& label, bool hex_mode)
: StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
set_label(label);
}
void LoggingAdapter::set_label(const std::string& label) {
label_.assign("[");
label_.append(label);
label_.append("]");
}
StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
size_t local_read; if (!read) read = &local_read;
StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
error);
if (result == SR_SUCCESS) {
LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
}
return result;
}
StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
size_t* written, int* error) {
size_t local_written;
if (!written) written = &local_written;
StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
error);
if (result == SR_SUCCESS) {
LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
&lms_);
}
return result;
}
void LoggingAdapter::Close() {
LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
LOG_V(level_) << label_ << " Closed locally";
StreamAdapterInterface::Close();
}
void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
if (events & SE_OPEN) {
LOG_V(level_) << label_ << " Open";
} else if (events & SE_CLOSE) {
LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
LOG_V(level_) << label_ << " Closed with error: " << err;
}
StreamAdapterInterface::OnEvent(stream, events, err);
}
///////////////////////////////////////////////////////////////////////////////
// StringStream - Reads/Writes to an external std::string
///////////////////////////////////////////////////////////////////////////////
StringStream::StringStream(std::string& str)
: str_(str), read_pos_(0), read_only_(false) {
}
StringStream::StringStream(const std::string& str)
: str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
}
StreamState StringStream::GetState() const {
return SS_OPEN;
}
StreamResult StringStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
size_t available = _min(buffer_len, str_.size() - read_pos_);
if (!available)
return SR_EOS;
memcpy(buffer, str_.data() + read_pos_, available);
read_pos_ += available;
if (read)
*read = available;
return SR_SUCCESS;
}
StreamResult StringStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (read_only_) {
if (error) {
*error = -1;
}
return SR_ERROR;
}
str_.append(static_cast<const char*>(data),
static_cast<const char*>(data) + data_len);
if (written)
*written = data_len;
return SR_SUCCESS;
}
void StringStream::Close() {
}
bool StringStream::SetPosition(size_t position) {
if (position > str_.size())
return false;
read_pos_ = position;
return true;
}
bool StringStream::GetPosition(size_t* position) const {
if (position)
*position = read_pos_;
return true;
}
bool StringStream::GetSize(size_t* size) const {
if (size)
*size = str_.size();
return true;
}
bool StringStream::GetAvailable(size_t* size) const {
if (size)
*size = str_.size() - read_pos_;
return true;
}
bool StringStream::ReserveSize(size_t size) {
if (read_only_)
return false;
str_.reserve(size);
return true;
}
///////////////////////////////////////////////////////////////////////////////
// StreamReference
///////////////////////////////////////////////////////////////////////////////
StreamReference::StreamReference(StreamInterface* stream)
: StreamAdapterInterface(stream, false) {
// owner set to false so the destructor does not free the stream.
stream_ref_count_ = new StreamRefCount(stream);
}
StreamInterface* StreamReference::NewReference() {
stream_ref_count_->AddReference();
return new StreamReference(stream_ref_count_, stream());
}
StreamReference::~StreamReference() {
stream_ref_count_->Release();
}
StreamReference::StreamReference(StreamRefCount* stream_ref_count,
StreamInterface* stream)
: StreamAdapterInterface(stream, false),
stream_ref_count_(stream_ref_count) {
}
///////////////////////////////////////////////////////////////////////////////
StreamResult Flow(StreamInterface* source,
char* buffer, size_t buffer_len,
StreamInterface* sink,
size_t* data_len /* = NULL */) {
ASSERT(buffer_len > 0);
StreamResult result;
size_t count, read_pos, write_pos;
if (data_len) {
read_pos = *data_len;
} else {
read_pos = 0;
}
bool end_of_stream = false;
do {
// Read until buffer is full, end of stream, or error
while (!end_of_stream && (read_pos < buffer_len)) {
result = source->Read(buffer + read_pos, buffer_len - read_pos,
&count, NULL);
if (result == SR_EOS) {
end_of_stream = true;
} else if (result != SR_SUCCESS) {
if (data_len) {
*data_len = read_pos;
}
return result;
} else {
read_pos += count;
}
}
// Write until buffer is empty, or error (including end of stream)
write_pos = 0;
while (write_pos < read_pos) {
result = sink->Write(buffer + write_pos, read_pos - write_pos,
&count, NULL);
if (result != SR_SUCCESS) {
if (data_len) {
*data_len = read_pos - write_pos;
if (write_pos > 0) {
memmove(buffer, buffer + write_pos, *data_len);
}
}
return result;
}
write_pos += count;
}
read_pos = 0;
} while (!end_of_stream);
if (data_len) {
*data_len = 0;
}
return SR_SUCCESS;
}
///////////////////////////////////////////////////////////////////////////////
} // namespace talk_base