zero_copy_stream_impl.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. // Protocol Buffers - Google's data interchange format
  2. // Copyright 2008 Google Inc. All rights reserved.
  3. // https://developers.google.com/protocol-buffers/
  4. //
  5. // Redistribution and use in source and binary forms, with or without
  6. // modification, are permitted provided that the following conditions are
  7. // met:
  8. //
  9. // * Redistributions of source code must retain the above copyright
  10. // notice, this list of conditions and the following disclaimer.
  11. // * Redistributions in binary form must reproduce the above
  12. // copyright notice, this list of conditions and the following disclaimer
  13. // in the documentation and/or other materials provided with the
  14. // distribution.
  15. // * Neither the name of Google Inc. nor the names of its
  16. // contributors may be used to endorse or promote products derived from
  17. // this software without specific prior written permission.
  18. //
  19. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  20. // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  21. // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  22. // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  23. // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  24. // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  25. // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  26. // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  27. // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  28. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  29. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30. // Author: kenton@google.com (Kenton Varda)
  31. // Based on original Protocol Buffers design by
  32. // Sanjay Ghemawat, Jeff Dean, and others.
  33. #ifndef _MSC_VER
  34. #include <unistd.h>
  35. #include <sys/types.h>
  36. #include <sys/stat.h>
  37. #include <fcntl.h>
  38. #endif
  39. #include <errno.h>
  40. #include <iostream>
  41. #include <algorithm>
  42. #include <google/protobuf/io/zero_copy_stream_impl.h>
  43. #include <google/protobuf/stubs/common.h>
  44. #include <google/protobuf/stubs/logging.h>
  45. #include <google/protobuf/stubs/stl_util.h>
  46. #include <google/protobuf/stubs/io_win32.h>
  47. namespace google {
  48. namespace protobuf {
  49. namespace io {
  50. #ifdef _WIN32
  51. // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
  52. // return value is undefined. We re-define it to always produce an error.
  53. #define lseek(fd, offset, origin) ((off_t)-1)
  54. // DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
  55. // them like we do below.
  56. using google::protobuf::internal::win32::access;
  57. using google::protobuf::internal::win32::close;
  58. using google::protobuf::internal::win32::open;
  59. using google::protobuf::internal::win32::read;
  60. using google::protobuf::internal::win32::write;
  61. #endif
  62. namespace {
  63. // EINTR sucks.
  64. int close_no_eintr(int fd) {
  65. int result;
  66. do {
  67. result = close(fd);
  68. } while (result < 0 && errno == EINTR);
  69. return result;
  70. }
  71. } // namespace
  72. // ===================================================================
  73. FileInputStream::FileInputStream(int file_descriptor, int block_size)
  74. : copying_input_(file_descriptor),
  75. impl_(&copying_input_, block_size) {
  76. }
  77. bool FileInputStream::Close() {
  78. return copying_input_.Close();
  79. }
  80. bool FileInputStream::Next(const void** data, int* size) {
  81. return impl_.Next(data, size);
  82. }
  83. void FileInputStream::BackUp(int count) {
  84. impl_.BackUp(count);
  85. }
  86. bool FileInputStream::Skip(int count) {
  87. return impl_.Skip(count);
  88. }
  89. int64 FileInputStream::ByteCount() const {
  90. return impl_.ByteCount();
  91. }
  92. FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
  93. int file_descriptor)
  94. : file_(file_descriptor),
  95. close_on_delete_(false),
  96. is_closed_(false),
  97. errno_(0),
  98. previous_seek_failed_(false) {
  99. }
  100. FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
  101. if (close_on_delete_) {
  102. if (!Close()) {
  103. GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
  104. }
  105. }
  106. }
  107. bool FileInputStream::CopyingFileInputStream::Close() {
  108. GOOGLE_CHECK(!is_closed_);
  109. is_closed_ = true;
  110. if (close_no_eintr(file_) != 0) {
  111. // The docs on close() do not specify whether a file descriptor is still
  112. // open after close() fails with EIO. However, the glibc source code
  113. // seems to indicate that it is not.
  114. errno_ = errno;
  115. return false;
  116. }
  117. return true;
  118. }
  119. int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
  120. GOOGLE_CHECK(!is_closed_);
  121. int result;
  122. do {
  123. result = read(file_, buffer, size);
  124. } while (result < 0 && errno == EINTR);
  125. if (result < 0) {
  126. // Read error (not EOF).
  127. errno_ = errno;
  128. }
  129. return result;
  130. }
  131. int FileInputStream::CopyingFileInputStream::Skip(int count) {
  132. GOOGLE_CHECK(!is_closed_);
  133. if (!previous_seek_failed_ &&
  134. lseek(file_, count, SEEK_CUR) != (off_t)-1) {
  135. // Seek succeeded.
  136. return count;
  137. } else {
  138. // Failed to seek.
  139. // Note to self: Don't seek again. This file descriptor doesn't
  140. // support it.
  141. previous_seek_failed_ = true;
  142. // Use the default implementation.
  143. return CopyingInputStream::Skip(count);
  144. }
  145. }
  146. // ===================================================================
  147. FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
  148. : copying_output_(file_descriptor),
  149. impl_(&copying_output_, block_size) {
  150. }
  151. FileOutputStream::~FileOutputStream() {
  152. impl_.Flush();
  153. }
  154. bool FileOutputStream::Close() {
  155. bool flush_succeeded = impl_.Flush();
  156. return copying_output_.Close() && flush_succeeded;
  157. }
  158. bool FileOutputStream::Flush() {
  159. return impl_.Flush();
  160. }
  161. bool FileOutputStream::Next(void** data, int* size) {
  162. return impl_.Next(data, size);
  163. }
  164. void FileOutputStream::BackUp(int count) {
  165. impl_.BackUp(count);
  166. }
  167. int64 FileOutputStream::ByteCount() const {
  168. return impl_.ByteCount();
  169. }
  170. FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
  171. int file_descriptor)
  172. : file_(file_descriptor),
  173. close_on_delete_(false),
  174. is_closed_(false),
  175. errno_(0) {
  176. }
  177. FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
  178. if (close_on_delete_) {
  179. if (!Close()) {
  180. GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
  181. }
  182. }
  183. }
  184. bool FileOutputStream::CopyingFileOutputStream::Close() {
  185. GOOGLE_CHECK(!is_closed_);
  186. is_closed_ = true;
  187. if (close_no_eintr(file_) != 0) {
  188. // The docs on close() do not specify whether a file descriptor is still
  189. // open after close() fails with EIO. However, the glibc source code
  190. // seems to indicate that it is not.
  191. errno_ = errno;
  192. return false;
  193. }
  194. return true;
  195. }
  196. bool FileOutputStream::CopyingFileOutputStream::Write(
  197. const void* buffer, int size) {
  198. GOOGLE_CHECK(!is_closed_);
  199. int total_written = 0;
  200. const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
  201. while (total_written < size) {
  202. int bytes;
  203. do {
  204. bytes = write(file_, buffer_base + total_written, size - total_written);
  205. } while (bytes < 0 && errno == EINTR);
  206. if (bytes <= 0) {
  207. // Write error.
  208. // FIXME(kenton): According to the man page, if write() returns zero,
  209. // there was no error; write() simply did not write anything. It's
  210. // unclear under what circumstances this might happen, but presumably
  211. // errno won't be set in this case. I am confused as to how such an
  212. // event should be handled. For now I'm treating it as an error, since
  213. // retrying seems like it could lead to an infinite loop. I suspect
  214. // this never actually happens anyway.
  215. if (bytes < 0) {
  216. errno_ = errno;
  217. }
  218. return false;
  219. }
  220. total_written += bytes;
  221. }
  222. return true;
  223. }
  224. // ===================================================================
  225. IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
  226. : copying_input_(input), impl_(&copying_input_, block_size) {}
  227. bool IstreamInputStream::Next(const void** data, int* size) {
  228. return impl_.Next(data, size);
  229. }
  230. void IstreamInputStream::BackUp(int count) {
  231. impl_.BackUp(count);
  232. }
  233. bool IstreamInputStream::Skip(int count) {
  234. return impl_.Skip(count);
  235. }
  236. int64 IstreamInputStream::ByteCount() const {
  237. return impl_.ByteCount();
  238. }
  239. IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
  240. std::istream* input)
  241. : input_(input) {}
  242. IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
  243. int IstreamInputStream::CopyingIstreamInputStream::Read(
  244. void* buffer, int size) {
  245. input_->read(reinterpret_cast<char*>(buffer), size);
  246. int result = input_->gcount();
  247. if (result == 0 && input_->fail() && !input_->eof()) {
  248. return -1;
  249. }
  250. return result;
  251. }
  252. // ===================================================================
  253. OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
  254. : copying_output_(output), impl_(&copying_output_, block_size) {}
  255. OstreamOutputStream::~OstreamOutputStream() {
  256. impl_.Flush();
  257. }
  258. bool OstreamOutputStream::Next(void** data, int* size) {
  259. return impl_.Next(data, size);
  260. }
  261. void OstreamOutputStream::BackUp(int count) {
  262. impl_.BackUp(count);
  263. }
  264. int64 OstreamOutputStream::ByteCount() const {
  265. return impl_.ByteCount();
  266. }
  267. OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
  268. std::ostream* output)
  269. : output_(output) {}
  270. OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
  271. }
  272. bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
  273. const void* buffer, int size) {
  274. output_->write(reinterpret_cast<const char*>(buffer), size);
  275. return output_->good();
  276. }
  277. // ===================================================================
  278. ConcatenatingInputStream::ConcatenatingInputStream(
  279. ZeroCopyInputStream* const streams[], int count)
  280. : streams_(streams), stream_count_(count), bytes_retired_(0) {
  281. }
  282. bool ConcatenatingInputStream::Next(const void** data, int* size) {
  283. while (stream_count_ > 0) {
  284. if (streams_[0]->Next(data, size)) return true;
  285. // That stream is done. Advance to the next one.
  286. bytes_retired_ += streams_[0]->ByteCount();
  287. ++streams_;
  288. --stream_count_;
  289. }
  290. // No more streams.
  291. return false;
  292. }
  293. void ConcatenatingInputStream::BackUp(int count) {
  294. if (stream_count_ > 0) {
  295. streams_[0]->BackUp(count);
  296. } else {
  297. GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
  298. }
  299. }
  300. bool ConcatenatingInputStream::Skip(int count) {
  301. while (stream_count_ > 0) {
  302. // Assume that ByteCount() can be used to find out how much we actually
  303. // skipped when Skip() fails.
  304. int64 target_byte_count = streams_[0]->ByteCount() + count;
  305. if (streams_[0]->Skip(count)) return true;
  306. // Hit the end of the stream. Figure out how many more bytes we still have
  307. // to skip.
  308. int64 final_byte_count = streams_[0]->ByteCount();
  309. GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
  310. count = target_byte_count - final_byte_count;
  311. // That stream is done. Advance to the next one.
  312. bytes_retired_ += final_byte_count;
  313. ++streams_;
  314. --stream_count_;
  315. }
  316. return false;
  317. }
  318. int64 ConcatenatingInputStream::ByteCount() const {
  319. if (stream_count_ == 0) {
  320. return bytes_retired_;
  321. } else {
  322. return bytes_retired_ + streams_[0]->ByteCount();
  323. }
  324. }
  325. // ===================================================================
  326. LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
  327. int64 limit)
  328. : input_(input), limit_(limit) {
  329. prior_bytes_read_ = input_->ByteCount();
  330. }
  331. LimitingInputStream::~LimitingInputStream() {
  332. // If we overshot the limit, back up.
  333. if (limit_ < 0) input_->BackUp(-limit_);
  334. }
  335. bool LimitingInputStream::Next(const void** data, int* size) {
  336. if (limit_ <= 0) return false;
  337. if (!input_->Next(data, size)) return false;
  338. limit_ -= *size;
  339. if (limit_ < 0) {
  340. // We overshot the limit. Reduce *size to hide the rest of the buffer.
  341. *size += limit_;
  342. }
  343. return true;
  344. }
  345. void LimitingInputStream::BackUp(int count) {
  346. if (limit_ < 0) {
  347. input_->BackUp(count - limit_);
  348. limit_ = count;
  349. } else {
  350. input_->BackUp(count);
  351. limit_ += count;
  352. }
  353. }
  354. bool LimitingInputStream::Skip(int count) {
  355. if (count > limit_) {
  356. if (limit_ < 0) return false;
  357. input_->Skip(limit_);
  358. limit_ = 0;
  359. return false;
  360. } else {
  361. if (!input_->Skip(count)) return false;
  362. limit_ -= count;
  363. return true;
  364. }
  365. }
  366. int64 LimitingInputStream::ByteCount() const {
  367. if (limit_ < 0) {
  368. return input_->ByteCount() + limit_ - prior_bytes_read_;
  369. } else {
  370. return input_->ByteCount() - prior_bytes_read_;
  371. }
  372. }
  373. // ===================================================================
  374. } // namespace io
  375. } // namespace protobuf
  376. } // namespace google