xrootd
XrdTpcStream.hh
Go to the documentation of this file.
1 
10 #include <memory>
11 #include <vector>
12 #include <string>
13 
14 #include <cstring>
15 
16 struct stat;
17 
18 class XrdSfsFile;
19 class XrdSysError;
20 
21 namespace TPC {
22 class Stream {
23 public:
24  Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
25  : m_open_for_write(false),
26  m_avail_count(max_blocks),
27  m_fh(std::move(fh)),
28  m_offset(0),
29  m_log(log)
30  {
31  m_buffers.reserve(max_blocks);
32  for (size_t idx=0; idx < max_blocks; idx++) {
33  m_buffers.push_back(new Entry(buffer_size));
34  }
35  m_open_for_write = true;
36  }
37 
39 
40  int Stat(struct stat *);
41 
42  int Read(off_t offset, char *buffer, size_t size);
43 
44  int Write(off_t offset, const char *buffer, size_t size, bool force);
45 
46  size_t AvailableBuffers() const {return m_avail_count;}
47 
48  void DumpBuffers() const;
49 
50  // Flush and finalize the stream. If all data has been sent to the underlying
51  // file handle, close() will be invoked on the file handle.
52  //
53  // Further write operations on this stream will result in an error.
54  // If any memory buffers remain, an error occurs.
55  //
56  // Returns true on success; false otherwise.
57  bool Finalize();
58 
59  std::string GetErrorMessage() const {return m_error_buf;}
60 
61 private:
62 
63  class Entry {
64  public:
65  Entry(size_t capacity) :
66  m_offset(-1),
67  m_capacity(capacity),
68  m_size(0)
69  {}
70 
71  bool Available() const {return m_offset == -1;}
72 
73  int Write(Stream &stream, bool force) {
74  if (Available() || !CanWrite(stream)) {return 0;}
75  // Currently, only full writes are accepted along megabyte boundaries
76  // unless the stream forces a flush (i.e., we are at EOF).
77  size_t size_desired = m_size;
78  if (!force) {
79  size_desired -= (size_desired % (1024*1024));
80  if (!size_desired) {return 0;}
81  }
82  int retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
83  if (retval < 0 && (static_cast<size_t>(retval) != size_desired)) {
84  return -1;
85  }
86  // If partial data remains, copy it to the beginning of the buffer.
87  // Otherwise, mark the buffer as available.
88  if (size_desired < m_size) {
89  m_offset += size_desired;
90  m_size -= size_desired;
91  memcpy(&m_buffer[0], &m_buffer[size_desired], m_size);
92  } else {
93  m_offset = -1;
94  m_size = 0;
95  }
96  return retval;
97  }
98 
99  bool Accept(off_t offset, const char *buf, size_t size) {
100  // Validate acceptance criteria.
101  if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
102  return false;
103  }
104  if (size > m_capacity - m_size) {
105  return false;
106  }
107 
108  // Inflate the underlying buffer if needed.
109  ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
110  if (new_bytes_needed > 0) {
111  m_buffer.reserve(m_capacity);
112  }
113 
114  // Finally, do the copy.
115  memcpy(&m_buffer[0] + m_size, buf, size);
116  m_size += size;
117  if (m_offset == -1) {
118  m_offset = offset;
119  }
120  return true;
121  }
122 
123  void ShrinkIfUnused() {
124  if (!Available()) {return;}
125 #if __cplusplus > 199711L
126  m_buffer.shrink_to_fit();
127 #endif
128  }
129 
130  void Move(Entry &other) {
131  m_buffer.swap(other.m_buffer);
132  m_offset = other.m_offset;
133  m_size = other.m_size;
134  }
135 
136  off_t GetOffset() const {return m_offset;}
137  size_t GetCapacity() const {return m_capacity;}
138  size_t GetSize() const {return m_size;}
139 
140  private:
141 
142  Entry(const Entry&) = delete;
143 
144  bool CanWrite(Stream &stream) const {
145  return (m_size > 0) && (m_offset == stream.m_offset);
146  }
147 
148  off_t m_offset; // Offset within file that m_buffer[0] represents.
149  size_t m_capacity;
150  size_t m_size; // Number of bytes held in buffer.
151  std::vector<char> m_buffer;
152  };
153 
156  std::unique_ptr<XrdSfsFile> m_fh;
157  off_t m_offset;
158  std::vector<Entry*> m_buffers;
160  std::string m_error_buf;
161 };
162 }
TPC::Stream::m_open_for_write
bool m_open_for_write
Definition: XrdTpcStream.hh:154
TPC::Stream::Write
int Write(off_t offset, const char *buffer, size_t size, bool force)
TPC
Definition: XrdTpcState.hh:16
TPC::Stream::Entry::Entry
Entry(size_t capacity)
Definition: XrdTpcStream.hh:65
TPC::Stream::Entry::Write
int Write(Stream &stream, bool force)
Definition: XrdTpcStream.hh:73
TPC::Stream::m_avail_count
size_t m_avail_count
Definition: XrdTpcStream.hh:155
TPC::Stream::Entry::GetCapacity
size_t GetCapacity() const
Definition: XrdTpcStream.hh:137
TPC::Stream
Definition: XrdTpcStream.hh:22
TPC::Stream::Entry::Accept
bool Accept(off_t offset, const char *buf, size_t size)
Definition: XrdTpcStream.hh:99
TPC::Stream::Entry::ShrinkIfUnused
void ShrinkIfUnused()
Definition: XrdTpcStream.hh:123
TPC::Stream::m_buffers
std::vector< Entry * > m_buffers
Definition: XrdTpcStream.hh:158
TPC::Stream::Entry::m_buffer
std::vector< char > m_buffer
Definition: XrdTpcStream.hh:151
TPC::Stream::Finalize
bool Finalize()
TPC::Stream::Entry::Entry
Entry(const Entry &)=delete
TPC::Stream::Stat
int Stat(struct stat *)
TPC::Stream::GetErrorMessage
std::string GetErrorMessage() const
Definition: XrdTpcStream.hh:59
XrdSfsFile
Definition: XrdSfsInterface.hh:652
TPC::Stream::Entry::m_size
size_t m_size
Definition: XrdTpcStream.hh:150
TPC::Stream::Entry::GetOffset
off_t GetOffset() const
Definition: XrdTpcStream.hh:136
TPC::Stream::m_offset
off_t m_offset
Definition: XrdTpcStream.hh:157
TPC::Stream::Read
int Read(off_t offset, char *buffer, size_t size)
TPC::Stream::Entry::Move
void Move(Entry &other)
Definition: XrdTpcStream.hh:130
TPC::Stream::Entry::CanWrite
bool CanWrite(Stream &stream) const
Definition: XrdTpcStream.hh:144
TPC::Stream::DumpBuffers
void DumpBuffers() const
TPC::Stream::m_error_buf
std::string m_error_buf
Definition: XrdTpcStream.hh:160
TPC::Stream::Entry::Available
bool Available() const
Definition: XrdTpcStream.hh:71
TPC::Stream::~Stream
~Stream()
TPC::Stream::Stream
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition: XrdTpcStream.hh:24
TPC::Stream::AvailableBuffers
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:46
TPC::Stream::Entry
Definition: XrdTpcStream.hh:63
TPC::Stream::Entry::m_capacity
size_t m_capacity
Definition: XrdTpcStream.hh:149
TPC::Stream::m_log
XrdSysError & m_log
Definition: XrdTpcStream.hh:159
stat
#define stat(a, b)
Definition: XrdPosix.hh:96
XrdSysError
Definition: XrdSysError.hh:90
TPC::Stream::Entry::m_offset
off_t m_offset
Definition: XrdTpcStream.hh:148
TPC::Stream::Entry::GetSize
size_t GetSize() const
Definition: XrdTpcStream.hh:138
TPC::Stream::m_fh
std::unique_ptr< XrdSfsFile > m_fh
Definition: XrdTpcStream.hh:156