/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};

/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
 * EPROTOTYPE can be returned while trying to write to a socket that is
 * shutting down. If we retry the write, we should get the expected EPIPE
 * instead.
 */
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
     (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);


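/* Initialize the generic stream state shared by TCP, pipe and TTY handles
 * and lazily open the spare file descriptor used by uv__emfile_trick()
 * below when accept() hits the EMFILE limit.
 */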
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
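/* Thread entry point: watch s->fd with select(2) and forward readiness to
 * the event loop through the async handle. The int_fd end of the socketpair
 * is used both to interrupt a blocking select() and to request termination.
 */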
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


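/* Runs on the event loop thread in response to uv_async_send() from the
 * select thread: dispatches the recorded events to uv__stream_io() and then
 * releases the select thread again.
 */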
static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to post the semaphore here, otherwise select()
   * might be called before the actual uv__read(), blocking in the syscall.
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X;
   * run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


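/* Associate an open file descriptor with the stream and apply the given
 * readable/writable flags plus any pending TCP socket options. Fails with
 * UV_EBUSY if the stream already wraps a different fd.
 */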
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


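/* Fail every pending write request with the given error and move it to the
 * write_completed_queue; the callbacks run later from uv__write_callbacks().
 */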
void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


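/* Final teardown once the handle is closed: cancel any pending connect,
 * write and shutdown requests, invoking their callbacks with UV_ECANCELED.
 */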
void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


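/* Called by the event loop when the listen socket is readable: accept as
 * many pending connections as possible and hand each one to connection_cb.
 */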
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return; /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue; /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(). */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


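/* Move the fd accepted by uv__server_io() onto the client handle. A minimal
 * usage sketch from a connection callback, assuming a TCP server and
 * caller-defined alloc_cb, read_cb and close_cb:
 *
 *   void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
 *     else
 *       uv_close((uv_handle_t*) client, close_cb);
 *   }
 */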
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
  case UV_TCP:
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}


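/* Called when the write queue empties: stop polling for writability and
 * finish a pending uv_shutdown() request, if any.
 */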
static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

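/* Write as much of the request at the head of the write queue as the kernel
 * will take, using sendmsg(2) with SCM_RIGHTS when a handle is being passed.
 */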
static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */

  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return; /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}


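/* Drain the write_completed_queue: release each finished request's buffers,
 * settle write_queue_size for failed requests and invoke the user callback.
 */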
static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    /* on AIX/DragonFly the getsockname call returns an empty sa structure
     * for sockets of type AF_UNIX. For all other types it will
     * return a properly filled in structure.
     */
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
    }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


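/* Stash a received file descriptor for a later uv_accept() call, growing
 * the queue in increments of 8 entries as needed.
 */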
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


#define UV__CMSG_FD_COUNT 64
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

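/* Read from the stream until the kernel has no more data, the user stops
 * reading or the loop-starvation counter runs out. IPC pipes use recvmsg(2)
 * so that SCM_RIGHTS file descriptors can be collected along the way.
 */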
static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


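/* Central I/O callback: completes a pending connect first, then performs
 * reads and writes as indicated by the poll events and drains the write
 * queue when it empties.
 */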
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here right after a call to connect(2).
 * In order to determine if we've errored out or succeeded we must call
 * getsockopt(2).
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}


void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
  abort();
}


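/* Synchronous best-effort write: returns the number of bytes written or an
 * error; it never queues data. A minimal sketch of the usual fallback
 * pattern (buf, write_req and write_cb are assumed to be caller-defined):
 *
 *   int r = uv_try_write(stream, &buf, 1);
 *   if (r == UV_EAGAIN)
 *     uv_write(&write_req, stream, &buf, 1, write_cb);
 */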
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  /* Remove not written bytes from write queue size */
  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue request, regardless of immediateness */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  /* Do not poll for writable if we weren't polling before this call */
  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return req.error < 0 ? req.error : UV_EAGAIN;
  else
    return written;
}


int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;

  if (!(stream->flags & UV_HANDLE_READABLE))
    return UV_ENOTCONN;

  /* The UV_HANDLE_READING flag is independent of the state of the stream -
   * it just expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}