R=mallinath@webrtc.org Review URL: https://webrtc-codereview.appspot.com/2366004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@4941 4adac7df-926f-26a2-2b94-8c16560cd09d
		
			
				
	
	
		
			1353 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1353 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * libjingle
 | 
						|
 * Copyright 2004 Google Inc.
 | 
						|
 *
 | 
						|
 * Redistribution and use in source and binary forms, with or without
 | 
						|
 * modification, are permitted provided that the following conditions are met:
 | 
						|
 *
 | 
						|
 *  1. Redistributions of source code must retain the above copyright notice,
 | 
						|
 *     this list of conditions and the following disclaimer.
 | 
						|
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 | 
						|
 *     this list of conditions and the following disclaimer in the documentation
 | 
						|
 *     and/or other materials provided with the distribution.
 | 
						|
 *  3. The name of the author may not be used to endorse or promote products
 | 
						|
 *     derived from this software without specific prior written permission.
 | 
						|
 *
 | 
						|
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 | 
						|
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 | 
						|
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 | 
						|
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
						|
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 | 
						|
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 | 
						|
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 | 
						|
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 | 
						|
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 | 
						|
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
						|
 */
 | 
						|
 | 
						|
#if defined(POSIX)
 | 
						|
#include <sys/file.h>
 | 
						|
#endif  // POSIX
 | 
						|
#include <sys/types.h>
 | 
						|
#include <sys/stat.h>
 | 
						|
#include <errno.h>
 | 
						|
#include <string>
 | 
						|
#include "talk/base/basictypes.h"
 | 
						|
#include "talk/base/common.h"
 | 
						|
#include "talk/base/logging.h"
 | 
						|
#include "talk/base/messagequeue.h"
 | 
						|
#include "talk/base/stream.h"
 | 
						|
#include "talk/base/stringencode.h"
 | 
						|
#include "talk/base/stringutils.h"
 | 
						|
#include "talk/base/thread.h"
 | 
						|
#include "talk/base/timeutils.h"
 | 
						|
 | 
						|
#ifdef WIN32
 | 
						|
#include "talk/base/win32.h"
 | 
						|
#define fileno _fileno
 | 
						|
#endif
 | 
						|
 | 
						|
