1 | /*! |
2 | * Copyright (c) 2015 by Contributors |
3 | * \file io.h |
4 | * \brief defines serializable interface of dmlc |
5 | */ |
6 | #ifndef DMLC_IO_H_ |
7 | #define DMLC_IO_H_ |
8 | #include <cstdio> |
9 | #include <string> |
10 | #include <cstring> |
11 | #include <vector> |
12 | #include <istream> |
13 | #include <ostream> |
14 | #include <streambuf> |
15 | #include "./logging.h" |
16 | |
17 | // include uint64_t only to make io standalone |
18 | #ifdef _MSC_VER |
19 | /*! \brief uint64 */ |
20 | typedef unsigned __int64 uint64_t; |
21 | #else |
22 | #include <inttypes.h> |
23 | #endif |
24 | |
25 | /*! \brief namespace for dmlc */ |
26 | namespace dmlc { |
/*!
 * \brief abstract byte stream that serialization is built on
 *
 *  Concrete streams implement the two raw byte primitives; the templated
 *  helpers declared here are implemented further down in this header on
 *  top of serializer::Handler.
 */
class Stream {  // NOLINT(*)
 public:
  /*!
   * \brief read raw bytes from the stream
   * \param ptr destination memory buffer
   * \param size number of bytes requested
   * \return the number of bytes that were read
   */
  virtual size_t Read(void *ptr, size_t size) = 0;
  /*!
   * \brief write raw bytes to the stream
   * \param ptr source memory buffer
   * \param size number of bytes to write
   */
  virtual void Write(const void *ptr, size_t size) = 0;
  /*! \brief virtual destructor, streams are deleted through Stream pointers */
  virtual ~Stream() {}
  /*!
   * \brief generic factory function
   *  creates a stream; the stream closes the underlying file when deleted
   *
   * \param uri the uri of the input, currently supported prefixes are
   *            hdfs://, s3://, and file://; file:// is used by default
   * \param flag open mode, can be "w", "r" or "a"
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; can be NULL when allow_null == true
   *         and the file does not exist
   */
  static Stream *Create(const char *uri,
                        const char* const flag,
                        bool allow_null = false);
  // typed helpers to write/read different data structures
  /*!
   * \brief serialize one value into the stream
   *
   *  Most STL composites and base types are supported; an unsupported
   *  type results in a compile time error.
   *
   *  The function is endian-aware, the output endianness is selected by
   *  DMLC_IO_USE_LITTLE_ENDIAN.
   *
   * \param data the value to be written
   * \tparam T type of the value
   */
  template<typename T>
  inline void Write(const T &data);
  /*!
   * \brief deserialize one value from the stream
   *
   *  Most STL composites and base types are supported; an unsupported
   *  type results in a compile time error.
   *
   *  The function is endian-aware, the input endianness is selected by
   *  DMLC_IO_USE_LITTLE_ENDIAN.
   *
   * \param out_data where the loaded value is stored
   * \tparam T type of the value
   * \return whether the load was successful
   */
  template<typename T>
  inline bool Read(T *out_data);
  /*!
   * \brief endian-aware write of an array of values
   * \param data pointer to the first element
   * \param num_elems number of elements to write
   * \tparam T element type
   */
  template<typename T>
  inline void WriteArray(const T* data, size_t num_elems);
  /*!
   * \brief endian-aware read of an array of values
   * \param data pointer to the first element to fill
   * \param num_elems number of elements to read
   * \tparam T element type
   * \return whether the load was successful
   */
  template<typename T>
  inline bool ReadArray(T* data, size_t num_elems);
};
107 | |
108 | /*! \brief interface of i/o stream that support seek */ |
109 | class SeekStream: public Stream { |
110 | public: |
111 | // virtual destructor |
112 | virtual ~SeekStream(void) {} |
113 | /*! \brief seek to certain position of the file */ |
114 | virtual void Seek(size_t pos) = 0; |
115 | /*! \brief tell the position of the stream */ |
116 | virtual size_t Tell(void) = 0; |
117 | /*! |
118 | * \brief generic factory function |
119 | * create an SeekStream for read only, |
120 | * the stream will close the underlying files upon deletion |
121 | * error will be reported and the system will exit when create failed |
122 | * \param uri the uri of the input currently we support |
123 | * hdfs://, s3://, and file:// by default file:// will be used |
124 | * \param allow_null whether NULL can be returned, or directly report error |
125 | * \return the created stream, can be NULL when allow_null == true and file do not exist |
126 | */ |
127 | static SeekStream *CreateForRead(const char *uri, |
128 | bool allow_null = false); |
129 | }; |
130 | |
131 | /*! \brief interface for serializable objects */ |
132 | class Serializable { |
133 | public: |
134 | /*! \brief virtual destructor */ |
135 | virtual ~Serializable() {} |
136 | /*! |
137 | * \brief load the model from a stream |
138 | * \param fi stream where to load the model from |
139 | */ |
140 | virtual void Load(Stream *fi) = 0; |
141 | /*! |
142 | * \brief saves the model to a stream |
143 | * \param fo stream where to save the model to |
144 | */ |
145 | virtual void Save(Stream *fo) const = 0; |
146 | }; |
147 | |
148 | /*! |
149 | * \brief input split creates that allows reading |
150 | * of records from split of data, |
151 | * independent part that covers all the dataset |
152 | * |
153 | * see InputSplit::Create for definition of record |
154 | */ |
155 | class InputSplit { |
156 | public: |
157 | /*! \brief a blob of memory region */ |
158 | struct Blob { |
159 | /*! \brief points to start of the memory region */ |
160 | void *dptr; |
161 | /*! \brief size of the memory region */ |
162 | size_t size; |
163 | }; |
164 | /*! |
165 | * \brief hint the inputsplit how large the chunk size |
166 | * it should return when implementing NextChunk |
167 | * this is a hint so may not be enforced, |
168 | * but InputSplit will try adjust its internal buffer |
169 | * size to the hinted value |
170 | * \param chunk_size the chunk size |
171 | */ |
172 | virtual void HintChunkSize(size_t chunk_size) {} |
173 | /*! \brief get the total size of the InputSplit */ |
174 | virtual size_t GetTotalSize(void) = 0; |
175 | /*! \brief reset the position of InputSplit to beginning */ |
176 | virtual void BeforeFirst(void) = 0; |
177 | /*! |
178 | * \brief get the next record, the returning value |
179 | * is valid until next call to NextRecord, NextChunk or NextBatch |
180 | * caller can modify the memory content of out_rec |
181 | * |
182 | * For text, out_rec contains a single line |
183 | * For recordio, out_rec contains one record content(with header striped) |
184 | * |
185 | * \param out_rec used to store the result |
186 | * \return true if we can successfully get next record |
187 | * false if we reached end of split |
188 | * \sa InputSplit::Create for definition of record |
189 | */ |
190 | virtual bool NextRecord(Blob *out_rec) = 0; |
191 | /*! |
192 | * \brief get a chunk of memory that can contain multiple records, |
193 | * the caller needs to parse the content of the resulting chunk, |
194 | * for text file, out_chunk can contain data of multiple lines |
195 | * for recordio, out_chunk can contain multiple records(including headers) |
196 | * |
197 | * This function ensures there won't be partial record in the chunk |
198 | * caller can modify the memory content of out_chunk, |
199 | * the memory is valid until next call to NextRecord, NextChunk or NextBatch |
200 | * |
201 | * Usually NextRecord is sufficient, NextChunk can be used by some |
202 | * multi-threaded parsers to parse the input content |
203 | * |
204 | * \param out_chunk used to store the result |
205 | * \return true if we can successfully get next record |
206 | * false if we reached end of split |
207 | * \sa InputSplit::Create for definition of record |
208 | * \sa RecordIOChunkReader to parse recordio content from out_chunk |
209 | */ |
210 | virtual bool NextChunk(Blob *out_chunk) = 0; |
211 | /*! |
212 | * \brief get a chunk of memory that can contain multiple records, |
213 | * with hint for how many records is needed, |
214 | * the caller needs to parse the content of the resulting chunk, |
215 | * for text file, out_chunk can contain data of multiple lines |
216 | * for recordio, out_chunk can contain multiple records(including headers) |
217 | * |
218 | * This function ensures there won't be partial record in the chunk |
219 | * caller can modify the memory content of out_chunk, |
220 | * the memory is valid until next call to NextRecord, NextChunk or NextBatch |
221 | * |
222 | * |
223 | * \param out_chunk used to store the result |
224 | * \param n_records used as a hint for how many records should be returned, may be ignored |
225 | * \return true if we can successfully get next record |
226 | * false if we reached end of split |
227 | * \sa InputSplit::Create for definition of record |
228 | * \sa RecordIOChunkReader to parse recordio content from out_chunk |
229 | */ |
230 | virtual bool NextBatch(Blob *out_chunk, size_t n_records) { |
231 | return NextChunk(out_chunk); |
232 | } |
233 | /*! \brief destructor*/ |
234 | virtual ~InputSplit(void) DMLC_THROW_EXCEPTION {} |
235 | /*! |
236 | * \brief reset the Input split to a certain part id, |
237 | * The InputSplit will be pointed to the head of the new specified segment. |
238 | * This feature may not be supported by every implementation of InputSplit. |
239 | * \param part_index The part id of the new input. |
240 | * \param num_parts The total number of parts. |
241 | */ |
242 | virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0; |
243 | /*! |
244 | * \brief factory function: |
245 | * create input split given a uri |
246 | * \param uri the uri of the input, can contain hdfs prefix |
247 | * \param part_index the part id of current input |
248 | * \param num_parts total number of splits |
249 | * \param type type of record |
250 | * List of possible types: "text", "recordio", "indexed_recordio" |
251 | * - "text": |
252 | * text file, each line is treated as a record |
253 | * input split will split on '\\n' or '\\r' |
254 | * - "recordio": |
255 | * binary recordio file, see recordio.h |
256 | * - "indexed_recordio": |
257 | * binary recordio file with index, see recordio.h |
258 | * \return a new input split |
259 | * \sa InputSplit::Type |
260 | */ |
261 | static InputSplit* Create(const char *uri, |
262 | unsigned part_index, |
263 | unsigned num_parts, |
264 | const char *type); |
265 | /*! |
266 | * \brief factory function: |
267 | * create input split given a uri for input and index |
268 | * \param uri the uri of the input, can contain hdfs prefix |
269 | * \param index_uri the uri of the index, can contain hdfs prefix |
270 | * \param part_index the part id of current input |
271 | * \param num_parts total number of splits |
272 | * \param type type of record |
273 | * List of possible types: "text", "recordio", "indexed_recordio" |
274 | * - "text": |
275 | * text file, each line is treated as a record |
276 | * input split will split on '\\n' or '\\r' |
277 | * - "recordio": |
278 | * binary recordio file, see recordio.h |
279 | * - "indexed_recordio": |
280 | * binary recordio file with index, see recordio.h |
281 | * \param shuffle whether to shuffle the output from the InputSplit, |
282 | * supported only by "indexed_recordio" type. |
283 | * Defaults to "false" |
284 | * \param seed random seed to use in conjunction with the "shuffle" |
285 | * option. Defaults to 0 |
286 | * \param batch_size a hint to InputSplit what is the intended number |
287 | * of examples return per batch. Used only by |
288 | * "indexed_recordio" type |
289 | * \param recurse_directories whether to recursively traverse directories |
290 | * \return a new input split |
291 | * \sa InputSplit::Type |
292 | */ |
293 | static InputSplit* Create(const char *uri, |
294 | const char *index_uri, |
295 | unsigned part_index, |
296 | unsigned num_parts, |
297 | const char *type, |
298 | const bool shuffle = false, |
299 | const int seed = 0, |
300 | const size_t batch_size = 256, |
301 | const bool recurse_directories = false); |
302 | }; |
303 | |
304 | #ifndef _LIBCPP_SGX_NO_IOSTREAMS |
305 | /*! |
306 | * \brief a std::ostream class that can can wrap Stream objects, |
307 | * can use ostream with that output to underlying Stream |
308 | * |
309 | * Usage example: |
310 | * \code |
311 | * |
312 | * Stream *fs = Stream::Create("hdfs:///test.txt", "w"); |
313 | * dmlc::ostream os(fs); |
314 | * os << "hello world" << std::endl; |
315 | * delete fs; |
316 | * \endcode |
317 | */ |
318 | class ostream : public std::basic_ostream<char> { |
319 | public: |
320 | /*! |
321 | * \brief construct std::ostream type |
322 | * \param stream the Stream output to be used |
323 | * \param buffer_size internal streambuf size |
324 | */ |
325 | explicit ostream(Stream *stream, |
326 | size_t buffer_size = (1 << 10)) |
327 | : std::basic_ostream<char>(NULL), buf_(buffer_size) { |
328 | this->set_stream(stream); |
329 | } |
330 | // explictly synchronize the buffer |
331 | virtual ~ostream() DMLC_NO_EXCEPTION { |
332 | buf_.pubsync(); |
333 | } |
334 | /*! |
335 | * \brief set internal stream to be stream, reset states |
336 | * \param stream new stream as output |
337 | */ |
338 | inline void set_stream(Stream *stream) { |
339 | buf_.set_stream(stream); |
340 | this->rdbuf(&buf_); |
341 | } |
342 | |
343 | /*! \return how many bytes we written so far */ |
344 | inline size_t bytes_written(void) const { |
345 | return buf_.bytes_out(); |
346 | } |
347 | |
348 | private: |
349 | // internal streambuf |
350 | class OutBuf : public std::streambuf { |
351 | public: |
352 | explicit OutBuf(size_t buffer_size) |
353 | : stream_(NULL), buffer_(buffer_size), bytes_out_(0) { |
354 | if (buffer_size == 0) buffer_.resize(2); |
355 | } |
356 | // set stream to the buffer |
357 | inline void set_stream(Stream *stream); |
358 | |
359 | inline size_t bytes_out() const { return bytes_out_; } |
360 | private: |
361 | /*! \brief internal stream by StreamBuf */ |
362 | Stream *stream_; |
363 | /*! \brief internal buffer */ |
364 | std::vector<char> buffer_; |
365 | /*! \brief number of bytes written so far */ |
366 | size_t bytes_out_; |
367 | // override sync |
368 | inline int_type sync(void); |
369 | // override overflow |
370 | inline int_type overflow(int c); |
371 | }; |
372 | /*! \brief buffer of the stream */ |
373 | OutBuf buf_; |
374 | }; |
375 | |
376 | /*! |
377 | * \brief a std::istream class that can can wrap Stream objects, |
378 | * can use istream with that output to underlying Stream |
379 | * |
380 | * Usage example: |
381 | * \code |
382 | * |
383 | * Stream *fs = Stream::Create("hdfs:///test.txt", "r"); |
384 | * dmlc::istream is(fs); |
385 | * is >> mydata; |
386 | * delete fs; |
387 | * \endcode |
388 | */ |
389 | class istream : public std::basic_istream<char> { |
390 | public: |
391 | /*! |
392 | * \brief construct std::ostream type |
393 | * \param stream the Stream output to be used |
394 | * \param buffer_size internal buffer size |
395 | */ |
396 | explicit istream(Stream *stream, |
397 | size_t buffer_size = (1 << 10)) |
398 | : std::basic_istream<char>(NULL), buf_(buffer_size) { |
399 | this->set_stream(stream); |
400 | } |
401 | virtual ~istream() DMLC_NO_EXCEPTION {} |
402 | /*! |
403 | * \brief set internal stream to be stream, reset states |
404 | * \param stream new stream as output |
405 | */ |
406 | inline void set_stream(Stream *stream) { |
407 | buf_.set_stream(stream); |
408 | this->rdbuf(&buf_); |
409 | } |
410 | /*! \return how many bytes we read so far */ |
411 | inline size_t bytes_read(void) const { |
412 | return buf_.bytes_read(); |
413 | } |
414 | |
415 | private: |
416 | // internal streambuf |
417 | class InBuf : public std::streambuf { |
418 | public: |
419 | explicit InBuf(size_t buffer_size) |
420 | : stream_(NULL), bytes_read_(0), |
421 | buffer_(buffer_size) { |
422 | if (buffer_size == 0) buffer_.resize(2); |
423 | } |
424 | // set stream to the buffer |
425 | inline void set_stream(Stream *stream); |
426 | // return how many bytes read so far |
427 | inline size_t bytes_read(void) const { |
428 | return bytes_read_; |
429 | } |
430 | private: |
431 | /*! \brief internal stream by StreamBuf */ |
432 | Stream *stream_; |
433 | /*! \brief how many bytes we read so far */ |
434 | size_t bytes_read_; |
435 | /*! \brief internal buffer */ |
436 | std::vector<char> buffer_; |
437 | // override underflow |
438 | inline int_type underflow(); |
439 | }; |
440 | /*! \brief input buffer */ |
441 | InBuf buf_; |
442 | }; |
443 | #endif |
444 | } // namespace dmlc |
445 | |
446 | #include "./serializer.h" |
447 | |
448 | namespace dmlc { |
449 | // implementations of inline functions |
450 | template<typename T> |
451 | inline void Stream::Write(const T &data) { |
452 | serializer::Handler<T>::Write(this, data); |
453 | } |
454 | template<typename T> |
455 | inline bool Stream::Read(T *out_data) { |
456 | return serializer::Handler<T>::Read(this, out_data); |
457 | } |
458 | |
459 | template<typename T> |
460 | inline void Stream::WriteArray(const T* data, size_t num_elems) { |
461 | for (size_t i = 0; i < num_elems; ++i) { |
462 | this->Write<T>(data[i]); |
463 | } |
464 | } |
465 | |
466 | template<typename T> |
467 | inline bool Stream::ReadArray(T* data, size_t num_elems) { |
468 | for (size_t i = 0; i < num_elems; ++i) { |
469 | if (!this->Read<T>(data + i)) return false; |
470 | } |
471 | return true; |
472 | } |
473 | |
474 | #ifndef _LIBCPP_SGX_NO_IOSTREAMS |
475 | // implementations for ostream |
476 | inline void ostream::OutBuf::set_stream(Stream *stream) { |
477 | if (stream_ != NULL) this->pubsync(); |
478 | this->stream_ = stream; |
479 | this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1); |
480 | } |
481 | inline int ostream::OutBuf::sync(void) { |
482 | if (stream_ == NULL) return -1; |
483 | std::ptrdiff_t n = pptr() - pbase(); |
484 | stream_->Write(pbase(), n); |
485 | this->pbump(-static_cast<int>(n)); |
486 | bytes_out_ += n; |
487 | return 0; |
488 | } |
489 | inline int ostream::OutBuf::overflow(int c) { |
490 | *(this->pptr()) = c; |
491 | std::ptrdiff_t n = pptr() - pbase(); |
492 | this->pbump(-static_cast<int>(n)); |
493 | if (c == EOF) { |
494 | stream_->Write(pbase(), n); |
495 | bytes_out_ += n; |
496 | } else { |
497 | stream_->Write(pbase(), n + 1); |
498 | bytes_out_ += n + 1; |
499 | } |
500 | return c; |
501 | } |
502 | |
503 | // implementations for istream |
504 | inline void istream::InBuf::set_stream(Stream *stream) { |
505 | stream_ = stream; |
506 | this->setg(&buffer_[0], &buffer_[0], &buffer_[0]); |
507 | } |
508 | inline int istream::InBuf::underflow() { |
509 | char *bhead = &buffer_[0]; |
510 | if (this->gptr() == this->egptr()) { |
511 | size_t sz = stream_->Read(bhead, buffer_.size()); |
512 | this->setg(bhead, bhead, bhead + sz); |
513 | bytes_read_ += sz; |
514 | } |
515 | if (this->gptr() == this->egptr()) { |
516 | return traits_type::eof(); |
517 | } else { |
518 | return traits_type::to_int_type(*gptr()); |
519 | } |
520 | } |
521 | #endif |
522 | |
523 | namespace io { |
/*!
 * \brief common data structure for URI, split into
 *  protocol ("xxx://"), host and path name
 */
struct URI {
  /*! \brief protocol including the trailing "://", empty when the uri has none */
  std::string protocol;
  /*!
   * \brief host name, namenode for HDFS, bucket name for s3
   */
  std::string host;
  /*! \brief name of the path */
  std::string name;
  /*! \brief enable default constructor, all parts are empty */
  URI(void) {}
  /*!
   * \brief construct from URI string
   *
   *  A string without "://" is stored in name as is. Otherwise
   *  everything up to and including "://" becomes the protocol, the text
   *  up to the next '/' becomes the host and the rest (starting at that
   *  '/') becomes the name; when there is no further '/', the name is "/".
   *
   * \param uri the uri string; a NULL pointer yields an empty URI
   */
  explicit URI(const char *uri) {
    // a NULL pointer would be undefined behavior in strstr below
    if (uri == NULL) return;
    const char *sep = std::strstr(uri, "://");
    if (sep == NULL) {
      name = uri;
      return;
    }
    const char *rest = sep + 3;  // first character after "://"
    protocol.assign(uri, static_cast<size_t>(rest - uri));
    const char *slash = std::strchr(rest, '/');
    if (slash == NULL) {
      host = rest;
      name = "/";
    } else {
      host.assign(rest, static_cast<size_t>(slash - rest));
      name = slash;
    }
  }
  /*! \brief string representation: protocol, host and name concatenated */
  inline std::string str(void) const {
    return protocol + host + name;
  }
};
560 | |
/*! \brief kind of entry a path refers to */
enum FileType {
  /*! \brief the entry is a regular file */
  kFile = 0,
  /*! \brief the entry is a directory */
  kDirectory = 1
};
568 | |
569 | /*! \brief use to store file information */ |
570 | struct FileInfo { |
571 | /*! \brief full path to the file */ |
572 | URI path; |
573 | /*! \brief the size of the file */ |
574 | size_t size; |
575 | /*! \brief the type of the file */ |
576 | FileType type; |
577 | /*! \brief default constructor */ |
578 | FileInfo() : size(0), type(kFile) {} |
579 | }; |
580 | |
581 | /*! \brief file system system interface */ |
582 | class FileSystem { |
583 | public: |
584 | /*! |
585 | * \brief get singleton of filesystem instance according to URI |
586 | * \param path can be s3://..., hdfs://..., file://..., |
587 | * empty string(will return local) |
588 | * \return a corresponding filesystem, report error if |
589 | * we cannot find a matching system |
590 | */ |
591 | static FileSystem *GetInstance(const URI &path); |
592 | /*! \brief virtual destructor */ |
593 | virtual ~FileSystem() {} |
594 | /*! |
595 | * \brief get information about a path |
596 | * \param path the path to the file |
597 | * \return the information about the file |
598 | */ |
599 | virtual FileInfo GetPathInfo(const URI &path) = 0; |
600 | /*! |
601 | * \brief list files in a directory |
602 | * \param path to the file |
603 | * \param out_list the output information about the files |
604 | */ |
605 | virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0; |
606 | /*! |
607 | * \brief list files in a directory recursively using ListDirectory |
608 | * \param path to the file |
609 | * \param out_list the output information about the files |
610 | */ |
611 | virtual void ListDirectoryRecursive(const URI &path, |
612 | std::vector<FileInfo> *out_list); |
613 | /*! |
614 | * \brief open a stream |
615 | * \param path path to file |
616 | * \param flag can be "w", "r", "a |
617 | * \param allow_null whether NULL can be returned, or directly report error |
618 | * \return the created stream, can be NULL when allow_null == true and file do not exist |
619 | */ |
620 | virtual Stream *Open(const URI &path, |
621 | const char* const flag, |
622 | bool allow_null = false) = 0; |
623 | /*! |
624 | * \brief open a seekable stream for read |
625 | * \param path the path to the file |
626 | * \param allow_null whether NULL can be returned, or directly report error |
627 | * \return the created stream, can be NULL when allow_null == true and file do not exist |
628 | */ |
629 | virtual SeekStream *OpenForRead(const URI &path, |
630 | bool allow_null = false) = 0; |
631 | }; |
632 | |
633 | } // namespace io |
634 | } // namespace dmlc |
635 | #endif // DMLC_IO_H_ |
636 | |