1 | /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. |
2 | * |
3 | * Permission is hereby granted, free of charge, to any person obtaining a copy |
4 | * of this software and associated documentation files (the "Software"), to |
5 | * deal in the Software without restriction, including without limitation the |
6 | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
7 | * sell copies of the Software, and to permit persons to whom the Software is |
8 | * furnished to do so, subject to the following conditions: |
9 | * |
10 | * The above copyright notice and this permission notice shall be included in |
11 | * all copies or substantial portions of the Software. |
12 | * |
13 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
14 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
15 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
16 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
17 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
18 | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
19 | * IN THE SOFTWARE. |
20 | */ |
21 | |
22 | #include "uv.h" |
23 | #include "task.h" |
24 | |
25 | #include <stdio.h> |
26 | #include <string.h> |
27 | |
28 | static uv_pipe_t channel; |
29 | static uv_tcp_t tcp_server; |
30 | static uv_tcp_t tcp_server2; |
31 | static uv_tcp_t tcp_connection; |
32 | |
33 | static int exit_cb_called; |
34 | static int read_cb_called; |
35 | static int tcp_write_cb_called; |
36 | static int tcp_read_cb_called; |
37 | static int on_pipe_read_called; |
38 | static int local_conn_accepted; |
39 | static int remote_conn_accepted; |
40 | static int tcp_server_listening; |
41 | static uv_write_t write_req; |
42 | static uv_write_t write_req2; |
43 | static uv_write_t conn_notify_req; |
44 | static int close_cb_called; |
45 | static int connection_accepted; |
46 | static int tcp_conn_read_cb_called; |
47 | static int tcp_conn_write_cb_called; |
48 | static int closed_handle_data_read; |
49 | static int closed_handle_write; |
50 | static int send_zero_write; |
51 | |
52 | typedef struct { |
53 | uv_connect_t conn_req; |
54 | uv_write_t tcp_write_req; |
55 | uv_tcp_t conn; |
56 | } tcp_conn; |
57 | |
58 | #define CONN_COUNT 100 |
59 | #define BACKLOG 128 |
60 | #define LARGE_SIZE 100000 |
61 | |
62 | static uv_buf_t large_buf; |
63 | static char buffer[LARGE_SIZE]; |
64 | static uv_write_t write_reqs[300]; |
65 | static int write_reqs_completed; |
66 | |
67 | static unsigned int write_until_data_queued(void); |
68 | static void send_handle_and_close(void); |
69 | |
70 | |
71 | static void close_server_conn_cb(uv_handle_t* handle) { |
72 | free(handle); |
73 | } |
74 | |
75 | |
76 | static void on_connection(uv_stream_t* server, int status) { |
77 | uv_tcp_t* conn; |
78 | int r; |
79 | |
80 | if (!local_conn_accepted) { |
81 | /* Accept the connection and close it. Also and close the server. */ |
82 | ASSERT(status == 0); |
83 | ASSERT((uv_stream_t*)&tcp_server == server); |
84 | |
85 | conn = malloc(sizeof(*conn)); |
86 | ASSERT(conn); |
87 | r = uv_tcp_init(server->loop, conn); |
88 | ASSERT(r == 0); |
89 | |
90 | r = uv_accept(server, (uv_stream_t*)conn); |
91 | ASSERT(r == 0); |
92 | |
93 | uv_close((uv_handle_t*)conn, close_server_conn_cb); |
94 | uv_close((uv_handle_t*)server, NULL); |
95 | local_conn_accepted = 1; |
96 | } |
97 | } |
98 | |
99 | |
100 | static void exit_cb(uv_process_t* process, |
101 | int64_t exit_status, |
102 | int term_signal) { |
103 | printf("exit_cb\n" ); |
104 | exit_cb_called++; |
105 | ASSERT(exit_status == 0); |
106 | ASSERT(term_signal == 0); |
107 | uv_close((uv_handle_t*)process, NULL); |
108 | } |
109 | |
110 | |
111 | static void on_alloc(uv_handle_t* handle, |
112 | size_t suggested_size, |
113 | uv_buf_t* buf) { |
114 | buf->base = malloc(suggested_size); |
115 | buf->len = suggested_size; |
116 | } |
117 | |
118 | |
119 | static void close_client_conn_cb(uv_handle_t* handle) { |
120 | tcp_conn* p = (tcp_conn*)handle->data; |
121 | free(p); |
122 | } |
123 | |
124 | |
125 | static void connect_cb(uv_connect_t* req, int status) { |
126 | uv_close((uv_handle_t*)req->handle, close_client_conn_cb); |
127 | } |
128 | |
129 | |
130 | static void make_many_connections(void) { |
131 | tcp_conn* conn; |
132 | struct sockaddr_in addr; |
133 | int r, i; |
134 | |
135 | for (i = 0; i < CONN_COUNT; i++) { |
136 | conn = malloc(sizeof(*conn)); |
137 | ASSERT(conn); |
138 | |
139 | r = uv_tcp_init(uv_default_loop(), &conn->conn); |
140 | ASSERT(r == 0); |
141 | |
142 | ASSERT(0 == uv_ip4_addr("127.0.0.1" , TEST_PORT, &addr)); |
143 | |
144 | r = uv_tcp_connect(&conn->conn_req, |
145 | (uv_tcp_t*) &conn->conn, |
146 | (const struct sockaddr*) &addr, |
147 | connect_cb); |
148 | ASSERT(r == 0); |
149 | |
150 | conn->conn.data = conn; |
151 | } |
152 | } |
153 | |
154 | |
155 | static void on_read(uv_stream_t* handle, |
156 | ssize_t nread, |
157 | const uv_buf_t* buf) { |
158 | int r; |
159 | uv_pipe_t* pipe; |
160 | uv_handle_type pending; |
161 | uv_buf_t outbuf; |
162 | |
163 | pipe = (uv_pipe_t*) handle; |
164 | |
165 | if (nread == 0) { |
166 | /* Everything OK, but nothing read. */ |
167 | free(buf->base); |
168 | return; |
169 | } |
170 | |
171 | if (nread < 0) { |
172 | if (nread == UV_EOF) { |
173 | free(buf->base); |
174 | return; |
175 | } |
176 | |
177 | printf("error recving on channel: %s\n" , uv_strerror(nread)); |
178 | abort(); |
179 | } |
180 | |
181 | fprintf(stderr, "got %d bytes\n" , (int)nread); |
182 | |
183 | pending = uv_pipe_pending_type(pipe); |
184 | if (!tcp_server_listening) { |
185 | ASSERT(1 == uv_pipe_pending_count(pipe)); |
186 | ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE); |
187 | read_cb_called++; |
188 | |
189 | /* Accept the pending TCP server, and start listening on it. */ |
190 | ASSERT(pending == UV_TCP); |
191 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
192 | ASSERT(r == 0); |
193 | |
194 | r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); |
195 | ASSERT(r == 0); |
196 | |
197 | r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection); |
198 | ASSERT(r == 0); |
199 | |
200 | tcp_server_listening = 1; |
201 | |
202 | /* Make sure that the expected data is correctly multiplexed. */ |
203 | ASSERT(memcmp("hello\n" , buf->base, nread) == 0); |
204 | |
205 | outbuf = uv_buf_init("world\n" , 6); |
206 | r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); |
207 | ASSERT(r == 0); |
208 | |
209 | /* Create a bunch of connections to get both servers to accept. */ |
210 | make_many_connections(); |
211 | } else if (memcmp("accepted_connection\n" , buf->base, nread) == 0) { |
212 | /* Remote server has accepted a connection. Close the channel. */ |
213 | ASSERT(0 == uv_pipe_pending_count(pipe)); |
214 | ASSERT(pending == UV_UNKNOWN_HANDLE); |
215 | remote_conn_accepted = 1; |
216 | uv_close((uv_handle_t*)&channel, NULL); |
217 | } |
218 | |
219 | free(buf->base); |
220 | } |
221 | |
222 | #ifdef _WIN32 |
223 | static void on_read_listen_after_bound_twice(uv_stream_t* handle, |
224 | ssize_t nread, |
225 | const uv_buf_t* buf) { |
226 | int r; |
227 | uv_pipe_t* pipe; |
228 | uv_handle_type pending; |
229 | |
230 | pipe = (uv_pipe_t*) handle; |
231 | |
232 | if (nread == 0) { |
233 | /* Everything OK, but nothing read. */ |
234 | free(buf->base); |
235 | return; |
236 | } |
237 | |
238 | if (nread < 0) { |
239 | if (nread == UV_EOF) { |
240 | free(buf->base); |
241 | return; |
242 | } |
243 | |
244 | printf("error recving on channel: %s\n" , uv_strerror(nread)); |
245 | abort(); |
246 | } |
247 | |
248 | fprintf(stderr, "got %d bytes\n" , (int)nread); |
249 | |
250 | ASSERT(uv_pipe_pending_count(pipe) > 0); |
251 | pending = uv_pipe_pending_type(pipe); |
252 | ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE); |
253 | read_cb_called++; |
254 | |
255 | if (read_cb_called == 1) { |
256 | /* Accept the first TCP server, and start listening on it. */ |
257 | ASSERT(pending == UV_TCP); |
258 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
259 | ASSERT(r == 0); |
260 | |
261 | r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); |
262 | ASSERT(r == 0); |
263 | |
264 | r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection); |
265 | ASSERT(r == 0); |
266 | } else if (read_cb_called == 2) { |
267 | /* Accept the second TCP server, and start listening on it. */ |
268 | ASSERT(pending == UV_TCP); |
269 | r = uv_tcp_init(uv_default_loop(), &tcp_server2); |
270 | ASSERT(r == 0); |
271 | |
272 | r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2); |
273 | ASSERT(r == 0); |
274 | |
275 | r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection); |
276 | ASSERT(r == UV_EADDRINUSE); |
277 | |
278 | uv_close((uv_handle_t*)&tcp_server, NULL); |
279 | uv_close((uv_handle_t*)&tcp_server2, NULL); |
280 | ASSERT(0 == uv_pipe_pending_count(pipe)); |
281 | uv_close((uv_handle_t*)&channel, NULL); |
282 | } |
283 | |
284 | free(buf->base); |
285 | } |
286 | #endif |
287 | |
288 | void spawn_helper(uv_pipe_t* channel, |
289 | uv_process_t* process, |
290 | const char* helper) { |
291 | uv_process_options_t options; |
292 | size_t exepath_size; |
293 | char exepath[1024]; |
294 | char* args[3]; |
295 | int r; |
296 | uv_stdio_container_t stdio[3]; |
297 | |
298 | r = uv_pipe_init(uv_default_loop(), channel, 1); |
299 | ASSERT(r == 0); |
300 | ASSERT(channel->ipc); |
301 | |
302 | exepath_size = sizeof(exepath); |
303 | r = uv_exepath(exepath, &exepath_size); |
304 | ASSERT(r == 0); |
305 | |
306 | exepath[exepath_size] = '\0'; |
307 | args[0] = exepath; |
308 | args[1] = (char*)helper; |
309 | args[2] = NULL; |
310 | |
311 | memset(&options, 0, sizeof(options)); |
312 | options.file = exepath; |
313 | options.args = args; |
314 | options.exit_cb = exit_cb; |
315 | options.stdio = stdio; |
316 | options.stdio_count = ARRAY_SIZE(stdio); |
317 | |
318 | stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; |
319 | stdio[0].data.stream = (uv_stream_t*) channel; |
320 | stdio[1].flags = UV_INHERIT_FD; |
321 | stdio[1].data.fd = 1; |
322 | stdio[2].flags = UV_INHERIT_FD; |
323 | stdio[2].data.fd = 2; |
324 | |
325 | r = uv_spawn(uv_default_loop(), process, &options); |
326 | ASSERT(r == 0); |
327 | } |
328 | |
329 | |
330 | static void on_tcp_write(uv_write_t* req, int status) { |
331 | ASSERT(status == 0); |
332 | ASSERT(req->handle == (uv_stream_t*)&tcp_connection); |
333 | tcp_write_cb_called++; |
334 | } |
335 | |
336 | |
337 | static void on_read_alloc(uv_handle_t* handle, |
338 | size_t suggested_size, |
339 | uv_buf_t* buf) { |
340 | buf->base = malloc(suggested_size); |
341 | buf->len = suggested_size; |
342 | } |
343 | |
344 | |
345 | static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { |
346 | ASSERT(nread > 0); |
347 | ASSERT(memcmp("hello again\n" , buf->base, nread) == 0); |
348 | ASSERT(tcp == (uv_stream_t*)&tcp_connection); |
349 | free(buf->base); |
350 | |
351 | tcp_read_cb_called++; |
352 | |
353 | uv_close((uv_handle_t*)tcp, NULL); |
354 | uv_close((uv_handle_t*)&channel, NULL); |
355 | } |
356 | |
357 | |
358 | static void on_read_connection(uv_stream_t* handle, |
359 | ssize_t nread, |
360 | const uv_buf_t* buf) { |
361 | int r; |
362 | uv_buf_t outbuf; |
363 | uv_pipe_t* pipe; |
364 | uv_handle_type pending; |
365 | |
366 | pipe = (uv_pipe_t*) handle; |
367 | if (nread == 0) { |
368 | /* Everything OK, but nothing read. */ |
369 | free(buf->base); |
370 | return; |
371 | } |
372 | |
373 | if (nread < 0) { |
374 | if (nread == UV_EOF) { |
375 | free(buf->base); |
376 | return; |
377 | } |
378 | |
379 | printf("error recving on channel: %s\n" , uv_strerror(nread)); |
380 | abort(); |
381 | } |
382 | |
383 | fprintf(stderr, "got %d bytes\n" , (int)nread); |
384 | |
385 | ASSERT(1 == uv_pipe_pending_count(pipe)); |
386 | pending = uv_pipe_pending_type(pipe); |
387 | |
388 | ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE); |
389 | read_cb_called++; |
390 | |
391 | /* Accept the pending TCP connection */ |
392 | ASSERT(pending == UV_TCP); |
393 | r = uv_tcp_init(uv_default_loop(), &tcp_connection); |
394 | ASSERT(r == 0); |
395 | |
396 | r = uv_accept(handle, (uv_stream_t*)&tcp_connection); |
397 | ASSERT(r == 0); |
398 | |
399 | /* Make sure that the expected data is correctly multiplexed. */ |
400 | ASSERT(memcmp("hello\n" , buf->base, nread) == 0); |
401 | |
402 | /* Write/read to/from the connection */ |
403 | outbuf = uv_buf_init("world\n" , 6); |
404 | r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1, |
405 | on_tcp_write); |
406 | ASSERT(r == 0); |
407 | |
408 | r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read); |
409 | ASSERT(r == 0); |
410 | |
411 | free(buf->base); |
412 | } |
413 | |
414 | |
415 | #ifndef _WIN32 |
416 | static void on_read_closed_handle(uv_stream_t* handle, |
417 | ssize_t nread, |
418 | const uv_buf_t* buf) { |
419 | if (nread == 0 || nread == UV_EOF) { |
420 | free(buf->base); |
421 | return; |
422 | } |
423 | |
424 | if (nread < 0) { |
425 | printf("error recving on channel: %s\n" , uv_strerror(nread)); |
426 | abort(); |
427 | } |
428 | |
429 | closed_handle_data_read += nread; |
430 | free(buf->base); |
431 | } |
432 | #endif |
433 | |
434 | |
435 | static void on_read_send_zero(uv_stream_t* handle, |
436 | ssize_t nread, |
437 | const uv_buf_t* buf) { |
438 | ASSERT(nread == 0 || nread == UV_EOF); |
439 | free(buf->base); |
440 | } |
441 | |
442 | |
443 | static int run_ipc_test(const char* helper, uv_read_cb read_cb) { |
444 | uv_process_t process; |
445 | int r; |
446 | |
447 | spawn_helper(&channel, &process, helper); |
448 | uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb); |
449 | |
450 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
451 | ASSERT(r == 0); |
452 | |
453 | MAKE_VALGRIND_HAPPY(); |
454 | return 0; |
455 | } |
456 | |
457 | |
458 | TEST_IMPL(ipc_listen_before_write) { |
459 | #if defined(NO_SEND_HANDLE_ON_PIPE) |
460 | RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); |
461 | #endif |
462 | int r = run_ipc_test("ipc_helper_listen_before_write" , on_read); |
463 | ASSERT(local_conn_accepted == 1); |
464 | ASSERT(remote_conn_accepted == 1); |
465 | ASSERT(read_cb_called == 1); |
466 | ASSERT(exit_cb_called == 1); |
467 | return r; |
468 | } |
469 | |
470 | |
471 | TEST_IMPL(ipc_listen_after_write) { |
472 | #if defined(NO_SEND_HANDLE_ON_PIPE) |
473 | RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); |
474 | #endif |
475 | int r = run_ipc_test("ipc_helper_listen_after_write" , on_read); |
476 | ASSERT(local_conn_accepted == 1); |
477 | ASSERT(remote_conn_accepted == 1); |
478 | ASSERT(read_cb_called == 1); |
479 | ASSERT(exit_cb_called == 1); |
480 | return r; |
481 | } |
482 | |
483 | |
484 | TEST_IMPL(ipc_tcp_connection) { |
485 | #if defined(NO_SEND_HANDLE_ON_PIPE) |
486 | RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); |
487 | #endif |
488 | int r = run_ipc_test("ipc_helper_tcp_connection" , on_read_connection); |
489 | ASSERT(read_cb_called == 1); |
490 | ASSERT(tcp_write_cb_called == 1); |
491 | ASSERT(tcp_read_cb_called == 1); |
492 | ASSERT(exit_cb_called == 1); |
493 | return r; |
494 | } |
495 | |
496 | #ifndef _WIN32 |
497 | TEST_IMPL(ipc_closed_handle) { |
498 | int r; |
499 | r = run_ipc_test("ipc_helper_closed_handle" , on_read_closed_handle); |
500 | ASSERT(r == 0); |
501 | return 0; |
502 | } |
503 | #endif |
504 | |
505 | |
506 | #ifdef _WIN32 |
507 | TEST_IMPL(listen_with_simultaneous_accepts) { |
508 | uv_tcp_t server; |
509 | int r; |
510 | struct sockaddr_in addr; |
511 | |
512 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
513 | |
514 | r = uv_tcp_init(uv_default_loop(), &server); |
515 | ASSERT(r == 0); |
516 | |
517 | r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0); |
518 | ASSERT(r == 0); |
519 | |
520 | r = uv_tcp_simultaneous_accepts(&server, 1); |
521 | ASSERT(r == 0); |
522 | |
523 | r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); |
524 | ASSERT(r == 0); |
525 | ASSERT(server.reqs_pending == 32); |
526 | |
527 | MAKE_VALGRIND_HAPPY(); |
528 | return 0; |
529 | } |
530 | |
531 | |
532 | TEST_IMPL(listen_no_simultaneous_accepts) { |
533 | uv_tcp_t server; |
534 | int r; |
535 | struct sockaddr_in addr; |
536 | |
537 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
538 | |
539 | r = uv_tcp_init(uv_default_loop(), &server); |
540 | ASSERT(r == 0); |
541 | |
542 | r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0); |
543 | ASSERT(r == 0); |
544 | |
545 | r = uv_tcp_simultaneous_accepts(&server, 0); |
546 | ASSERT(r == 0); |
547 | |
548 | r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); |
549 | ASSERT(r == 0); |
550 | ASSERT(server.reqs_pending == 1); |
551 | |
552 | MAKE_VALGRIND_HAPPY(); |
553 | return 0; |
554 | } |
555 | |
556 | TEST_IMPL(ipc_listen_after_bind_twice) { |
557 | #if defined(NO_SEND_HANDLE_ON_PIPE) |
558 | RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); |
559 | #endif |
560 | int r = run_ipc_test("ipc_helper_bind_twice" , on_read_listen_after_bound_twice); |
561 | ASSERT(read_cb_called == 2); |
562 | ASSERT(exit_cb_called == 1); |
563 | return r; |
564 | } |
565 | #endif |
566 | |
567 | TEST_IMPL(ipc_send_zero) { |
568 | int r; |
569 | r = run_ipc_test("ipc_helper_send_zero" , on_read_send_zero); |
570 | ASSERT(r == 0); |
571 | return 0; |
572 | } |
573 | |
574 | |
575 | /* Everything here runs in a child process. */ |
576 | |
577 | static tcp_conn conn; |
578 | |
579 | |
580 | static void close_cb(uv_handle_t* handle) { |
581 | close_cb_called++; |
582 | } |
583 | |
584 | |
585 | static void conn_notify_write_cb(uv_write_t* req, int status) { |
586 | uv_close((uv_handle_t*)&tcp_server, close_cb); |
587 | uv_close((uv_handle_t*)&channel, close_cb); |
588 | } |
589 | |
590 | |
591 | static void tcp_connection_write_cb(uv_write_t* req, int status) { |
592 | ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle); |
593 | uv_close((uv_handle_t*)req->handle, close_cb); |
594 | uv_close((uv_handle_t*)&channel, close_cb); |
595 | uv_close((uv_handle_t*)&tcp_server, close_cb); |
596 | tcp_conn_write_cb_called++; |
597 | } |
598 | |
599 | |
600 | static void closed_handle_large_write_cb(uv_write_t* req, int status) { |
601 | ASSERT(status == 0); |
602 | ASSERT(closed_handle_data_read = LARGE_SIZE); |
603 | if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) { |
604 | write_reqs_completed = 0; |
605 | if (write_until_data_queued() > 0) |
606 | send_handle_and_close(); |
607 | } |
608 | } |
609 | |
610 | |
611 | static void closed_handle_write_cb(uv_write_t* req, int status) { |
612 | ASSERT(status == UV_EBADF); |
613 | closed_handle_write = 1; |
614 | } |
615 | |
616 | |
617 | static void send_zero_write_cb(uv_write_t* req, int status) { |
618 | ASSERT(status == 0); |
619 | send_zero_write++; |
620 | } |
621 | |
622 | static void on_tcp_child_process_read(uv_stream_t* tcp, |
623 | ssize_t nread, |
624 | const uv_buf_t* buf) { |
625 | uv_buf_t outbuf; |
626 | int r; |
627 | |
628 | if (nread < 0) { |
629 | if (nread == UV_EOF) { |
630 | free(buf->base); |
631 | return; |
632 | } |
633 | |
634 | printf("error recving on tcp connection: %s\n" , uv_strerror(nread)); |
635 | abort(); |
636 | } |
637 | |
638 | ASSERT(nread > 0); |
639 | ASSERT(memcmp("world\n" , buf->base, nread) == 0); |
640 | on_pipe_read_called++; |
641 | free(buf->base); |
642 | |
643 | /* Write to the socket */ |
644 | outbuf = uv_buf_init("hello again\n" , 12); |
645 | r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb); |
646 | ASSERT(r == 0); |
647 | |
648 | tcp_conn_read_cb_called++; |
649 | } |
650 | |
651 | |
652 | static void connect_child_process_cb(uv_connect_t* req, int status) { |
653 | int r; |
654 | |
655 | ASSERT(status == 0); |
656 | r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read); |
657 | ASSERT(r == 0); |
658 | } |
659 | |
660 | |
661 | static void ipc_on_connection(uv_stream_t* server, int status) { |
662 | int r; |
663 | uv_buf_t buf; |
664 | |
665 | if (!connection_accepted) { |
666 | /* |
667 | * Accept the connection and close it. Also let the other |
668 | * side know. |
669 | */ |
670 | ASSERT(status == 0); |
671 | ASSERT((uv_stream_t*)&tcp_server == server); |
672 | |
673 | r = uv_tcp_init(server->loop, &conn.conn); |
674 | ASSERT(r == 0); |
675 | |
676 | r = uv_accept(server, (uv_stream_t*)&conn.conn); |
677 | ASSERT(r == 0); |
678 | |
679 | uv_close((uv_handle_t*)&conn.conn, close_cb); |
680 | |
681 | buf = uv_buf_init("accepted_connection\n" , 20); |
682 | r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, |
683 | NULL, conn_notify_write_cb); |
684 | ASSERT(r == 0); |
685 | |
686 | connection_accepted = 1; |
687 | } |
688 | } |
689 | |
690 | |
691 | static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) { |
692 | int r; |
693 | uv_buf_t buf; |
694 | uv_tcp_t* conn; |
695 | |
696 | ASSERT(status == 0); |
697 | ASSERT((uv_stream_t*)&tcp_server == server); |
698 | |
699 | conn = malloc(sizeof(*conn)); |
700 | ASSERT(conn); |
701 | |
702 | r = uv_tcp_init(server->loop, conn); |
703 | ASSERT(r == 0); |
704 | |
705 | r = uv_accept(server, (uv_stream_t*)conn); |
706 | ASSERT(r == 0); |
707 | |
708 | /* Send the accepted connection to the other process */ |
709 | buf = uv_buf_init("hello\n" , 6); |
710 | r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, |
711 | (uv_stream_t*)conn, NULL); |
712 | ASSERT(r == 0); |
713 | |
714 | r = uv_read_start((uv_stream_t*) conn, |
715 | on_read_alloc, |
716 | on_tcp_child_process_read); |
717 | ASSERT(r == 0); |
718 | |
719 | uv_close((uv_handle_t*)conn, close_cb); |
720 | } |
721 | |
722 | |
723 | int ipc_helper(int listen_after_write) { |
724 | /* |
725 | * This is launched from test-ipc.c. stdin is a duplex channel that we |
726 | * over which a handle will be transmitted. |
727 | */ |
728 | struct sockaddr_in addr; |
729 | int r; |
730 | uv_buf_t buf; |
731 | |
732 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
733 | |
734 | r = uv_pipe_init(uv_default_loop(), &channel, 1); |
735 | ASSERT(r == 0); |
736 | |
737 | uv_pipe_open(&channel, 0); |
738 | |
739 | ASSERT(1 == uv_is_readable((uv_stream_t*) &channel)); |
740 | ASSERT(1 == uv_is_writable((uv_stream_t*) &channel)); |
741 | ASSERT(0 == uv_is_closing((uv_handle_t*) &channel)); |
742 | |
743 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
744 | ASSERT(r == 0); |
745 | |
746 | r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); |
747 | ASSERT(r == 0); |
748 | |
749 | if (!listen_after_write) { |
750 | r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection); |
751 | ASSERT(r == 0); |
752 | } |
753 | |
754 | buf = uv_buf_init("hello\n" , 6); |
755 | r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, |
756 | (uv_stream_t*)&tcp_server, NULL); |
757 | ASSERT(r == 0); |
758 | |
759 | if (listen_after_write) { |
760 | r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection); |
761 | ASSERT(r == 0); |
762 | } |
763 | |
764 | notify_parent_process(); |
765 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
766 | ASSERT(r == 0); |
767 | |
768 | ASSERT(connection_accepted == 1); |
769 | ASSERT(close_cb_called == 3); |
770 | |
771 | MAKE_VALGRIND_HAPPY(); |
772 | return 0; |
773 | } |
774 | |
775 | |
776 | int ipc_helper_tcp_connection(void) { |
777 | /* |
778 | * This is launched from test-ipc.c. stdin is a duplex channel |
779 | * over which a handle will be transmitted. |
780 | */ |
781 | |
782 | int r; |
783 | struct sockaddr_in addr; |
784 | |
785 | r = uv_pipe_init(uv_default_loop(), &channel, 1); |
786 | ASSERT(r == 0); |
787 | |
788 | uv_pipe_open(&channel, 0); |
789 | |
790 | ASSERT(1 == uv_is_readable((uv_stream_t*) &channel)); |
791 | ASSERT(1 == uv_is_writable((uv_stream_t*) &channel)); |
792 | ASSERT(0 == uv_is_closing((uv_handle_t*) &channel)); |
793 | |
794 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
795 | ASSERT(r == 0); |
796 | |
797 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
798 | |
799 | r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); |
800 | ASSERT(r == 0); |
801 | |
802 | r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn); |
803 | ASSERT(r == 0); |
804 | |
805 | /* Make a connection to the server */ |
806 | r = uv_tcp_init(uv_default_loop(), &conn.conn); |
807 | ASSERT(r == 0); |
808 | |
809 | ASSERT(0 == uv_ip4_addr("127.0.0.1" , TEST_PORT, &addr)); |
810 | |
811 | r = uv_tcp_connect(&conn.conn_req, |
812 | (uv_tcp_t*) &conn.conn, |
813 | (const struct sockaddr*) &addr, |
814 | connect_child_process_cb); |
815 | ASSERT(r == 0); |
816 | |
817 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
818 | ASSERT(r == 0); |
819 | |
820 | ASSERT(tcp_conn_read_cb_called == 1); |
821 | ASSERT(tcp_conn_write_cb_called == 1); |
822 | ASSERT(close_cb_called == 4); |
823 | |
824 | MAKE_VALGRIND_HAPPY(); |
825 | return 0; |
826 | } |
827 | |
828 | static unsigned int write_until_data_queued() { |
829 | unsigned int i; |
830 | int r; |
831 | |
832 | i = 0; |
833 | do { |
834 | r = uv_write(&write_reqs[i], |
835 | (uv_stream_t*)&channel, |
836 | &large_buf, |
837 | 1, |
838 | closed_handle_large_write_cb); |
839 | ASSERT(r == 0); |
840 | i++; |
841 | } while (channel.write_queue_size == 0 && |
842 | i < ARRAY_SIZE(write_reqs)); |
843 | |
844 | return channel.write_queue_size; |
845 | } |
846 | |
847 | static void send_handle_and_close() { |
848 | int r; |
849 | struct sockaddr_in addr; |
850 | |
851 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
852 | ASSERT(r == 0); |
853 | |
854 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
855 | |
856 | r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); |
857 | ASSERT(r == 0); |
858 | |
859 | r = uv_write2(&write_req, |
860 | (uv_stream_t*)&channel, |
861 | &large_buf, |
862 | 1, |
863 | (uv_stream_t*)&tcp_server, |
864 | closed_handle_write_cb); |
865 | ASSERT(r == 0); |
866 | |
867 | uv_close((uv_handle_t*)&tcp_server, NULL); |
868 | } |
869 | |
870 | int ipc_helper_closed_handle(void) { |
871 | int r; |
872 | |
873 | memset(buffer, '.', LARGE_SIZE); |
874 | large_buf = uv_buf_init(buffer, LARGE_SIZE); |
875 | |
876 | r = uv_pipe_init(uv_default_loop(), &channel, 1); |
877 | ASSERT(r == 0); |
878 | |
879 | uv_pipe_open(&channel, 0); |
880 | |
881 | ASSERT(1 == uv_is_readable((uv_stream_t*) &channel)); |
882 | ASSERT(1 == uv_is_writable((uv_stream_t*) &channel)); |
883 | ASSERT(0 == uv_is_closing((uv_handle_t*) &channel)); |
884 | |
885 | if (write_until_data_queued() > 0) |
886 | send_handle_and_close(); |
887 | |
888 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
889 | ASSERT(r == 0); |
890 | |
891 | ASSERT(closed_handle_write == 1); |
892 | |
893 | MAKE_VALGRIND_HAPPY(); |
894 | return 0; |
895 | } |
896 | |
897 | |
898 | int ipc_helper_bind_twice(void) { |
899 | /* |
900 | * This is launched from test-ipc.c. stdin is a duplex channel |
901 | * over which two handles will be transmitted. |
902 | */ |
903 | struct sockaddr_in addr; |
904 | int r; |
905 | uv_buf_t buf; |
906 | |
907 | ASSERT(0 == uv_ip4_addr("0.0.0.0" , TEST_PORT, &addr)); |
908 | |
909 | r = uv_pipe_init(uv_default_loop(), &channel, 1); |
910 | ASSERT(r == 0); |
911 | |
912 | uv_pipe_open(&channel, 0); |
913 | |
914 | ASSERT(1 == uv_is_readable((uv_stream_t*) &channel)); |
915 | ASSERT(1 == uv_is_writable((uv_stream_t*) &channel)); |
916 | ASSERT(0 == uv_is_closing((uv_handle_t*) &channel)); |
917 | |
918 | buf = uv_buf_init("hello\n" , 6); |
919 | |
920 | r = uv_tcp_init(uv_default_loop(), &tcp_server); |
921 | ASSERT(r == 0); |
922 | r = uv_tcp_init(uv_default_loop(), &tcp_server2); |
923 | ASSERT(r == 0); |
924 | |
925 | r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); |
926 | ASSERT(r == 0); |
927 | r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0); |
928 | ASSERT(r == 0); |
929 | |
930 | r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, |
931 | (uv_stream_t*)&tcp_server, NULL); |
932 | ASSERT(r == 0); |
933 | r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1, |
934 | (uv_stream_t*)&tcp_server2, NULL); |
935 | ASSERT(r == 0); |
936 | |
937 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
938 | ASSERT(r == 0); |
939 | |
940 | MAKE_VALGRIND_HAPPY(); |
941 | return 0; |
942 | } |
943 | |
944 | int ipc_helper_send_zero(void) { |
945 | int r; |
946 | uv_buf_t zero_buf; |
947 | |
948 | zero_buf = uv_buf_init(0, 0); |
949 | |
950 | r = uv_pipe_init(uv_default_loop(), &channel, 0); |
951 | ASSERT(r == 0); |
952 | |
953 | uv_pipe_open(&channel, 0); |
954 | |
955 | ASSERT(1 == uv_is_readable((uv_stream_t*) &channel)); |
956 | ASSERT(1 == uv_is_writable((uv_stream_t*) &channel)); |
957 | ASSERT(0 == uv_is_closing((uv_handle_t*) &channel)); |
958 | |
959 | r = uv_write(&write_req, |
960 | (uv_stream_t*)&channel, |
961 | &zero_buf, |
962 | 1, |
963 | send_zero_write_cb); |
964 | |
965 | ASSERT(r == 0); |
966 | |
967 | r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); |
968 | ASSERT(r == 0); |
969 | |
970 | ASSERT(send_zero_write == 1); |
971 | |
972 | MAKE_VALGRIND_HAPPY(); |
973 | return 0; |
974 | } |
975 | |