1 | /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #ifndef TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ |
17 | #define TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ |
18 | |
19 | #include "tensorflow/core/framework/rendezvous.h" |
20 | #include "tensorflow/core/framework/tensor.h" |
21 | #include "tensorflow/core/lib/core/status.h" |
22 | #include "tensorflow/core/lib/gtl/flatmap.h" |
23 | #include "tensorflow/core/platform/macros.h" |
24 | #include "tensorflow/core/platform/mutex.h" |
25 | #include "tensorflow/core/platform/types.h" |
26 | |
27 | namespace tensorflow { |
28 | |
29 | // Implements the basic logic of matching Send and Recv operations. See |
30 | // RendezvousInterface for more details. |
31 | // |
32 | // NOTE: Most users will use a class that wraps LocalRendezvous, such as |
33 | // IntraProcessRendezvous or RemoteRendezvous. This class does not implement |
34 | // RendezvousInterface because virtual dispatch to LocalRendezvous methods |
35 | // is not expected to be needed. |
36 | class LocalRendezvous { |
37 | public: |
38 | // If the class wrapping LocalRendezvous is refcounted (i.e., extending |
39 | // Rendezvous), pass in its pointer in constructor so the LocalRendezvous |
40 | // can make sure it outlives the async recv requests. |
41 | // Pass in nullptr if the wrapping class is not refcounted. |
42 | explicit LocalRendezvous(Rendezvous* owner) |
43 | : rc_owner_(owner), pending_callback_counter_(0) {} |
44 | ~LocalRendezvous(); |
45 | |
46 | Status Send(const Rendezvous::ParsedKey& key, |
47 | const Rendezvous::Args& send_args, const Tensor& val, |
48 | const bool is_dead); |
49 | void RecvAsync(const Rendezvous::ParsedKey& key, |
50 | const Rendezvous::Args& recv_args, |
51 | Rendezvous::DoneCallback done); |
52 | void StartAbort(const Status& status); |
53 | Status status(); |
54 | |
55 | private: |
56 | struct Item; |
57 | |
58 | // By invariant, the item queue under each key is of the form |
59 | // [item.type == kSend]* meaning each item is a sent message. |
60 | // or |
61 | // [item.type == kRecv]* meaning each item is a waiter. |
62 | struct ItemQueue { |
63 | void push_back(Item* item); |
64 | |
65 | Item* head = nullptr; |
66 | Item* tail = nullptr; |
67 | }; |
68 | |
69 | typedef gtl::FlatMap<uint64, ItemQueue> Table; |
70 | |
71 | // Pointer to the owner class of this LocalRendezvous if it is refcounted. |
72 | const Rendezvous* rc_owner_; |
73 | |
74 | // TODO(zhifengc): shard table_. |
75 | mutex mu_; |
76 | Table table_ TF_GUARDED_BY(mu_); |
77 | Status status_ TF_GUARDED_BY(mu_); |
78 | // Track the number of pening callbacks using a counter. |
79 | int pending_callback_counter_ TF_GUARDED_BY(mu_); |
80 | condition_variable pending_callback_cond_var_ TF_GUARDED_BY(mu_); |
81 | |
82 | TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvous); |
83 | }; |
84 | |
85 | } // namespace tensorflow |
86 | |
87 | #endif // TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ |
88 | |