namespace talk_base {
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StreamInterface
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
StreamInterface::~StreamInterface() {
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
 | 
						|
                                       size_t* written, int* error) {
 | 
						|
  StreamResult result = SR_SUCCESS;
 | 
						|
  size_t total_written = 0, current_written;
 | 
						|
  while (total_written < data_len) {
 | 
						|
    result = Write(static_cast<const char*>(data) + total_written,
 | 
						|
                   data_len - total_written, ¤t_written, error);
 | 
						|
    if (result != SR_SUCCESS)
 | 
						|
      break;
 | 
						|
    total_written += current_written;
 | 
						|
  }
 | 
						|
  if (written)
 | 
						|
    *written = total_written;
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
 | 
						|
                                      size_t* read, int* error) {
 | 
						|
  StreamResult result = SR_SUCCESS;
 | 
						|
  size_t total_read = 0, current_read;
 | 
						|
  while (total_read < buffer_len) {
 | 
						|
    result = Read(static_cast<char*>(buffer) + total_read,
 | 
						|
                  buffer_len - total_read, ¤t_read, error);
 | 
						|
    if (result != SR_SUCCESS)
 | 
						|
      break;
 | 
						|
    total_read += current_read;
 | 
						|
  }
 | 
						|
  if (read)
 | 
						|
    *read = total_read;
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamInterface::ReadLine(std::string* line) {
 | 
						|
  line->clear();
 | 
						|
  StreamResult result = SR_SUCCESS;
 | 
						|
  while (true) {
 | 
						|
    char ch;
 | 
						|
    result = Read(&ch, sizeof(ch), NULL, NULL);
 | 
						|
    if (result != SR_SUCCESS) {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    if (ch == '\n') {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    line->push_back(ch);
 | 
						|
  }
 | 
						|
  if (!line->empty()) {   // give back the line we've collected so far with
 | 
						|
    result = SR_SUCCESS;  // a success code.  Otherwise return the last code
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
void StreamInterface::PostEvent(Thread* t, int events, int err) {
 | 
						|
  t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
 | 
						|
}
 | 
						|
 | 
						|
void StreamInterface::PostEvent(int events, int err) {
 | 
						|
  PostEvent(Thread::Current(), events, err);
 | 
						|
}
 | 
						|
 | 
						|
StreamInterface::StreamInterface() {
 | 
						|
}
 | 
						|
 | 
						|
void StreamInterface::OnMessage(Message* msg) {
 | 
						|
  if (MSG_POST_EVENT == msg->message_id) {
 | 
						|
    StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
 | 
						|
    SignalEvent(this, pe->events, pe->error);
 | 
						|
    delete msg->pdata;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StreamAdapterInterface
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
 | 
						|
                                               bool owned)
 | 
						|
    : stream_(stream), owned_(owned) {
 | 
						|
  if (NULL != stream_)
 | 
						|
    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
 | 
						|
}
 | 
						|
 | 
						|
void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
 | 
						|
  if (NULL != stream_)
 | 
						|
    stream_->SignalEvent.disconnect(this);
 | 
						|
  if (owned_)
 | 
						|
    delete stream_;
 | 
						|
  stream_ = stream;
 | 
						|
  owned_ = owned;
 | 
						|
  if (NULL != stream_)
 | 
						|
    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
 | 
						|
}
 | 
						|
 | 
						|
StreamInterface* StreamAdapterInterface::Detach() {
 | 
						|
  if (NULL != stream_)
 | 
						|
    stream_->SignalEvent.disconnect(this);
 | 
						|
  StreamInterface* stream = stream_;
 | 
						|
  stream_ = NULL;
 | 
						|
  return stream;
 | 
						|
}
 | 
						|
 | 
						|
StreamAdapterInterface::~StreamAdapterInterface() {
 | 
						|
  if (owned_)
 | 
						|
    delete stream_;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StreamTap
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
 | 
						|
    : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
 | 
						|
        tap_error_(0) {
 | 
						|
  AttachTap(tap);
 | 
						|
}
 | 
						|
 | 
						|
void StreamTap::AttachTap(StreamInterface* tap) {
 | 
						|
  tap_.reset(tap);
 | 
						|
}
 | 
						|
 | 
						|
StreamInterface* StreamTap::DetachTap() {
 | 
						|
  return tap_.release();
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamTap::GetTapResult(int* error) {
 | 
						|
  if (error) {
 | 
						|
    *error = tap_error_;
 | 
						|
  }
 | 
						|
  return tap_result_;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
 | 
						|
                             size_t* read, int* error) {
 | 
						|
  size_t backup_read;
 | 
						|
  if (!read) {
 | 
						|
    read = &backup_read;
 | 
						|
  }
 | 
						|
  StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
 | 
						|
                                                  read, error);
 | 
						|
  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
 | 
						|
    tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
 | 
						|
  }
 | 
						|
  return res;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamTap::Write(const void* data, size_t data_len,
 | 
						|
                              size_t* written, int* error) {
 | 
						|
  size_t backup_written;
 | 
						|
  if (!written) {
 | 
						|
    written = &backup_written;
 | 
						|
  }
 | 
						|
  StreamResult res = StreamAdapterInterface::Write(data, data_len,
 | 
						|
                                                   written, error);
 | 
						|
  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
 | 
						|
    tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
 | 
						|
  }
 | 
						|
  return res;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StreamSegment
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StreamSegment::StreamSegment(StreamInterface* stream)
 | 
						|
    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
 | 
						|
    length_(SIZE_UNKNOWN) {
 | 
						|
  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
 | 
						|
  stream->GetPosition(&start_);
 | 
						|
}
 | 
						|
 | 
						|
StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
 | 
						|
    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
 | 
						|
    length_(length) {
 | 
						|
  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
 | 
						|
  stream->GetPosition(&start_);
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
 | 
						|
                                 size_t* read, int* error) {
 | 
						|
  if (SIZE_UNKNOWN != length_) {
 | 
						|
    if (pos_ >= length_)
 | 
						|
      return SR_EOS;
 | 
						|
    buffer_len = _min(buffer_len, length_ - pos_);
 | 
						|
  }
 | 
						|
  size_t backup_read;
 | 
						|
  if (!read) {
 | 
						|
    read = &backup_read;
 | 
						|
  }
 | 
						|
  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
 | 
						|
                                                     read, error);
 | 
						|
  if (SR_SUCCESS == result) {
 | 
						|
    pos_ += *read;
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamSegment::SetPosition(size_t position) {
 | 
						|
  if (SIZE_UNKNOWN == start_)
 | 
						|
    return false;  // Not seekable
 | 
						|
  if ((SIZE_UNKNOWN != length_) && (position > length_))
 | 
						|
    return false;  // Seek past end of segment
 | 
						|
  if (!StreamAdapterInterface::SetPosition(start_ + position))
 | 
						|
    return false;
 | 
						|
  pos_ = position;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamSegment::GetPosition(size_t* position) const {
 | 
						|
  if (SIZE_UNKNOWN == start_)
 | 
						|
    return false;  // Not seekable
 | 
						|
  if (!StreamAdapterInterface::GetPosition(position))
 | 
						|
    return false;
 | 
						|
  if (position) {
 | 
						|
    ASSERT(*position >= start_);
 | 
						|
    *position -= start_;
 | 
						|
  }
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamSegment::GetSize(size_t* size) const {
 | 
						|
  if (!StreamAdapterInterface::GetSize(size))
 | 
						|
    return false;
 | 
						|
  if (size) {
 | 
						|
    if (SIZE_UNKNOWN != start_) {
 | 
						|
      ASSERT(*size >= start_);
 | 
						|
      *size -= start_;
 | 
						|
    }
 | 
						|
    if (SIZE_UNKNOWN != length_) {
 | 
						|
      *size = _min(*size, length_);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StreamSegment::GetAvailable(size_t* size) const {
 | 
						|
  if (!StreamAdapterInterface::GetAvailable(size))
 | 
						|
    return false;
 | 
						|
  if (size && (SIZE_UNKNOWN != length_))
 | 
						|
    *size = _min(*size, length_ - pos_);
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// NullStream
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
NullStream::NullStream() {
 | 
						|
}
 | 
						|
 | 
						|
NullStream::~NullStream() {
 | 
						|
}
 | 
						|
 | 
						|
StreamState NullStream::GetState() const {
 | 
						|
  return SS_OPEN;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult NullStream::Read(void* buffer, size_t buffer_len,
 | 
						|
                              size_t* read, int* error) {
 | 
						|
  if (error) *error = -1;
 | 
						|
  return SR_ERROR;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult NullStream::Write(const void* data, size_t data_len,
 | 
						|
                               size_t* written, int* error) {
 | 
						|
  if (written) *written = data_len;
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void NullStream::Close() {
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// FileStream
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
FileStream::FileStream() : file_(NULL) {
 | 
						|
}
 | 
						|
 | 
						|
FileStream::~FileStream() {
 | 
						|
  FileStream::Close();
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::Open(const std::string& filename, const char* mode,
 | 
						|
                      int* error) {
 | 
						|
  Close();
 | 
						|
#ifdef WIN32
 | 
						|
  std::wstring wfilename;
 | 
						|
  if (Utf8ToWindowsFilename(filename, &wfilename)) {
 | 
						|
    file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
 | 
						|
  } else {
 | 
						|
    if (error) {
 | 
						|
      *error = -1;
 | 
						|
      return false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
#else
 | 
						|
  file_ = fopen(filename.c_str(), mode);
 | 
						|
#endif
 | 
						|
  if (!file_ && error) {
 | 
						|
    *error = errno;
 | 
						|
  }
 | 
						|
  return (file_ != NULL);
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::OpenShare(const std::string& filename, const char* mode,
 | 
						|
                           int shflag, int* error) {
 | 
						|
  Close();
 | 
						|
#ifdef WIN32
 | 
						|
  std::wstring wfilename;
 | 
						|
  if (Utf8ToWindowsFilename(filename, &wfilename)) {
 | 
						|
    file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
 | 
						|
    if (!file_ && error) {
 | 
						|
      *error = errno;
 | 
						|
      return false;
 | 
						|
    }
 | 
						|
    return file_ != NULL;
 | 
						|
  } else {
 | 
						|
    if (error) {
 | 
						|
      *error = -1;
 | 
						|
    }
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
#else
 | 
						|
  return Open(filename, mode, error);
 | 
						|
#endif
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::DisableBuffering() {
 | 
						|
  if (!file_)
 | 
						|
    return false;
 | 
						|
  return (setvbuf(file_, NULL, _IONBF, 0) == 0);
 | 
						|
}
 | 
						|
 | 
						|
StreamState FileStream::GetState() const {
 | 
						|
  return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FileStream::Read(void* buffer, size_t buffer_len,
 | 
						|
                              size_t* read, int* error) {
 | 
						|
  if (!file_)
 | 
						|
    return SR_EOS;
 | 
						|
  size_t result = fread(buffer, 1, buffer_len, file_);
 | 
						|
  if ((result == 0) && (buffer_len > 0)) {
 | 
						|
    if (feof(file_))
 | 
						|
      return SR_EOS;
 | 
						|
    if (error)
 | 
						|
      *error = errno;
 | 
						|
    return SR_ERROR;
 | 
						|
  }
 | 
						|
  if (read)
 | 
						|
    *read = result;
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FileStream::Write(const void* data, size_t data_len,
 | 
						|
                               size_t* written, int* error) {
 | 
						|
  if (!file_)
 | 
						|
    return SR_EOS;
 | 
						|
  size_t result = fwrite(data, 1, data_len, file_);
 | 
						|
  if ((result == 0) && (data_len > 0)) {
 | 
						|
    if (error)
 | 
						|
      *error = errno;
 | 
						|
    return SR_ERROR;
 | 
						|
  }
 | 
						|
  if (written)
 | 
						|
    *written = result;
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void FileStream::Close() {
 | 
						|
  if (file_) {
 | 
						|
    DoClose();
 | 
						|
    file_ = NULL;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::SetPosition(size_t position) {
 | 
						|
  if (!file_)
 | 
						|
    return false;
 | 
						|
  return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::GetPosition(size_t* position) const {
 | 
						|
  ASSERT(NULL != position);
 | 
						|
  if (!file_)
 | 
						|
    return false;
 | 
						|
  long result = ftell(file_);
 | 
						|
  if (result < 0)
 | 
						|
    return false;
 | 
						|
  if (position)
 | 
						|
    *position = result;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::GetSize(size_t* size) const {
 | 
						|
  ASSERT(NULL != size);
 | 
						|
  if (!file_)
 | 
						|
    return false;
 | 
						|
  struct stat file_stats;
 | 
						|
  if (fstat(fileno(file_), &file_stats) != 0)
 | 
						|
    return false;
 | 
						|
  if (size)
 | 
						|
    *size = file_stats.st_size;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::GetAvailable(size_t* size) const {
 | 
						|
  ASSERT(NULL != size);
 | 
						|
  if (!GetSize(size))
 | 
						|
    return false;
 | 
						|
  long result = ftell(file_);
 | 
						|
  if (result < 0)
 | 
						|
    return false;
 | 
						|
  if (size)
 | 
						|
    *size -= result;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::ReserveSize(size_t size) {
 | 
						|
  // TODO: extend the file to the proper length
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::GetSize(const std::string& filename, size_t* size) {
 | 
						|
  struct stat file_stats;
 | 
						|
  if (stat(filename.c_str(), &file_stats) != 0)
 | 
						|
    return false;
 | 
						|
  *size = file_stats.st_size;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::Flush() {
 | 
						|
  if (file_) {
 | 
						|
    return (0 == fflush(file_));
 | 
						|
  }
 | 
						|
  // try to flush empty file?
 | 
						|
  ASSERT(false);
 | 
						|
  return false;
 | 
						|
}
 | 
						|
 | 
						|
#if defined(POSIX)
 | 
						|
 | 
						|
bool FileStream::TryLock() {
 | 
						|
  if (file_ == NULL) {
 | 
						|
    // Stream not open.
 | 
						|
    ASSERT(false);
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
  return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
 | 
						|
}
 | 
						|
 | 
						|
bool FileStream::Unlock() {
 | 
						|
  if (file_ == NULL) {
 | 
						|
    // Stream not open.
 | 
						|
    ASSERT(false);
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
  return flock(fileno(file_), LOCK_UN) == 0;
 | 
						|
}
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
void FileStream::DoClose() {
 | 
						|
  fclose(file_);
 | 
						|
}
 | 
						|
 | 
						|
CircularFileStream::CircularFileStream(size_t max_size)
 | 
						|
  : max_write_size_(max_size),
 | 
						|
    position_(0),
 | 
						|
    marked_position_(max_size / 2),
 | 
						|
    last_write_position_(0),
 | 
						|
    read_segment_(READ_LATEST),
 | 
						|
    read_segment_available_(0) {
 | 
						|
}
 | 
						|
 | 
						|
bool CircularFileStream::Open(
 | 
						|
    const std::string& filename, const char* mode, int* error) {
 | 
						|
  if (!FileStream::Open(filename.c_str(), mode, error))
 | 
						|
    return false;
 | 
						|
 | 
						|
  if (strchr(mode, "r") != NULL) {  // Opened in read mode.
 | 
						|
    // Check if the buffer has been overwritten and determine how to read the
 | 
						|
    // log in time sequence.
 | 
						|
    size_t file_size;
 | 
						|
    GetSize(&file_size);
 | 
						|
    if (file_size == position_) {
 | 
						|
      // The buffer has not been overwritten yet. Read 0 .. file_size
 | 
						|
      read_segment_ = READ_LATEST;
 | 
						|
      read_segment_available_ = file_size;
 | 
						|
    } else {
 | 
						|
      // The buffer has been over written. There are three segments: The first
 | 
						|
      // one is 0 .. marked_position_, which is the marked earliest log. The
 | 
						|
      // second one is position_ .. file_size, which is the middle log. The
 | 
						|
      // last one is marked_position_ .. position_, which is the latest log.
 | 
						|
      read_segment_ = READ_MARKED;
 | 
						|
      read_segment_available_ = marked_position_;
 | 
						|
      last_write_position_ = position_;
 | 
						|
    }
 | 
						|
 | 
						|
    // Read from the beginning.
 | 
						|
    position_ = 0;
 | 
						|
    SetPosition(position_);
 | 
						|
  }
 | 
						|
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
 | 
						|
                                      size_t* read, int* error) {
 | 
						|
  if (read_segment_available_ == 0) {
 | 
						|
    size_t file_size;
 | 
						|
    switch (read_segment_) {
 | 
						|
      case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
 | 
						|
        read_segment_ = READ_MIDDLE;
 | 
						|
        position_ = last_write_position_;
 | 
						|
        SetPosition(position_);
 | 
						|
        GetSize(&file_size);
 | 
						|
        read_segment_available_ = file_size - position_;
 | 
						|
        break;
 | 
						|
 | 
						|
      case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
 | 
						|
        read_segment_ = READ_LATEST;
 | 
						|
        position_ = marked_position_;
 | 
						|
        SetPosition(position_);
 | 
						|
        read_segment_available_ = last_write_position_ - position_;
 | 
						|
        break;
 | 
						|
 | 
						|
      default:  // Finished READ_LATEST and return EOS.
 | 
						|
        return talk_base::SR_EOS;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  size_t local_read;
 | 
						|
  if (!read) read = &local_read;
 | 
						|
 | 
						|
  size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
 | 
						|
  talk_base::StreamResult result
 | 
						|
    = talk_base::FileStream::Read(buffer, to_read, read, error);
 | 
						|
  if (result == talk_base::SR_SUCCESS) {
 | 
						|
    read_segment_available_ -= *read;
 | 
						|
    position_ += *read;
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult CircularFileStream::Write(const void* data, size_t data_len,
 | 
						|
                                       size_t* written, int* error) {
 | 
						|
  if (position_ >= max_write_size_) {
 | 
						|
    ASSERT(position_ == max_write_size_);
 | 
						|
    position_ = marked_position_;
 | 
						|
    SetPosition(position_);
 | 
						|
  }
 | 
						|
 | 
						|
  size_t local_written;
 | 
						|
  if (!written) written = &local_written;
 | 
						|
 | 
						|
  size_t to_eof = max_write_size_ - position_;
 | 
						|
  size_t to_write = talk_base::_min(data_len, to_eof);
 | 
						|
  talk_base::StreamResult result
 | 
						|
    = talk_base::FileStream::Write(data, to_write, written, error);
 | 
						|
  if (result == talk_base::SR_SUCCESS) {
 | 
						|
    position_ += *written;
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
AsyncWriteStream::~AsyncWriteStream() {
 | 
						|
  write_thread_->Clear(this, 0, NULL);
 | 
						|
  ClearBufferAndWrite();
 | 
						|
 | 
						|
  CritScope cs(&crit_stream_);
 | 
						|
  stream_.reset();
 | 
						|
}
 | 
						|
 | 
						|
// This is needed by some stream writers, such as RtpDumpWriter.
 | 
						|
bool AsyncWriteStream::GetPosition(size_t* position) const {
 | 
						|
  CritScope cs(&crit_stream_);
 | 
						|
  return stream_->GetPosition(position);
 | 
						|
}
 | 
						|
 | 
						|
// This is needed by some stream writers, such as the plugin log writers.
 | 
						|
StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
 | 
						|
                                    size_t* read, int* error) {
 | 
						|
  CritScope cs(&crit_stream_);
 | 
						|
  return stream_->Read(buffer, buffer_len, read, error);
 | 
						|
}
 | 
						|
 | 
						|
void AsyncWriteStream::Close() {
 | 
						|
  if (state_ == SS_CLOSED) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  write_thread_->Clear(this, 0, NULL);
 | 
						|
  ClearBufferAndWrite();
 | 
						|
 | 
						|
  CritScope cs(&crit_stream_);
 | 
						|
  stream_->Close();
 | 
						|
  state_ = SS_CLOSED;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
 | 
						|
                                     size_t* written, int* error) {
 | 
						|
  if (state_ == SS_CLOSED) {
 | 
						|
    return SR_ERROR;
 | 
						|
  }
 | 
						|
 | 
						|
  size_t previous_buffer_length = 0;
 | 
						|
  {
 | 
						|
    CritScope cs(&crit_buffer_);
 | 
						|
    previous_buffer_length = buffer_.length();
 | 
						|
    buffer_.AppendData(data, data_len);
 | 
						|
  }
 | 
						|
 | 
						|
  if (previous_buffer_length == 0) {
 | 
						|
    // If there's stuff already in the buffer, then we already called
 | 
						|
    // Post and the write_thread_ hasn't pulled it out yet, so we
 | 
						|
    // don't need to re-Post.
 | 
						|
    write_thread_->Post(this, 0, NULL);
 | 
						|
  }
 | 
						|
  // Return immediately, assuming that it works.
 | 
						|
  if (written) {
 | 
						|
    *written = data_len;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
 | 
						|
  ClearBufferAndWrite();
 | 
						|
}
 | 
						|
 | 
						|
bool AsyncWriteStream::Flush() {
 | 
						|
  if (state_ == SS_CLOSED) {
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
  ClearBufferAndWrite();
 | 
						|
 | 
						|
  CritScope cs(&crit_stream_);
 | 
						|
  return stream_->Flush();
 | 
						|
}
 | 
						|
 | 
						|
void AsyncWriteStream::ClearBufferAndWrite() {
 | 
						|
  Buffer to_write;
 | 
						|
  {
 | 
						|
    CritScope cs_buffer(&crit_buffer_);
 | 
						|
    buffer_.TransferTo(&to_write);
 | 
						|
  }
 | 
						|
 | 
						|
  if (to_write.length() > 0) {
 | 
						|
    CritScope cs(&crit_stream_);
 | 
						|
    stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
#ifdef POSIX
 | 
						|
 | 
						|
// Have to identically rewrite the FileStream destructor or else it would call
 | 
						|
// the base class's Close() instead of the sub-class's.
 | 
						|
POpenStream::~POpenStream() {
 | 
						|
  POpenStream::Close();
 | 
						|
}
 | 
						|
 | 
						|
bool POpenStream::Open(const std::string& subcommand,
 | 
						|
                       const char* mode,
 | 
						|
                       int* error) {
 | 
						|
  Close();
 | 
						|
  file_ = popen(subcommand.c_str(), mode);
 | 
						|
  if (file_ == NULL) {
 | 
						|
    if (error)
 | 
						|
      *error = errno;
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
 | 
						|
                            int shflag, int* error) {
 | 
						|
  return Open(subcommand, mode, error);
 | 
						|
}
 | 
						|
 | 
						|
void POpenStream::DoClose() {
 | 
						|
  wait_status_ = pclose(file_);
 | 
						|
}
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// MemoryStream
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
MemoryStreamBase::MemoryStreamBase()
 | 
						|
  : buffer_(NULL), buffer_length_(0), data_length_(0),
 | 
						|
    seek_position_(0) {
 | 
						|
}
 | 
						|
 | 
						|
StreamState MemoryStreamBase::GetState() const {
 | 
						|
  return SS_OPEN;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
 | 
						|
                                    size_t* bytes_read, int* error) {
 | 
						|
  if (seek_position_ >= data_length_) {
 | 
						|
    return SR_EOS;
 | 
						|
  }
 | 
						|
  size_t available = data_length_ - seek_position_;
 | 
						|
  if (bytes > available) {
 | 
						|
    // Read partial buffer
 | 
						|
    bytes = available;
 | 
						|
  }
 | 
						|
  memcpy(buffer, &buffer_[seek_position_], bytes);
 | 
						|
  seek_position_ += bytes;
 | 
						|
  if (bytes_read) {
 | 
						|
    *bytes_read = bytes;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
 | 
						|
                                     size_t* bytes_written, int* error) {
 | 
						|
  size_t available = buffer_length_ - seek_position_;
 | 
						|
  if (0 == available) {
 | 
						|
    // Increase buffer size to the larger of:
 | 
						|
    // a) new position rounded up to next 256 bytes
 | 
						|
    // b) double the previous length
 | 
						|
    size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
 | 
						|
                                    buffer_length_ * 2);
 | 
						|
    StreamResult result = DoReserve(new_buffer_length, error);
 | 
						|
    if (SR_SUCCESS != result) {
 | 
						|
      return result;
 | 
						|
    }
 | 
						|
    ASSERT(buffer_length_ >= new_buffer_length);
 | 
						|
    available = buffer_length_ - seek_position_;
 | 
						|
  }
 | 
						|
 | 
						|
  if (bytes > available) {
 | 
						|
    bytes = available;
 | 
						|
  }
 | 
						|
  memcpy(&buffer_[seek_position_], buffer, bytes);
 | 
						|
  seek_position_ += bytes;
 | 
						|
  if (data_length_ < seek_position_) {
 | 
						|
    data_length_ = seek_position_;
 | 
						|
  }
 | 
						|
  if (bytes_written) {
 | 
						|
    *bytes_written = bytes;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void MemoryStreamBase::Close() {
 | 
						|
  // nothing to do
 | 
						|
}
 | 
						|
 | 
						|
bool MemoryStreamBase::SetPosition(size_t position) {
 | 
						|
  if (position > data_length_)
 | 
						|
    return false;
 | 
						|
  seek_position_ = position;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool MemoryStreamBase::GetPosition(size_t* position) const {
 | 
						|
  if (position)
 | 
						|
    *position = seek_position_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool MemoryStreamBase::GetSize(size_t* size) const {
 | 
						|
  if (size)
 | 
						|
    *size = data_length_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool MemoryStreamBase::GetAvailable(size_t* size) const {
 | 
						|
  if (size)
 | 
						|
    *size = data_length_ - seek_position_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool MemoryStreamBase::ReserveSize(size_t size) {
 | 
						|
  return (SR_SUCCESS == DoReserve(size, NULL));
 | 
						|
}
 | 
						|
 | 
						|
StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
 | 
						|
  return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
MemoryStream::MemoryStream()
 | 
						|
  : buffer_alloc_(NULL) {
 | 
						|
}
 | 
						|
 | 
						|
MemoryStream::MemoryStream(const char* data)
 | 
						|
  : buffer_alloc_(NULL) {
 | 
						|
  SetData(data, strlen(data));
 | 
						|
}
 | 
						|
 | 
						|
MemoryStream::MemoryStream(const void* data, size_t length)
 | 
						|
  : buffer_alloc_(NULL) {
 | 
						|
  SetData(data, length);
 | 
						|
}
 | 
						|
 | 
						|
MemoryStream::~MemoryStream() {
 | 
						|
  delete [] buffer_alloc_;
 | 
						|
}
 | 
						|
 | 
						|
void MemoryStream::SetData(const void* data, size_t length) {
 | 
						|
  data_length_ = buffer_length_ = length;
 | 
						|
  delete [] buffer_alloc_;
 | 
						|
  buffer_alloc_ = new char[buffer_length_ + kAlignment];
 | 
						|
  buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
 | 
						|
  memcpy(buffer_, data, data_length_);
 | 
						|
  seek_position_ = 0;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult MemoryStream::DoReserve(size_t size, int* error) {
 | 
						|
  if (buffer_length_ >= size)
 | 
						|
    return SR_SUCCESS;
 | 
						|
 | 
						|
  if (char* new_buffer_alloc = new char[size + kAlignment]) {
 | 
						|
    char* new_buffer = reinterpret_cast<char*>(
 | 
						|
        ALIGNP(new_buffer_alloc, kAlignment));
 | 
						|
    memcpy(new_buffer, buffer_, data_length_);
 | 
						|
    delete [] buffer_alloc_;
 | 
						|
    buffer_alloc_ = new_buffer_alloc;
 | 
						|
    buffer_ = new_buffer;
 | 
						|
    buffer_length_ = size;
 | 
						|
    return SR_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  if (error) {
 | 
						|
    *error = ENOMEM;
 | 
						|
  }
 | 
						|
  return SR_ERROR;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
ExternalMemoryStream::ExternalMemoryStream() {
 | 
						|
}
 | 
						|
 | 
						|
ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
 | 
						|
  SetData(data, length);
 | 
						|
}
 | 
						|
 | 
						|
ExternalMemoryStream::~ExternalMemoryStream() {
 | 
						|
}
 | 
						|
 | 
						|
void ExternalMemoryStream::SetData(void* data, size_t length) {
 | 
						|
  data_length_ = buffer_length_ = length;
 | 
						|
  buffer_ = static_cast<char*>(data);
 | 
						|
  seek_position_ = 0;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// FifoBuffer
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
FifoBuffer::FifoBuffer(size_t size)
 | 
						|
    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
 | 
						|
      data_length_(0), read_position_(0), owner_(Thread::Current()) {
 | 
						|
  // all events are done on the owner_ thread
 | 
						|
}
 | 
						|
 | 
						|
FifoBuffer::FifoBuffer(size_t size, Thread* owner)
 | 
						|
    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
 | 
						|
      data_length_(0), read_position_(0), owner_(owner) {
 | 
						|
  // all events are done on the owner_ thread
 | 
						|
}
 | 
						|
 | 
						|
FifoBuffer::~FifoBuffer() {
 | 
						|
}
 | 
						|
 | 
						|
bool FifoBuffer::GetBuffered(size_t* size) const {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  *size = data_length_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool FifoBuffer::SetCapacity(size_t size) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  if (data_length_ > size) {
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
 | 
						|
  if (size != buffer_length_) {
 | 
						|
    char* buffer = new char[size];
 | 
						|
    const size_t copy = data_length_;
 | 
						|
    const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
 | 
						|
    memcpy(buffer, &buffer_[read_position_], tail_copy);
 | 
						|
    memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
 | 
						|
    buffer_.reset(buffer);
 | 
						|
    read_position_ = 0;
 | 
						|
    buffer_length_ = size;
 | 
						|
  }
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
 | 
						|
                                    size_t offset, size_t* bytes_read) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
 | 
						|
                                     size_t offset, size_t* bytes_written) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
 | 
						|
}
 | 
						|
 | 
						|
StreamState FifoBuffer::GetState() const {
 | 
						|
  return state_;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
 | 
						|
                              size_t* bytes_read, int* error) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  const bool was_writable = data_length_ < buffer_length_;
 | 
						|
  size_t copy = 0;
 | 
						|
  StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
 | 
						|
 | 
						|
  if (result == SR_SUCCESS) {
 | 
						|
    // If read was successful then adjust the read position and number of
 | 
						|
    // bytes buffered.
 | 
						|
    read_position_ = (read_position_ + copy) % buffer_length_;
 | 
						|
    data_length_ -= copy;
 | 
						|
    if (bytes_read) {
 | 
						|
      *bytes_read = copy;
 | 
						|
    }
 | 
						|
 | 
						|
    // if we were full before, and now we're not, post an event
 | 
						|
    if (!was_writable && copy > 0) {
 | 
						|
      PostEvent(owner_, SE_WRITE, 0);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
 | 
						|
                               size_t* bytes_written, int* error) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
 | 
						|
  const bool was_readable = (data_length_ > 0);
 | 
						|
  size_t copy = 0;
 | 
						|
  StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
 | 
						|
 | 
						|
  if (result == SR_SUCCESS) {
 | 
						|
    // If write was successful then adjust the number of readable bytes.
 | 
						|
    data_length_ += copy;
 | 
						|
    if (bytes_written) {
 | 
						|
      *bytes_written = copy;
 | 
						|
    }
 | 
						|
 | 
						|
    // if we didn't have any data to read before, and now we do, post an event
 | 
						|
    if (!was_readable && copy > 0) {
 | 
						|
      PostEvent(owner_, SE_READ, 0);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
void FifoBuffer::Close() {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  state_ = SS_CLOSED;
 | 
						|
}
 | 
						|
 | 
						|
const void* FifoBuffer::GetReadData(size_t* size) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  *size = (read_position_ + data_length_ <= buffer_length_) ?
 | 
						|
      data_length_ : buffer_length_ - read_position_;
 | 
						|
  return &buffer_[read_position_];
 | 
						|
}
 | 
						|
 | 
						|
void FifoBuffer::ConsumeReadData(size_t size) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  ASSERT(size <= data_length_);
 | 
						|
  const bool was_writable = data_length_ < buffer_length_;
 | 
						|
  read_position_ = (read_position_ + size) % buffer_length_;
 | 
						|
  data_length_ -= size;
 | 
						|
  if (!was_writable && size > 0) {
 | 
						|
    PostEvent(owner_, SE_WRITE, 0);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void* FifoBuffer::GetWriteBuffer(size_t* size) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  if (state_ == SS_CLOSED) {
 | 
						|
    return NULL;
 | 
						|
  }
 | 
						|
 | 
						|
  // if empty, reset the write position to the beginning, so we can get
 | 
						|
  // the biggest possible block
 | 
						|
  if (data_length_ == 0) {
 | 
						|
    read_position_ = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  const size_t write_position = (read_position_ + data_length_)
 | 
						|
      % buffer_length_;
 | 
						|
  *size = (write_position > read_position_ || data_length_ == 0) ?
 | 
						|
      buffer_length_ - write_position : read_position_ - write_position;
 | 
						|
  return &buffer_[write_position];
 | 
						|
}
 | 
						|
 | 
						|
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  ASSERT(size <= buffer_length_ - data_length_);
 | 
						|
  const bool was_readable = (data_length_ > 0);
 | 
						|
  data_length_ += size;
 | 
						|
  if (!was_readable && size > 0) {
 | 
						|
    PostEvent(owner_, SE_READ, 0);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
 | 
						|
  CritScope cs(&crit_);
 | 
						|
  *size = buffer_length_ - data_length_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
 | 
						|
                                          size_t bytes,
 | 
						|
                                          size_t offset,
 | 
						|
                                          size_t* bytes_read) {
 | 
						|
  if (offset >= data_length_) {
 | 
						|
    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
 | 
						|
  }
 | 
						|
 | 
						|
  const size_t available = data_length_ - offset;
 | 
						|
  const size_t read_position = (read_position_ + offset) % buffer_length_;
 | 
						|
  const size_t copy = _min(bytes, available);
 | 
						|
  const size_t tail_copy = _min(copy, buffer_length_ - read_position);
 | 
						|
  char* const p = static_cast<char*>(buffer);
 | 
						|
  memcpy(p, &buffer_[read_position], tail_copy);
 | 
						|
  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
 | 
						|
 | 
						|
  if (bytes_read) {
 | 
						|
    *bytes_read = copy;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
 | 
						|
                                           size_t bytes,
 | 
						|
                                           size_t offset,
 | 
						|
                                           size_t* bytes_written) {
 | 
						|
  if (state_ == SS_CLOSED) {
 | 
						|
    return SR_EOS;
 | 
						|
  }
 | 
						|
 | 
						|
  if (data_length_ + offset >= buffer_length_) {
 | 
						|
    return SR_BLOCK;
 | 
						|
  }
 | 
						|
 | 
						|
  const size_t available = buffer_length_ - data_length_ - offset;
 | 
						|
  const size_t write_position = (read_position_ + data_length_ + offset)
 | 
						|
      % buffer_length_;
 | 
						|
  const size_t copy = _min(bytes, available);
 | 
						|
  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
 | 
						|
  const char* const p = static_cast<const char*>(buffer);
 | 
						|
  memcpy(&buffer_[write_position], p, tail_copy);
 | 
						|
  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
 | 
						|
 | 
						|
  if (bytes_written) {
 | 
						|
    *bytes_written = copy;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// LoggingAdapter
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
 | 
						|
                               const std::string& label, bool hex_mode)
 | 
						|
    : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
 | 
						|
  set_label(label);
 | 
						|
}
 | 
						|
 | 
						|
void LoggingAdapter::set_label(const std::string& label) {
 | 
						|
  label_.assign("[");
 | 
						|
  label_.append(label);
 | 
						|
  label_.append("]");
 | 
						|
}
 | 
						|
 | 
						|
StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
 | 
						|
                                  size_t* read, int* error) {
 | 
						|
  size_t local_read; if (!read) read = &local_read;
 | 
						|
  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
 | 
						|
                                                     error);
 | 
						|
  if (result == SR_SUCCESS) {
 | 
						|
    LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
 | 
						|
                                   size_t* written, int* error) {
 | 
						|
  size_t local_written;
 | 
						|
  if (!written) written = &local_written;
 | 
						|
  StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
 | 
						|
                                                      error);
 | 
						|
  if (result == SR_SUCCESS) {
 | 
						|
    LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
 | 
						|
                 &lms_);
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
void LoggingAdapter::Close() {
 | 
						|
  LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
 | 
						|
  LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
 | 
						|
  LOG_V(level_) << label_ << " Closed locally";
 | 
						|
  StreamAdapterInterface::Close();
 | 
						|
}
 | 
						|
 | 
						|
void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
 | 
						|
  if (events & SE_OPEN) {
 | 
						|
    LOG_V(level_) << label_ << " Open";
 | 
						|
  } else if (events & SE_CLOSE) {
 | 
						|
    LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
 | 
						|
    LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
 | 
						|
    LOG_V(level_) << label_ << " Closed with error: " << err;
 | 
						|
  }
 | 
						|
  StreamAdapterInterface::OnEvent(stream, events, err);
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StringStream - Reads/Writes to an external std::string
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StringStream::StringStream(std::string& str)
 | 
						|
    : str_(str), read_pos_(0), read_only_(false) {
 | 
						|
}
 | 
						|
 | 
						|
StringStream::StringStream(const std::string& str)
 | 
						|
    : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
 | 
						|
}
 | 
						|
 | 
						|
StreamState StringStream::GetState() const {
 | 
						|
  return SS_OPEN;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StringStream::Read(void* buffer, size_t buffer_len,
 | 
						|
                                      size_t* read, int* error) {
 | 
						|
  size_t available = _min(buffer_len, str_.size() - read_pos_);
 | 
						|
  if (!available)
 | 
						|
    return SR_EOS;
 | 
						|
  memcpy(buffer, str_.data() + read_pos_, available);
 | 
						|
  read_pos_ += available;
 | 
						|
  if (read)
 | 
						|
    *read = available;
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
StreamResult StringStream::Write(const void* data, size_t data_len,
 | 
						|
                                      size_t* written, int* error) {
 | 
						|
  if (read_only_) {
 | 
						|
    if (error) {
 | 
						|
      *error = -1;
 | 
						|
    }
 | 
						|
    return SR_ERROR;
 | 
						|
  }
 | 
						|
  str_.append(static_cast<const char*>(data),
 | 
						|
              static_cast<const char*>(data) + data_len);
 | 
						|
  if (written)
 | 
						|
    *written = data_len;
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void StringStream::Close() {
 | 
						|
}
 | 
						|
 | 
						|
bool StringStream::SetPosition(size_t position) {
 | 
						|
  if (position > str_.size())
 | 
						|
    return false;
 | 
						|
  read_pos_ = position;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StringStream::GetPosition(size_t* position) const {
 | 
						|
  if (position)
 | 
						|
    *position = read_pos_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StringStream::GetSize(size_t* size) const {
 | 
						|
  if (size)
 | 
						|
    *size = str_.size();
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StringStream::GetAvailable(size_t* size) const {
 | 
						|
  if (size)
 | 
						|
    *size = str_.size() - read_pos_;
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
bool StringStream::ReserveSize(size_t size) {
 | 
						|
  if (read_only_)
 | 
						|
    return false;
 | 
						|
  str_.reserve(size);
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
// StreamReference
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StreamReference::StreamReference(StreamInterface* stream)
 | 
						|
    : StreamAdapterInterface(stream, false) {
 | 
						|
  // owner set to false so the destructor does not free the stream.
 | 
						|
  stream_ref_count_ = new StreamRefCount(stream);
 | 
						|
}
 | 
						|
 | 
						|
StreamInterface* StreamReference::NewReference() {
 | 
						|
  stream_ref_count_->AddReference();
 | 
						|
  return new StreamReference(stream_ref_count_, stream());
 | 
						|
}
 | 
						|
 | 
						|
StreamReference::~StreamReference() {
 | 
						|
  stream_ref_count_->Release();
 | 
						|
}
 | 
						|
 | 
						|
StreamReference::StreamReference(StreamRefCount* stream_ref_count,
 | 
						|
                                 StreamInterface* stream)
 | 
						|
    : StreamAdapterInterface(stream, false),
 | 
						|
      stream_ref_count_(stream_ref_count) {
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
StreamResult Flow(StreamInterface* source,
 | 
						|
                  char* buffer, size_t buffer_len,
 | 
						|
                  StreamInterface* sink,
 | 
						|
                  size_t* data_len /* = NULL */) {
 | 
						|
  ASSERT(buffer_len > 0);
 | 
						|
 | 
						|
  StreamResult result;
 | 
						|
  size_t count, read_pos, write_pos;
 | 
						|
  if (data_len) {
 | 
						|
    read_pos = *data_len;
 | 
						|
  } else {
 | 
						|
    read_pos = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  bool end_of_stream = false;
 | 
						|
  do {
 | 
						|
    // Read until buffer is full, end of stream, or error
 | 
						|
    while (!end_of_stream && (read_pos < buffer_len)) {
 | 
						|
      result = source->Read(buffer + read_pos, buffer_len - read_pos,
 | 
						|
                            &count, NULL);
 | 
						|
      if (result == SR_EOS) {
 | 
						|
        end_of_stream = true;
 | 
						|
      } else if (result != SR_SUCCESS) {
 | 
						|
        if (data_len) {
 | 
						|
          *data_len = read_pos;
 | 
						|
        }
 | 
						|
        return result;
 | 
						|
      } else {
 | 
						|
        read_pos += count;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    // Write until buffer is empty, or error (including end of stream)
 | 
						|
    write_pos = 0;
 | 
						|
    while (write_pos < read_pos) {
 | 
						|
      result = sink->Write(buffer + write_pos, read_pos - write_pos,
 | 
						|
                           &count, NULL);
 | 
						|
      if (result != SR_SUCCESS) {
 | 
						|
        if (data_len) {
 | 
						|
          *data_len = read_pos - write_pos;
 | 
						|
          if (write_pos > 0) {
 | 
						|
            memmove(buffer, buffer + write_pos, *data_len);
 | 
						|
          }
 | 
						|
        }
 | 
						|
        return result;
 | 
						|
      }
 | 
						|
      write_pos += count;
 | 
						|
    }
 | 
						|
 | 
						|
    read_pos = 0;
 | 
						|
  } while (!end_of_stream);
 | 
						|
 | 
						|
  if (data_len) {
 | 
						|
    *data_len = 0;
 | 
						|
  }
 | 
						|
  return SR_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
///////////////////////////////////////////////////////////////////////////////
 | 
						|
 | 
						|
}  // namespace talk_base
 |