1/*!
2 * Copyright (c) 2015 by Contributors
3 * \file io.h
4 * \brief defines serializable interface of dmlc
5 */
6#ifndef DMLC_IO_H_
7#define DMLC_IO_H_
8#include <cstdio>
9#include <string>
10#include <cstring>
11#include <vector>
12#include <istream>
13#include <ostream>
14#include <streambuf>
15#include "./logging.h"
16
17// include uint64_t only to make io standalone
18#ifdef _MSC_VER
19/*! \brief uint64 */
20typedef unsigned __int64 uint64_t;
21#else
22#include <inttypes.h>
23#endif
24
25/*! \brief namespace for dmlc */
26namespace dmlc {
/*!
 * \brief Abstract byte-stream interface used for serialization.
 *
 * Implementations provide the raw Read/Write primitives; the typed helpers
 * (Write<T>, Read<T>, WriteArray, ReadArray) are layered on top of them.
 */
class Stream {  // NOLINT(*)
 public:
  /*!
   * \brief Read up to \p size bytes from the stream into \p ptr.
   * \param ptr buffer that receives the data
   * \param size maximum number of bytes to read
   * \return number of bytes actually read
   */
  virtual size_t Read(void *ptr, size_t size) = 0;
  /*!
   * \brief Write \p size bytes taken from \p ptr to the stream.
   * \param ptr buffer holding the data to write
   * \param size number of bytes to write
   */
  virtual void Write(const void *ptr, size_t size) = 0;
  /*! \brief Virtual destructor so streams can be deleted through a base pointer. */
  virtual ~Stream() {}
  /*!
   * \brief Generic factory: open a stream for the given URI.
   *
   * The returned stream closes the underlying file when it is deleted.
   *
   * \param uri location of the data; hdfs://, s3:// and file:// are
   *        supported, and file:// is used when no protocol is given
   * \param flag open mode, one of "w", "r" or "a"
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; may be NULL when allow_null is true and the
   *         file does not exist
   */
  static Stream *Create(const char *uri,
                        const char *const flag,
                        bool allow_null = false);
  // ---- typed helpers for reading/writing data structures ----
  /*!
   * \brief Serialize a value into the stream.
   *
   * Most STL composites and basic types are supported; using an unsupported
   * type is a compile-time error.
   *
   * Endian-aware: the byte order of the output is defined by
   * DMLC_IO_USE_LITTLE_ENDIAN.
   *
   * \param data value to serialize
   * \tparam T type of the value
   */
  template<typename T>
  inline void Write(const T &data);
  /*!
   * \brief Deserialize a value from the stream.
   *
   * Most STL composites and basic types are supported; using an unsupported
   * type is a compile-time error.
   *
   * Endian-aware: the byte order of the input is defined by
   * DMLC_IO_USE_LITTLE_ENDIAN.
   *
   * \param out_data destination of the deserialized value
   * \return whether the value was loaded successfully
   */
  template<typename T>
  inline bool Read(T *out_data);
  /*!
   * \brief Endian-aware write of an array of values.
   * \param data pointer to the first element
   * \param num_elems number of elements to write
   * \tparam T element type
   */
  template<typename T>
  inline void WriteArray(const T* data, size_t num_elems);
  /*!
   * \brief Endian-aware read of an array of values.
   * \param data pointer to the first destination element
   * \param num_elems number of elements to read
   * \tparam T element type
   * \return whether every element was loaded successfully
   */
  template<typename T>
  inline bool ReadArray(T* data, size_t num_elems);
};
107
108/*! \brief interface of i/o stream that support seek */
109class SeekStream: public Stream {
110 public:
111 // virtual destructor
112 virtual ~SeekStream(void) {}
113 /*! \brief seek to certain position of the file */
114 virtual void Seek(size_t pos) = 0;
115 /*! \brief tell the position of the stream */
116 virtual size_t Tell(void) = 0;
117 /*!
118 * \brief generic factory function
119 * create an SeekStream for read only,
120 * the stream will close the underlying files upon deletion
121 * error will be reported and the system will exit when create failed
122 * \param uri the uri of the input currently we support
123 * hdfs://, s3://, and file:// by default file:// will be used
124 * \param allow_null whether NULL can be returned, or directly report error
125 * \return the created stream, can be NULL when allow_null == true and file do not exist
126 */
127 static SeekStream *CreateForRead(const char *uri,
128 bool allow_null = false);
129};
130
131/*! \brief interface for serializable objects */
132class Serializable {
133 public:
134 /*! \brief virtual destructor */
135 virtual ~Serializable() {}
136 /*!
137 * \brief load the model from a stream
138 * \param fi stream where to load the model from
139 */
140 virtual void Load(Stream *fi) = 0;
141 /*!
142 * \brief saves the model to a stream
143 * \param fo stream where to save the model to
144 */
145 virtual void Save(Stream *fo) const = 0;
146};
147
148/*!
149 * \brief input split creates that allows reading
150 * of records from split of data,
151 * independent part that covers all the dataset
152 *
153 * see InputSplit::Create for definition of record
154 */
155class InputSplit {
156 public:
157 /*! \brief a blob of memory region */
158 struct Blob {
159 /*! \brief points to start of the memory region */
160 void *dptr;
161 /*! \brief size of the memory region */
162 size_t size;
163 };
164 /*!
165 * \brief hint the inputsplit how large the chunk size
166 * it should return when implementing NextChunk
167 * this is a hint so may not be enforced,
168 * but InputSplit will try adjust its internal buffer
169 * size to the hinted value
170 * \param chunk_size the chunk size
171 */
172 virtual void HintChunkSize(size_t chunk_size) {}
173 /*! \brief get the total size of the InputSplit */
174 virtual size_t GetTotalSize(void) = 0;
175 /*! \brief reset the position of InputSplit to beginning */
176 virtual void BeforeFirst(void) = 0;
177 /*!
178 * \brief get the next record, the returning value
179 * is valid until next call to NextRecord, NextChunk or NextBatch
180 * caller can modify the memory content of out_rec
181 *
182 * For text, out_rec contains a single line
183 * For recordio, out_rec contains one record content(with header striped)
184 *
185 * \param out_rec used to store the result
186 * \return true if we can successfully get next record
187 * false if we reached end of split
188 * \sa InputSplit::Create for definition of record
189 */
190 virtual bool NextRecord(Blob *out_rec) = 0;
191 /*!
192 * \brief get a chunk of memory that can contain multiple records,
193 * the caller needs to parse the content of the resulting chunk,
194 * for text file, out_chunk can contain data of multiple lines
195 * for recordio, out_chunk can contain multiple records(including headers)
196 *
197 * This function ensures there won't be partial record in the chunk
198 * caller can modify the memory content of out_chunk,
199 * the memory is valid until next call to NextRecord, NextChunk or NextBatch
200 *
201 * Usually NextRecord is sufficient, NextChunk can be used by some
202 * multi-threaded parsers to parse the input content
203 *
204 * \param out_chunk used to store the result
205 * \return true if we can successfully get next record
206 * false if we reached end of split
207 * \sa InputSplit::Create for definition of record
208 * \sa RecordIOChunkReader to parse recordio content from out_chunk
209 */
210 virtual bool NextChunk(Blob *out_chunk) = 0;
211 /*!
212 * \brief get a chunk of memory that can contain multiple records,
213 * with hint for how many records is needed,
214 * the caller needs to parse the content of the resulting chunk,
215 * for text file, out_chunk can contain data of multiple lines
216 * for recordio, out_chunk can contain multiple records(including headers)
217 *
218 * This function ensures there won't be partial record in the chunk
219 * caller can modify the memory content of out_chunk,
220 * the memory is valid until next call to NextRecord, NextChunk or NextBatch
221 *
222 *
223 * \param out_chunk used to store the result
224 * \param n_records used as a hint for how many records should be returned, may be ignored
225 * \return true if we can successfully get next record
226 * false if we reached end of split
227 * \sa InputSplit::Create for definition of record
228 * \sa RecordIOChunkReader to parse recordio content from out_chunk
229 */
230 virtual bool NextBatch(Blob *out_chunk, size_t n_records) {
231 return NextChunk(out_chunk);
232 }
233 /*! \brief destructor*/
234 virtual ~InputSplit(void) DMLC_THROW_EXCEPTION {}
235 /*!
236 * \brief reset the Input split to a certain part id,
237 * The InputSplit will be pointed to the head of the new specified segment.
238 * This feature may not be supported by every implementation of InputSplit.
239 * \param part_index The part id of the new input.
240 * \param num_parts The total number of parts.
241 */
242 virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0;
243 /*!
244 * \brief factory function:
245 * create input split given a uri
246 * \param uri the uri of the input, can contain hdfs prefix
247 * \param part_index the part id of current input
248 * \param num_parts total number of splits
249 * \param type type of record
250 * List of possible types: "text", "recordio", "indexed_recordio"
251 * - "text":
252 * text file, each line is treated as a record
253 * input split will split on '\\n' or '\\r'
254 * - "recordio":
255 * binary recordio file, see recordio.h
256 * - "indexed_recordio":
257 * binary recordio file with index, see recordio.h
258 * \return a new input split
259 * \sa InputSplit::Type
260 */
261 static InputSplit* Create(const char *uri,
262 unsigned part_index,
263 unsigned num_parts,
264 const char *type);
265 /*!
266 * \brief factory function:
267 * create input split given a uri for input and index
268 * \param uri the uri of the input, can contain hdfs prefix
269 * \param index_uri the uri of the index, can contain hdfs prefix
270 * \param part_index the part id of current input
271 * \param num_parts total number of splits
272 * \param type type of record
273 * List of possible types: "text", "recordio", "indexed_recordio"
274 * - "text":
275 * text file, each line is treated as a record
276 * input split will split on '\\n' or '\\r'
277 * - "recordio":
278 * binary recordio file, see recordio.h
279 * - "indexed_recordio":
280 * binary recordio file with index, see recordio.h
281 * \param shuffle whether to shuffle the output from the InputSplit,
282 * supported only by "indexed_recordio" type.
283 * Defaults to "false"
284 * \param seed random seed to use in conjunction with the "shuffle"
285 * option. Defaults to 0
286 * \param batch_size a hint to InputSplit what is the intended number
287 * of examples return per batch. Used only by
288 * "indexed_recordio" type
289 * \param recurse_directories whether to recursively traverse directories
290 * \return a new input split
291 * \sa InputSplit::Type
292 */
293 static InputSplit* Create(const char *uri,
294 const char *index_uri,
295 unsigned part_index,
296 unsigned num_parts,
297 const char *type,
298 const bool shuffle = false,
299 const int seed = 0,
300 const size_t batch_size = 256,
301 const bool recurse_directories = false);
302};
303
304#ifndef _LIBCPP_SGX_NO_IOSTREAMS
305/*!
306 * \brief a std::ostream class that can can wrap Stream objects,
307 * can use ostream with that output to underlying Stream
308 *
309 * Usage example:
310 * \code
311 *
312 * Stream *fs = Stream::Create("hdfs:///test.txt", "w");
313 * dmlc::ostream os(fs);
314 * os << "hello world" << std::endl;
315 * delete fs;
316 * \endcode
317 */
318class ostream : public std::basic_ostream<char> {
319 public:
320 /*!
321 * \brief construct std::ostream type
322 * \param stream the Stream output to be used
323 * \param buffer_size internal streambuf size
324 */
325 explicit ostream(Stream *stream,
326 size_t buffer_size = (1 << 10))
327 : std::basic_ostream<char>(NULL), buf_(buffer_size) {
328 this->set_stream(stream);
329 }
330 // explictly synchronize the buffer
331 virtual ~ostream() DMLC_NO_EXCEPTION {
332 buf_.pubsync();
333 }
334 /*!
335 * \brief set internal stream to be stream, reset states
336 * \param stream new stream as output
337 */
338 inline void set_stream(Stream *stream) {
339 buf_.set_stream(stream);
340 this->rdbuf(&buf_);
341 }
342
343 /*! \return how many bytes we written so far */
344 inline size_t bytes_written(void) const {
345 return buf_.bytes_out();
346 }
347
348 private:
349 // internal streambuf
350 class OutBuf : public std::streambuf {
351 public:
352 explicit OutBuf(size_t buffer_size)
353 : stream_(NULL), buffer_(buffer_size), bytes_out_(0) {
354 if (buffer_size == 0) buffer_.resize(2);
355 }
356 // set stream to the buffer
357 inline void set_stream(Stream *stream);
358
359 inline size_t bytes_out() const { return bytes_out_; }
360 private:
361 /*! \brief internal stream by StreamBuf */
362 Stream *stream_;
363 /*! \brief internal buffer */
364 std::vector<char> buffer_;
365 /*! \brief number of bytes written so far */
366 size_t bytes_out_;
367 // override sync
368 inline int_type sync(void);
369 // override overflow
370 inline int_type overflow(int c);
371 };
372 /*! \brief buffer of the stream */
373 OutBuf buf_;
374};
375
376/*!
377 * \brief a std::istream class that can can wrap Stream objects,
378 * can use istream with that output to underlying Stream
379 *
380 * Usage example:
381 * \code
382 *
383 * Stream *fs = Stream::Create("hdfs:///test.txt", "r");
384 * dmlc::istream is(fs);
385 * is >> mydata;
386 * delete fs;
387 * \endcode
388 */
389class istream : public std::basic_istream<char> {
390 public:
391 /*!
392 * \brief construct std::ostream type
393 * \param stream the Stream output to be used
394 * \param buffer_size internal buffer size
395 */
396 explicit istream(Stream *stream,
397 size_t buffer_size = (1 << 10))
398 : std::basic_istream<char>(NULL), buf_(buffer_size) {
399 this->set_stream(stream);
400 }
401 virtual ~istream() DMLC_NO_EXCEPTION {}
402 /*!
403 * \brief set internal stream to be stream, reset states
404 * \param stream new stream as output
405 */
406 inline void set_stream(Stream *stream) {
407 buf_.set_stream(stream);
408 this->rdbuf(&buf_);
409 }
410 /*! \return how many bytes we read so far */
411 inline size_t bytes_read(void) const {
412 return buf_.bytes_read();
413 }
414
415 private:
416 // internal streambuf
417 class InBuf : public std::streambuf {
418 public:
419 explicit InBuf(size_t buffer_size)
420 : stream_(NULL), bytes_read_(0),
421 buffer_(buffer_size) {
422 if (buffer_size == 0) buffer_.resize(2);
423 }
424 // set stream to the buffer
425 inline void set_stream(Stream *stream);
426 // return how many bytes read so far
427 inline size_t bytes_read(void) const {
428 return bytes_read_;
429 }
430 private:
431 /*! \brief internal stream by StreamBuf */
432 Stream *stream_;
433 /*! \brief how many bytes we read so far */
434 size_t bytes_read_;
435 /*! \brief internal buffer */
436 std::vector<char> buffer_;
437 // override underflow
438 inline int_type underflow();
439 };
440 /*! \brief input buffer */
441 InBuf buf_;
442};
443#endif
444} // namespace dmlc
445
446#include "./serializer.h"
447
448namespace dmlc {
449// implementations of inline functions
450template<typename T>
451inline void Stream::Write(const T &data) {
452 serializer::Handler<T>::Write(this, data);
453}
454template<typename T>
455inline bool Stream::Read(T *out_data) {
456 return serializer::Handler<T>::Read(this, out_data);
457}
458
459template<typename T>
460inline void Stream::WriteArray(const T* data, size_t num_elems) {
461 for (size_t i = 0; i < num_elems; ++i) {
462 this->Write<T>(data[i]);
463 }
464}
465
466template<typename T>
467inline bool Stream::ReadArray(T* data, size_t num_elems) {
468 for (size_t i = 0; i < num_elems; ++i) {
469 if (!this->Read<T>(data + i)) return false;
470 }
471 return true;
472}
473
474#ifndef _LIBCPP_SGX_NO_IOSTREAMS
475// implementations for ostream
476inline void ostream::OutBuf::set_stream(Stream *stream) {
477 if (stream_ != NULL) this->pubsync();
478 this->stream_ = stream;
479 this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1);
480}
481inline int ostream::OutBuf::sync(void) {
482 if (stream_ == NULL) return -1;
483 std::ptrdiff_t n = pptr() - pbase();
484 stream_->Write(pbase(), n);
485 this->pbump(-static_cast<int>(n));
486 bytes_out_ += n;
487 return 0;
488}
489inline int ostream::OutBuf::overflow(int c) {
490 *(this->pptr()) = c;
491 std::ptrdiff_t n = pptr() - pbase();
492 this->pbump(-static_cast<int>(n));
493 if (c == EOF) {
494 stream_->Write(pbase(), n);
495 bytes_out_ += n;
496 } else {
497 stream_->Write(pbase(), n + 1);
498 bytes_out_ += n + 1;
499 }
500 return c;
501}
502
503// implementations for istream
504inline void istream::InBuf::set_stream(Stream *stream) {
505 stream_ = stream;
506 this->setg(&buffer_[0], &buffer_[0], &buffer_[0]);
507}
508inline int istream::InBuf::underflow() {
509 char *bhead = &buffer_[0];
510 if (this->gptr() == this->egptr()) {
511 size_t sz = stream_->Read(bhead, buffer_.size());
512 this->setg(bhead, bhead, bhead + sz);
513 bytes_read_ += sz;
514 }
515 if (this->gptr() == this->egptr()) {
516 return traits_type::eof();
517 } else {
518 return traits_type::to_int_type(*gptr());
519 }
520}
521#endif
522
523namespace io {
/*! \brief Parsed URI, split into protocol, host and name (path). */
struct URI {
  /*! \brief protocol including the trailing "://", e.g. "hdfs://"; empty if absent */
  std::string protocol;
  /*!
   * \brief host name, namenode for HDFS, bucket name for s3
   */
  std::string host;
  /*! \brief path component; starts with '/' whenever a protocol is present */
  std::string name;
  /*! \brief Construct an empty URI. */
  URI() {}
  /*!
   * \brief Parse a URI string.
   *
   * "proto://host/path" becomes protocol "proto://", host "host" and name
   * "/path". When no '/' follows the host, name is "/". When the string has
   * no "://", the whole string is stored in name.
   * \param uri NUL-terminated string to parse
   */
  explicit URI(const char *uri) {
    const char *const sep = std::strstr(uri, "://");
    if (sep == NULL) {
      name = uri;
      return;
    }
    const char *const rest = sep + 3;  // first character after "://"
    protocol.assign(uri, rest);
    const char *const slash = std::strchr(rest, '/');
    if (slash != NULL) {
      host.assign(rest, slash);
      name = slash;
    } else {
      host = rest;
      name = "/";
    }
  }
  /*! \brief Reassemble the URI as protocol + host + name. */
  inline std::string str() const {
    std::string result(protocol);
    result += host;
    result += name;
    return result;
  }
};
560
/*! \brief Kind of a file-system entry. */
enum FileType {
  /*! \brief the entry is a file (not a directory) */
  kFile = 0,
  /*! \brief the entry is a directory */
  kDirectory = 1
};
568
569/*! \brief use to store file information */
570struct FileInfo {
571 /*! \brief full path to the file */
572 URI path;
573 /*! \brief the size of the file */
574 size_t size;
575 /*! \brief the type of the file */
576 FileType type;
577 /*! \brief default constructor */
578 FileInfo() : size(0), type(kFile) {}
579};
580
581/*! \brief file system system interface */
582class FileSystem {
583 public:
584 /*!
585 * \brief get singleton of filesystem instance according to URI
586 * \param path can be s3://..., hdfs://..., file://...,
587 * empty string(will return local)
588 * \return a corresponding filesystem, report error if
589 * we cannot find a matching system
590 */
591 static FileSystem *GetInstance(const URI &path);
592 /*! \brief virtual destructor */
593 virtual ~FileSystem() {}
594 /*!
595 * \brief get information about a path
596 * \param path the path to the file
597 * \return the information about the file
598 */
599 virtual FileInfo GetPathInfo(const URI &path) = 0;
600 /*!
601 * \brief list files in a directory
602 * \param path to the file
603 * \param out_list the output information about the files
604 */
605 virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
606 /*!
607 * \brief list files in a directory recursively using ListDirectory
608 * \param path to the file
609 * \param out_list the output information about the files
610 */
611 virtual void ListDirectoryRecursive(const URI &path,
612 std::vector<FileInfo> *out_list);
613 /*!
614 * \brief open a stream
615 * \param path path to file
616 * \param flag can be "w", "r", "a
617 * \param allow_null whether NULL can be returned, or directly report error
618 * \return the created stream, can be NULL when allow_null == true and file do not exist
619 */
620 virtual Stream *Open(const URI &path,
621 const char* const flag,
622 bool allow_null = false) = 0;
623 /*!
624 * \brief open a seekable stream for read
625 * \param path the path to the file
626 * \param allow_null whether NULL can be returned, or directly report error
627 * \return the created stream, can be NULL when allow_null == true and file do not exist
628 */
629 virtual SeekStream *OpenForRead(const URI &path,
630 bool allow_null = false) = 0;
631};
632
633} // namespace io
634} // namespace dmlc
635#endif // DMLC_IO_H_
636