1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#ifndef TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
17#define TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
18
19#include "tensorflow/core/framework/rendezvous.h"
20#include "tensorflow/core/framework/tensor.h"
21#include "tensorflow/core/lib/core/status.h"
22#include "tensorflow/core/lib/gtl/flatmap.h"
23#include "tensorflow/core/platform/macros.h"
24#include "tensorflow/core/platform/mutex.h"
25#include "tensorflow/core/platform/types.h"
26
27namespace tensorflow {
28
29// Implements the basic logic of matching Send and Recv operations. See
30// RendezvousInterface for more details.
31//
32// NOTE: Most users will use a class that wraps LocalRendezvous, such as
33// IntraProcessRendezvous or RemoteRendezvous. This class does not implement
34// RendezvousInterface because virtual dispatch to LocalRendezvous methods
35// is not expected to be needed.
36class LocalRendezvous {
37 public:
38 // If the class wrapping LocalRendezvous is refcounted (i.e., extending
39 // Rendezvous), pass in its pointer in constructor so the LocalRendezvous
40 // can make sure it outlives the async recv requests.
41 // Pass in nullptr if the wrapping class is not refcounted.
42 explicit LocalRendezvous(Rendezvous* owner)
43 : rc_owner_(owner), pending_callback_counter_(0) {}
44 ~LocalRendezvous();
45
46 Status Send(const Rendezvous::ParsedKey& key,
47 const Rendezvous::Args& send_args, const Tensor& val,
48 const bool is_dead);
49 void RecvAsync(const Rendezvous::ParsedKey& key,
50 const Rendezvous::Args& recv_args,
51 Rendezvous::DoneCallback done);
52 void StartAbort(const Status& status);
53 Status status();
54
55 private:
56 struct Item;
57
58 // By invariant, the item queue under each key is of the form
59 // [item.type == kSend]* meaning each item is a sent message.
60 // or
61 // [item.type == kRecv]* meaning each item is a waiter.
62 struct ItemQueue {
63 void push_back(Item* item);
64
65 Item* head = nullptr;
66 Item* tail = nullptr;
67 };
68
69 typedef gtl::FlatMap<uint64, ItemQueue> Table;
70
71 // Pointer to the owner class of this LocalRendezvous if it is refcounted.
72 const Rendezvous* rc_owner_;
73
74 // TODO(zhifengc): shard table_.
75 mutex mu_;
76 Table table_ TF_GUARDED_BY(mu_);
77 Status status_ TF_GUARDED_BY(mu_);
78 // Track the number of pening callbacks using a counter.
79 int pending_callback_counter_ TF_GUARDED_BY(mu_);
80 condition_variable pending_callback_cond_var_ TF_GUARDED_BY(mu_);
81
82 TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvous);
83};
84
85} // namespace tensorflow
86
87#endif // TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
88