1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | |
19 | #ifndef BRPC_STREAM_H |
20 | #define BRPC_STREAM_H |
21 | |
22 | #include "butil/iobuf.h" |
23 | #include "butil/scoped_generic.h" |
24 | #include "brpc/socket_id.h" |
25 | |
26 | namespace brpc { |
27 | |
28 | class Controller; |
29 | |
30 | typedef SocketId StreamId; |
31 | const StreamId INVALID_STREAM_ID = (StreamId)-1L; |
32 | |
namespace detail {
// Forward declaration only: ScopedStream names this type before it can be
// defined. The definition appears later in this header, after StreamClose
// has been declared.
struct StreamIdTraits;
}  // namespace detail
36 | |
37 | // Auto-closed Stream |
38 | typedef butil::ScopedGeneric<StreamId, detail::StreamIdTraits> ScopedStream; |
39 | |
40 | class StreamInputHandler { |
41 | public: |
42 | virtual ~StreamInputHandler() = default; |
43 | virtual int on_received_messages(StreamId id, |
44 | butil::IOBuf *const messages[], |
45 | size_t size) = 0; |
46 | virtual void on_idle_timeout(StreamId id) = 0; |
47 | virtual void on_closed(StreamId id) = 0; |
48 | }; |
49 | |
50 | struct StreamOptions { |
51 | StreamOptions() |
52 | : max_buf_size(2 * 1024 * 1024) |
53 | , idle_timeout_ms(-1) |
54 | , messages_in_batch(128) |
55 | , handler(NULL) |
56 | {} |
57 | |
58 | // The max size of unconsumed data allowed at remote side. |
59 | // If |max_buf_size| <= 0, there's no limit of buf size |
60 | // default: 2097152 (2M) |
61 | int max_buf_size; |
62 | |
63 | // Notify user when there's no data for at least |idle_timeout_ms| |
64 | // milliseconds since the last time that HandleIdleTimeout or HandleInput |
65 | // finished. |
66 | // default: -1 |
67 | long idle_timeout_ms; |
68 | |
69 | // Maximum messages in batch passed to handler->on_received_messages |
70 | // default: 128 |
71 | size_t messages_in_batch; |
72 | |
73 | // Handle input message, if handler is NULL, the remote side is not allowd to |
74 | // write any message, who will get EBADF on writting |
75 | // default: NULL |
76 | StreamInputHandler* handler; |
77 | }; |
78 | |
79 | // [Called at the client side] |
80 | // Create a stream at client-side along with the |cntl|, which will be connected |
81 | // when receiving the response with a stream from server-side. If |options| is |
82 | // NULL, the stream will be created with default options |
83 | // Return 0 on success, -1 otherwise |
84 | int StreamCreate(StreamId* request_stream, Controller &cntl, |
85 | const StreamOptions* options); |
86 | |
87 | // [Called at the server side] |
88 | // Accept the stream. If client didn't create a stream with the request |
89 | // (cntl.has_remote_stream() returns false), this method would fail. |
90 | // Return 0 on success, -1 otherwise. |
91 | int StreamAccept(StreamId* response_stream, Controller &cntl, |
92 | const StreamOptions* options); |
93 | |
94 | // Write |message| into |stream_id|. The remote-side handler will received the |
95 | // message by the written order |
96 | // Returns 0 on success, errno otherwise |
97 | // Errno: |
98 | // - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size |
99 | // which the remote side hasn't consumed yet excceeds the number. |
100 | // - EINVAL: |stream_id| is invalied or has been closed |
101 | int StreamWrite(StreamId stream_id, const butil::IOBuf &message); |
102 | |
103 | // Write util the pending buffer size is less than |max_buf_size| or orrur |
104 | // occurs |
105 | // Returns 0 on success, errno otherwise |
106 | // Errno: |
107 | // - ETIMEDOUT: when |due_time| is not NULL and time expired this |
108 | // - EINVAL: the stream was close during waiting |
109 | int StreamWait(StreamId stream_id, const timespec* due_time); |
110 | |
111 | // Async wait |
112 | void StreamWait(StreamId stream_id, const timespec *due_time, |
113 | void (*on_writable)(StreamId stream_id, void* arg, |
114 | int error_code), |
115 | void *arg); |
116 | |
117 | // Close |stream_id|, after this function is called: |
118 | // - All the following |StreamWrite| would fail |
119 | // - |StreamWait| wakes up immediately. |
120 | // - Both sides |on_closed| would be notifed after all the pending buffers have |
121 | // been received |
122 | // This function could be called multiple times without side-effects |
123 | int StreamClose(StreamId stream_id); |
124 | |
125 | namespace detail { |
126 | |
127 | struct StreamIdTraits { |
128 | inline static StreamId InvalidValue() { |
129 | return INVALID_STREAM_ID; |
130 | }; |
131 | static void Free(StreamId f) { |
132 | if (f != INVALID_STREAM_ID) { |
133 | StreamClose(f); |
134 | } |
135 | } |
136 | }; |
137 | |
138 | } // namespace detail |
139 | |
140 | } // namespace brpc |
141 | |
142 | |
143 | #endif //BRPC_STREAM_H |
144 | |