1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18
19#ifndef BRPC_STREAM_H
20#define BRPC_STREAM_H
21
22#include "butil/iobuf.h"
23#include "butil/scoped_generic.h"
24#include "brpc/socket_id.h"
25
26namespace brpc {
27
28class Controller;
29
30typedef SocketId StreamId;
31const StreamId INVALID_STREAM_ID = (StreamId)-1L;
32
namespace detail {
// Forward declaration only; the definition appears at the end of this header,
// after StreamClose() has been declared.
struct StreamIdTraits;
} // namespace detail
36
37// Auto-closed Stream
38typedef butil::ScopedGeneric<StreamId, detail::StreamIdTraits> ScopedStream;
39
40class StreamInputHandler {
41public:
42 virtual ~StreamInputHandler() = default;
43 virtual int on_received_messages(StreamId id,
44 butil::IOBuf *const messages[],
45 size_t size) = 0;
46 virtual void on_idle_timeout(StreamId id) = 0;
47 virtual void on_closed(StreamId id) = 0;
48};
49
50struct StreamOptions {
51 StreamOptions()
52 : max_buf_size(2 * 1024 * 1024)
53 , idle_timeout_ms(-1)
54 , messages_in_batch(128)
55 , handler(NULL)
56 {}
57
58 // The max size of unconsumed data allowed at remote side.
59 // If |max_buf_size| <= 0, there's no limit of buf size
60 // default: 2097152 (2M)
61 int max_buf_size;
62
63 // Notify user when there's no data for at least |idle_timeout_ms|
64 // milliseconds since the last time that HandleIdleTimeout or HandleInput
65 // finished.
66 // default: -1
67 long idle_timeout_ms;
68
69 // Maximum messages in batch passed to handler->on_received_messages
70 // default: 128
71 size_t messages_in_batch;
72
73 // Handle input message, if handler is NULL, the remote side is not allowd to
74 // write any message, who will get EBADF on writting
75 // default: NULL
76 StreamInputHandler* handler;
77};
78
79// [Called at the client side]
80// Create a stream at client-side along with the |cntl|, which will be connected
81// when receiving the response with a stream from server-side. If |options| is
82// NULL, the stream will be created with default options
83// Return 0 on success, -1 otherwise
84int StreamCreate(StreamId* request_stream, Controller &cntl,
85 const StreamOptions* options);
86
87// [Called at the server side]
88// Accept the stream. If client didn't create a stream with the request
89// (cntl.has_remote_stream() returns false), this method would fail.
90// Return 0 on success, -1 otherwise.
91int StreamAccept(StreamId* response_stream, Controller &cntl,
92 const StreamOptions* options);
93
94// Write |message| into |stream_id|. The remote-side handler will received the
95// message by the written order
96// Returns 0 on success, errno otherwise
97// Errno:
98// - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
99// which the remote side hasn't consumed yet excceeds the number.
100// - EINVAL: |stream_id| is invalied or has been closed
101int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
102
103// Write util the pending buffer size is less than |max_buf_size| or orrur
104// occurs
105// Returns 0 on success, errno otherwise
106// Errno:
107// - ETIMEDOUT: when |due_time| is not NULL and time expired this
108// - EINVAL: the stream was close during waiting
109int StreamWait(StreamId stream_id, const timespec* due_time);
110
111// Async wait
112void StreamWait(StreamId stream_id, const timespec *due_time,
113 void (*on_writable)(StreamId stream_id, void* arg,
114 int error_code),
115 void *arg);
116
117// Close |stream_id|, after this function is called:
118// - All the following |StreamWrite| would fail
119// - |StreamWait| wakes up immediately.
120// - Both sides |on_closed| would be notifed after all the pending buffers have
121// been received
122// This function could be called multiple times without side-effects
123int StreamClose(StreamId stream_id);
124
125namespace detail {
126
127struct StreamIdTraits {
128 inline static StreamId InvalidValue() {
129 return INVALID_STREAM_ID;
130 };
131 static void Free(StreamId f) {
132 if (f != INVALID_STREAM_ID) {
133 StreamClose(f);
134 }
135 }
136};
137
138} // namespace detail
139
140} // namespace brpc
141
142
143#endif //BRPC_STREAM_H
144